1 """
2 Streaming out large computed things using twisted and threads.
3 """
4
5
6
7
8
9
10
11 import time
12 import threading
13
14 from twisted.internet import reactor
15 from twisted.internet.interfaces import IPushProducer
16 from twisted.python import threadable
17
18 from zope.interface import implements
19
20 from gavo import base
21 from gavo import utils
22 from gavo.formats import votablewrite
23
24
class StopWriting(IOError):
    """Raised to make a running stream abort.

    Clients can raise this when they want the stream to stop; the
    streamer itself raises it from write() once the consumer has gone
    away or has asked production to stop.

    NOTE(review): the class header was missing from this file.  The
    IOError base is inferred from run(), which handles StopWriting
    *before* IOError -- confirm against the project history.
    """
28
29
class DataStreamer(threading.Thread):
    """A twisted-enabled thread streaming out large data produced on the fly.

    It is basically a push producer.  To use it, construct it with a
    data source and a twisted request (or any IFinishableConsumer).
    If in a nevow resource, you should then return request.deferred.

    The data source simply is a function writeStreamTo taking one
    argument; this will be the DataStreamer.  You can call its write
    method to deliver data.  There's no need to close anything, just
    let your function return.

    writeStreamTo will be run in a thread to avoid blocking the reactor.
    """

    implements(IPushProducer)

    def __init__(self, writeStreamTo, consumer):
        """Register with consumer and prepare (but do not start) the thread.

        writeStreamTo -- callable receiving this DataStreamer; run in run().
        consumer -- object providing isSecure, registerProducer,
          notifyFinish, write and finish (a twisted request in practice).
        """
        # NOTE(review): nothing in this class reads HANDLING_HTTPS; it is
        # presumably consulted elsewhere -- confirm before removing.
        self.HANDLING_HTTPS = consumer.isSecure()

        threading.Thread.__init__(self)
        self.writeStreamTo, self.consumer = writeStreamTo, consumer
        self.paused, self.exceptionToRaise = False, None
        # True: we are a streaming (push) producer.
        consumer.registerProducer(self, True)
        self.connectionLive = True
        # notifyFinish errbacks when the connection is lost prematurely.
        consumer.notifyFinish().addErrback(self._abortProducing)
        # Do not keep the interpreter alive for a hanging stream.
        self.setDaemon(True)
        self.buffer = utils.StreamBuffer()

    def _abortProducing(self, failure):
        """Errback for consumer.notifyFinish(): the client has gone away.

        Marks the connection as dead, unregisters the producer if the
        consumer still has a channel, and arranges for the next write()
        to raise StopWriting.
        """
        self.connectionLive = False
        if getattr(self.consumer, "channel", None):
            self.consumer.unregisterProducer()
        self.exceptionToRaise = StopWriting("Client has hung up")

    def resumeProducing(self):
        """IPushProducer: lets _deliverBuffer continue delivering.

        NOTE(review): body reconstructed from the self.paused flag that
        __init__ sets and _deliverBuffer polls; the original lines were
        missing from this file.
        """
        self.paused = False

    def pauseProducing(self):
        """IPushProducer: makes _deliverBuffer hold back further data.

        NOTE(review): body reconstructed, see resumeProducing.
        """
        self.paused = True

    def stopProducing(self):
        """IPushProducer: makes the next write() raise StopWriting."""
        self.exceptionToRaise = StopWriting("Stop writing, please")

    def _deliverBuffer(self):
        """causes the accumulated data to be written if enough
        data is there.

        This must be called at least once after buffer.doneWriting()
        has been called.

        Runs in the streaming thread; the actual consumer writes are
        handed to the reactor thread.
        """
        while self.connectionLive and not self.exceptionToRaise:
            data = self.buffer.get()
            if data is None:
                # Nothing (more) to deliver right now.
                return
            while self.paused and not self.exceptionToRaise:
                # The consumer asked us to hold back; poll until it
                # resumes or the stream is aborted.
                time.sleep(0.1)

            reactor.callFromThread(self._writeToConsumer, data)

    def write(self, data):
        """schedules data to be written to the consumer.

        Raises whatever exception has been recorded in exceptionToRaise
        (StopWriting on hangup or stopProducing, or an error from an
        earlier consumer write).
        """
        if self.exceptionToRaise:
            raise self.exceptionToRaise

        # Unicode is accepted as long as str() can convert it.
        if isinstance(data, unicode):
            data = str(data)

        self.buffer.add(data)
        self._deliverBuffer()

    def _writeToConsumer(self, data):
        """Writes data to the consumer; must run in the reactor thread.

        Errors are not raised here but recorded in exceptionToRaise so
        the streaming thread sees them on its next write().
        """
        if self.consumer.channel is None:
            # The connection is gone; there is nowhere to write to.
            return

        try:
            self.consumer.write(data)
        except IOError as ex:
            self.exceptionToRaise = ex
        except Exception as ex:
            base.ui.notifyError("Exception during streamed write.")
            self.exceptionToRaise = ex

    def cleanup(self):
        """Winds up the stream; must run in the reactor thread."""
        self.exceptionToRaise = None
        self.join(0.01)
        if self.isAlive():
            base.ui.notifyError("Streaming thread couldn't be joined?")

        if self.connectionLive:
            if getattr(self.consumer, "channel", None):
                self.consumer.unregisterProducer()
            else:
                self.consumer.producer = None
            # NOTE(review): indentation was lost in this file; finish() is
            # kept inside the connectionLive guard because finishing a
            # request whose connection was lost is an error in twisted.web.
            self.consumer.finish()

    def run(self):
        """Thread entry point: runs writeStreamTo and flushes the buffer.

        Whatever happens, cleanup() is scheduled in the reactor thread
        at the end.
        """
        try:
            try:
                self.writeStreamTo(self)
                self.buffer.doneWriting()
                self._deliverBuffer()
            except StopWriting:
                # Regular abort (client hung up or asked us to stop).
                pass
            except IOError:
                base.ui.notifyError("I/O Error while streaming:")
            except:
                # Deliberate catch-all: this is the thread's top level.
                base.ui.notifyError("Exception while streaming"
                    " (closing connection):\n")
                # The traceback must be rendered here, while the
                # exception is still being handled.
                report = ("\n\n\nXXXXXX Internal error in DaCHS software.\n"
                    "If you are seeing this, please notify gavo@ari.uni-heidelberg.de\n"
                    "with as many details (like a URL) as possible.\n"
                    "Also, the following traceback may help people there figure out\n"
                    "the problem:\n"+
                    utils.getTracebackAsString())
                # Twisted APIs are not thread-safe, so the consumer must
                # only be written to from the reactor thread.  This is
                # queued before cleanup, so it goes out first.
                reactor.callFromThread(self._writeToConsumer, report)
        finally:
            reactor.callFromThread(self.cleanup)

    # Methods that threadable.synchronize (below) wraps with a lock.
    synchronized = ['resumeProducing', 'pauseProducing', 'stopProducing']


threadable.synchronize(DataStreamer)
186
187
def streamOut(writeStreamTo, request):
    """sets up the thread to have writeStreamTo write to request from
    a thread.

    writeStreamTo is a callable taking the DataStreamer as its only
    argument; request is the twisted request to stream to.

    For convenience, this function returns request.deferred, so you
    can write things like return streamOut(foo, request) in your
    renderHTTP (or analogous).
    """
    t = DataStreamer(writeStreamTo, request)
    t.start()
    return request.deferred
199
200
# NOTE(review): the def line was missing from this file.  The parameter
# names are taken from the body; their order is an assumption -- confirm
# against callers.
def streamVOTable(request, data, **contextOpts):
    """streams out the payload of an SvcResult as a VOTable.

    request -- the twisted request to stream to.
    data -- the result; its queryMeta and original attributes are read.
    contextOpts -- keyword arguments for votablewrite.VOTableContext;
      tablecoding and version are filled in from data.queryMeta when
      not given.

    Returns what streamOut returns for request.
    """
    def writeVOTable(outputFile):
        """writes a VOTable representation of the SvcResult instance data
        to outputFile.
        """
        if "tablecoding" not in contextOpts:
            # tdEnc true selects "td", false "binary".
            contextOpts["tablecoding"] = {
                True: "td", False: "binary"}[data.queryMeta["tdEnc"]]
        if "version" not in contextOpts:
            contextOpts["version"] = data.queryMeta.get("VOTableVersion")

        votablewrite.writeAsVOTable(
            data.original, outputFile,
            ctx=votablewrite.VOTableContext(**contextOpts))
        return ""

    return streamOut(writeVOTable, request)
220