Package gavo :: Package svcs :: Module streaming
[frames] | no frames]

Source Code for Module gavo.svcs.streaming

  1  """ 
  2  Streaming out large computed things using twisted and threads. 
  3  """ 
  4   
  5  #c Copyright 2008-2019, the GAVO project 
  6  #c 
  7  #c This program is free software, covered by the GNU GPL.  See the 
  8  #c COPYING file in the source distribution. 
  9   
 10   
 11  import time 
 12  import threading 
 13   
 14  from twisted.internet import reactor 
 15  from twisted.internet.interfaces import IPushProducer 
 16  from twisted.python import threadable 
 17   
 18  from zope.interface import implements 
 19   
 20  from gavo import base 
 21  from gavo import utils 
 22  from gavo.formats import votablewrite 
 23   
 24   
class StopWriting(IOError):
    """is raised to make a running stream abort.

    Clients can raise this themselves when they want the stream to stop.
    """
    pass
28 29
class DataStreamer(threading.Thread):
    """is a twisted-enabled Thread to stream out large files produced
    on the fly.

    It is basically a push producer.  To use it, construct it with
    a data source and a twisted request (or any IFinishableConsumer).
    If in a nevow resource, you should then return request.deferred.

    The data source simply is a function writeStreamTo taking one
    argument; this will be the DataStreamer.  You can call its write
    method to deliver data.  There's no need to close anything, just
    let your function return.

    writeStreamTo will be run in a thread to avoid blocking the reactor.
    All access to the consumer is handed over to the reactor thread
    through reactor.callFromThread.
    """
    # we shouldn't really do this kind of thing, but writing the stuff that
    # we want to produce asynchronously is typically still a bigger pain.

    implements(IPushProducer)

    def __init__(self, writeStreamTo, consumer):
        """sets up the streamer and registers it as a producer with consumer.

        writeStreamTo is a callable receiving this DataStreamer as its
        only argument; consumer is the request-like object the data is
        written to.  The thread is not started here.
        """
        # we play the renderer for whatever we're running.  Hence, we need
        # to have the HANDLING_HTTPS attribute that used code looks for
        # upstack to determine whether to make https URLs (yes, I'd use
        # a different basic design if I had to do this again).
        self.HANDLING_HTTPS = consumer.isSecure()

        threading.Thread.__init__(self)
        self.writeStreamTo, self.consumer = writeStreamTo, consumer
        self.paused, self.exceptionToRaise = False, None
        consumer.registerProducer(self, True)
        self.connectionLive = True
        consumer.notifyFinish().addErrback(self._abortProducing)
        self.daemon = True  # kill transfers on server restart
        self.buffer = utils.StreamBuffer()

    def _abortProducing(self, res):
        """marks the connection as dead and schedules a StopWriting.

        res is the failure passed in by the errback chain; it is not used.
        """
        # this errback is called when the request is finishing after an
        # error.  The request, here, still is there as self.consumer; this
        # code runs in the main thread.
        self.connectionLive = False
        if getattr(self.consumer, "channel", None):
            self.consumer.unregisterProducer()
        self.exceptionToRaise = StopWriting("Client has hung up")

    def resumeProducing(self):
        """clears the pause flag so the writing thread continues delivering.
        """
        self.paused = False

    def pauseProducing(self):
        """sets the pause flag; the writing thread polls it before each
        delivery.
        """
        self.paused = True

    def stopProducing(self):
        """schedules a StopWriting to be raised on the next write.
        """
        self.exceptionToRaise = StopWriting("Stop writing, please")

    def _deliverBuffer(self):
        """causes the accumulated data to be written if enough
        data is there.

        This must be called at least once after buffer.doneWriting()
        has been called.
        """
        while self.connectionLive and not self.exceptionToRaise:
            data = self.buffer.get()
            if data is None:  # nothing to write yet/any more
                return
            while self.paused and not self.exceptionToRaise:
                # consumer has requested a pause; let's busy-loop;
                # doesn't cost much and is easier than semaphores.
                time.sleep(0.1)

            reactor.callFromThread(self._writeToConsumer, data)

    def write(self, data):
        """schedules data to be written to the consumer.

        If an exception has been recorded in exceptionToRaise (by
        stopProducing, a hung-up client, or a failed write), it is
        raised here instead.
        """
        if self.exceptionToRaise:
            raise self.exceptionToRaise

        # Allow unicode data in as long as it's actually ascii:
        if isinstance(data, unicode):
            data = str(data)

        self.buffer.add(data)
        self._deliverBuffer()

    def _writeToConsumer(self, data):
        """writes data to the consumer, recording any exception in
        exceptionToRaise rather than letting it propagate.
        """
        # We want to catch errors occurring during writes.  This method
        # is called from the reactor (main) thread.
        # We assign to the exceptionToRaise instance variable, and this
        # races with stopProducing.  This race is harmless, though, since
        # in any case writing stops, and the exception raised is of secondary
        # importance.

        # when the remote end has hung up, this can still be called from
        # twisted's belly, but the consumer's channel will be None by then.
        # I don't want ugly mess in the logs in that case, so I catch this
        # case, hoping I don't ignore any actual problems in this way:
        if self.consumer.channel is None:
            return

        try:
            self.consumer.write(data)
        except IOError as ex:
            self.exceptionToRaise = ex
        except Exception as ex:
            base.ui.notifyError("Exception during streamed write.")
            self.exceptionToRaise = ex

    def cleanup(self, result=None):
        """joins the thread, detaches from the consumer, and finishes it.

        result is accepted and ignored.
        """
        # Must be callFromThread'ed
        self.exceptionToRaise = None
        self.join(0.01)
        if self.is_alive():
            base.ui.notifyError("Streaming thread couldn't be joined?")

        if self.connectionLive:
            # I have no idea why channel would be None here with a live conn,
            # but it happens on stretch, so let's protect unregisterProducer.
            if getattr(self.consumer, "channel", None):
                self.consumer.unregisterProducer()
            else:
                self.consumer.producer = None
            # Clean up the request; don't make this conditional, as
            # that would prevent important callbacks from running
            self.consumer.finish()

    def run(self):
        """runs writeStreamTo in this thread and schedules cleanup when
        it is done, whether it returned or raised.
        """
        try:
            try:
                self.writeStreamTo(self)
                self.buffer.doneWriting()
                self._deliverBuffer()
            except StopWriting:
                pass
            except IOError:
                # I/O errors are most likely not our fault, and I don't want
                # to make matters worse by pushing any dumps into a line
                # that's probably closed anyway.
                base.ui.notifyError("I/O Error while streaming:")
            except:
                # deliberately catch-all: this is the thread's last resort.
                base.ui.notifyError("Exception while streaming"
                    " (closing connection):\n")
                # Build the report here, while the traceback is current,
                # but leave the actual write to the reactor thread: the
                # consumer must not be touched from this thread, and
                # _writeToConsumer copes with connections already gone.
                errorReport = (
                    "\n\n\nXXXXXX Internal error in DaCHS software.\n"
                    "If you are seeing this, please notify gavo@ari.uni-heidelberg.de\n"
                    "with as many details (like a URL) as possible.\n"
                    "Also, the following traceback may help people there figure out\n"
                    "the problem:\n"+
                    utils.getTracebackAsString())
                reactor.callFromThread(self._writeToConsumer, errorReport)
        # All producing is done in the thread, so when no one's writing any
        # more, we should have delivered everything to the consumer.
        # cleanup is queued after any pending writes.
        finally:
            reactor.callFromThread(self.cleanup)

    # method names handed to threadable.synchronize below
    synchronized = ['resumeProducing', 'pauseProducing', 'stopProducing']


threadable.synchronize(DataStreamer)
def streamOut(writeStreamTo, request):
    """starts a DataStreamer thread in which writeStreamTo writes to
    request.

    For convenience, this function returns request.deferred, so you
    can write things like return streamOut(foo, request) in your
    renderHTTP (or analogous).
    """
    streamer = DataStreamer(writeStreamTo, request)
    streamer.start()
    return request.deferred
199 200
def streamVOTable(request, data, **contextOpts):
    """streams out the payload of an SvcResult as a VOTable.

    contextOpts are passed on as keyword arguments to
    votablewrite.VOTableContext.  Where tablecoding or version are not
    given, they are filled in from data.queryMeta.
    """
    def writeVOTable(outputFile):
        """writes a VOTable representation of data.original to outputFile.

        Returns an empty string.
        """
        # only consult queryMeta for options the caller left open
        if "tablecoding" not in contextOpts:
            codingForTdEnc = {True: "td", False: "binary"}
            contextOpts["tablecoding"] = codingForTdEnc[
                data.queryMeta["tdEnc"]]
        if "version" not in contextOpts:
            contextOpts["version"] = data.queryMeta.get("VOTableVersion")

        payload = data.original
        context = votablewrite.VOTableContext(**contextOpts)
        votablewrite.writeAsVOTable(payload, outputFile, ctx=context)
        return ""

    return streamOut(writeVOTable, request)