# Source code for gavo.svcs.streaming

"""
Streaming out large computed things using twisted and threads.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import time
import threading

from twisted.internet import reactor
from twisted.internet.interfaces import IPushProducer
from twisted.python import threadable
from twisted.web import server

from zope.interface import implementer

from gavo import base
from gavo import utils
from gavo.formats import votablewrite


class StopWriting(IOError):
    """Raised to make a running stream abort.

    Clients writing to a stream can raise this themselves when they want
    the stream to stop.
    """
@implementer(IPushProducer)
class DataStreamer(threading.Thread):
    """A twisted-enabled thread streaming out large data produced on the fly.

    It is basically a push producer.  To use it, construct it with a
    data source and a twisted request (or any IFinishableConsumer).  If in
    a twisted resource, you should arrange a deferred and return
    NOT_DONE_YET; really, just use streamOut below.

    The data source simply is a function writeStreamTo taking one
    argument, which is file-like (i.e., use its write method to deliver
    data).  There's no need to close anything, just let your function
    return.

    writeStreamTo will be run in a thread to avoid blocking the reactor.
    """
    # we shouldn't really do this kind of thing, but writing the stuff that
    # we want to produce asynchronously is typically still a bigger pain.

    def __init__(self, writeStreamTo, consumer, queryMeta=None):
        """Register this thread as a streaming producer with consumer.

        :param writeStreamTo: callable taking one file-like argument (this
            object); it is run in the thread.
        :param consumer: the twisted request (or compatible consumer) the
            data is delivered to.
        :param queryMeta: stored as self.queryMeta; not used in this class.
        """
        threading.Thread.__init__(self)
        self.writeStreamTo, self.consumer = writeStreamTo, consumer
        self.queryMeta = queryMeta
        self.paused, self.exceptionToRaise = False, None
        self.consumer.registerProducer(self, True)
        self.connectionLive = True
        self.consumer.notifyFinish().addErrback(self._abortProducing)
        # kill transfers on server restart (Thread.setDaemon is deprecated
        # since Python 3.10, hence the attribute).
        self.daemon = True
        self.buffer = utils.StreamBuffer()

    def _abortProducing(self, res):
        # this errback is called when the request is finishing after an
        # error.  The request, here, still is there as self.consumer; this
        # code runs in the main thread.
        self.connectionLive = False
        if getattr(self.consumer, "channel", None):
            self.consumer.unregisterProducer()
        self.exceptionToRaise = StopWriting("Client has hung up")

    def resumeProducing(self):
        """Lift a pause requested through pauseProducing."""
        self.paused = False

    def pauseProducing(self):
        """Make the delivery loop wait until resumeProducing is called."""
        self.paused = True

    def stopProducing(self):
        """Arrange for StopWriting to be raised on the next write."""
        self.exceptionToRaise = StopWriting("Stop writing, please")

    def _deliverBuffer(self):
        """causes the accumulated data to be written if enough data is there.

        This must be called at least once after buffer.doneWriting() has
        been called.
        """
        while self.connectionLive and not self.exceptionToRaise:
            data = self.buffer.get()
            if data is None:  # nothing to write yet/any more
                return
            while self.paused and not self.exceptionToRaise:
                # consumer has requested a pause; let's busy-loop;
                # doesn't cost much and is easier than semaphores.
                time.sleep(0.1)
            reactor.callFromThread(self._writeToConsumer, data)

    def write(self, data):
        """schedules data to be written to the consumer.

        :param data: bytes, or a str consisting of ASCII characters only.
        :raises: whatever is pending in self.exceptionToRaise (e.g.,
            StopWriting); UnicodeEncodeError for non-ASCII str input.
        """
        if self.exceptionToRaise:
            raise self.exceptionToRaise
        # Allow unicode data in as long as it's actually ascii:
        if isinstance(data, str):
            data = data.encode("ascii")
        self.buffer.add(data)
        self._deliverBuffer()

    def _writeToConsumer(self, data):
        # We want to catch errors occurring during writes.  This method
        # is called from the reactor (main) thread.
        # We assign to the exceptionToRaise instance variable, and this
        # races with stopProducing.  This race is harmless, though, since
        # in any case writing stops, and the exception raised is of
        # secondary importance.

        # when the remote end has hung up, this can still be called from
        # twisted's belly, but the consumer's channel will be None by then.
        # I don't want ugly mess in the logs in that case, so I catch this
        # case, hoping I don't ignore any actual problems in this way:
        if self.consumer.channel is None:
            return
        try:
            self.consumer.write(data)
        except IOError as ex:
            self.exceptionToRaise = ex
        except Exception as ex:
            base.ui.notifyError("Exception during streamed write.")
            self.exceptionToRaise = ex

    def cleanup(self, result=None):
        """Join the thread, detach from the consumer, and finish it.

        Must be callFromThread'ed.
        """
        self.exceptionToRaise = None
        self.join(0.01)
        if self.is_alive():
            base.ui.notifyError("Streaming thread couldn't be joined?")
        if self.connectionLive:
            # I have no idea why channel would be None here with a live
            # conn, but it happens on stretch, so let's protect
            # unregisterProducer.
            if getattr(self.consumer, "channel", None):
                self.consumer.unregisterProducer()
            else:
                self.consumer.producer = None
            # Clean up the request; don't make this conditional, as
            # that would prevent important callbacks from running
            self.consumer.finish()

    def run(self):
        """Run writeStreamTo in this thread, then schedule cleanup."""
        # the request variable is stolen for http/https discrimination
        # far downstack
        request = self.consumer #noflake: for stealing
        try:
            try:
                self.writeStreamTo(self)
                self.buffer.doneWriting()
                self._deliverBuffer()
            except StopWriting:
                pass
            except IOError:
                # I/O errors are most likely not our fault, and I don't want
                # to make matters worse by pushing any dumps into a line
                # that's probably closed anyway.
                base.ui.notifyError("I/O Error while streaming:")
            except:
                # Deliberate catch-all at the thread boundary.
                base.ui.notifyError("Exception while streaming"
                    " (closing connection):\n")
                errorNotice = (
                    "\n\n\nXXXXXX Internal error in DaCHS software.\n"
                    "If you are seeing this, please notify"
                    " gavo@ari.uni-heidelberg.de\n"
                    "with as many details (like a URL) as possible.\n"
                    "Also, the following traceback may help people there"
                    " figure out\n"
                    "the problem:\n" + utils.getTracebackAsString())
                # The consumer must only be touched from the reactor
                # thread and only accepts bytes, so encode the notice and
                # hand it over like any other chunk; it is queued before
                # cleanup and hence written before the request finishes.
                reactor.callFromThread(self._writeToConsumer,
                    errorNotice.encode("utf-8", "replace"))
        # All producing is done in the thread, so when no one's writing any
        # more, we should have delivered everything to the consumer
        finally:
            reactor.callFromThread(self.cleanup)

    # method names read by twisted's threadable.synchronize
    synchronized = ['resumeProducing', 'pauseProducing', 'stopProducing']
# Hand DataStreamer to twisted's threadable.synchronize, which operates on
# the method names listed in the class's ``synchronized`` attribute
# (resumeProducing, pauseProducing, stopProducing).
threadable.synchronize(DataStreamer)
def streamOut(writeStreamTo, request, queryMeta=None):
    """Start a DataStreamer thread having writeStreamTo write to request.

    For convenience, this function returns server.NOT_DONE_YET, so in a
    t.w render method you can just say ``return streamOut()``.  All
    necessary callbacks and errbacks are managed here.

    When request has no channel any more, a warning is issued, nothing is
    started, and None is returned.
    """
    if request.channel is not None:
        streamer = DataStreamer(writeStreamTo, request, queryMeta)
        streamer.start()
        return server.NOT_DONE_YET

    base.ui.notifyWarning("Not streaming to a dead connection.")
    return None
def streamVOTable(request, data, queryMeta=None, **contextOpts):
    """Stream out the payload of a data item as a VOTable.

    contextOpts are passed on to votablewrite.VOTableContext.  If
    queryMeta is given, it supplies "tablecoding" (from its tdEnc item) and
    "version" (from its VOTableVersion item) unless contextOpts already
    has them.

    Returns whatever streamOut returns.
    """
    codingForTdEnc = {True: "td", False: "binary"}

    def writeVOTable(outputFile):
        """Write a VOTable representation of data to outputFile."""
        if queryMeta is not None:
            # explicit contextOpts win over what queryMeta says
            if "tablecoding" not in contextOpts:
                contextOpts["tablecoding"] = codingForTdEnc[
                    queryMeta["tdEnc"]]
            if "version" not in contextOpts:
                contextOpts["version"] = queryMeta.get("VOTableVersion")

        writerContext = votablewrite.VOTableContext(**contextOpts)
        votablewrite.writeAsVOTable(data, outputFile, ctx=writerContext)
        return ""

    return streamOut(writeVOTable, request)