Source code for gavo.protocols.dlasync

"""
A UWS-based interface to datalink.

TODO: There is quite a bit of overlap between this and useruws.  This
should probably be reformulated along the lines of useruws.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import functools
import datetime

from twisted.web import server

from gavo import base
from gavo import svcs
from gavo import rscdesc #noflake: cache registration
from gavo.protocols import products
from gavo.protocols import uws
from gavo.protocols import uwsactions

# TODO: We're not supposed to import from helpers in main code.  Fix
# this.
from gavo.helpers import testtricks


class DLTransitions(uws.ProcessBasedUWSTransitions):
    """State transitions for asynchronous datalink jobs.

    Queueing a job immediately starts it.  The process run for a job is
    given by getCommandLine (``gavo dlrun -- <jobId>``).
    """
    def __init__(self):
        super().__init__("DL")

    def queueJob(self, newState, wjob, ignored):
        """queues wjob and then goes straight on to start it.
        """
        super().queueJob(newState, wjob, ignored)
        # datalink jobs do not wait in the queue; move them on to EXECUTING
        # right away.
        return self.startJob(uws.EXECUTING, wjob, ignored)

    def getCommandLine(self, wjob):
        """returns the program name and the argument vector for running
        wjob through the dlrun subcommand.
        """
        argv = ["gavo", "dlrun", "--", str(wjob.jobId)]
        return argv[0], argv
class DLJob(uws.UWSJobWithWD):
    """a UWS job performing some datalink data preparation.

    In addition to UWS parameters, it has

    * serviceid -- the fully qualified id of the service that will process
      the request
    * dlargs -- the parameters (in request.args form) of the datalink
      request.
    """
    # presumably the id of the table definition storing these jobs -- verify
    _jobsTDId = "//datalink#datalinkjobs"
    # DLTransitions.queueJob starts a job as soon as it is queued.
    _transitions = DLTransitions()
class DLUWS(uws.UWS):
    """The UWS worker system managing asynchronous datalink jobs.

    Its jobs are DLJob instances; job lists and job documents are
    delivered with XSL stylesheet processing instructions attached.
    """
    joblistPreamble = (
        "<?xml-stylesheet href='/static/xsl/dlasync-joblist-to-html.xsl'"
        " type='text/xsl'?>")
    jobdocPreamble = (
        "<?xml-stylesheet href='/static/xsl/dlasync-job-to-html.xsl'"
        " type='text/xsl'?>")

    # NOTE(review): not used by anything in this class (baseURL does not
    # cache); kept in case code elsewhere refers to it.
    _baseURLCache = None

    def __init__(self):
        super().__init__(DLJob, uwsactions.JobActions())

    @property
    def baseURL(self):
        """the absolute URL of the datalinkuws endpoint.
        """
        return base.makeAbsoluteURL("datalinkuws")

    @functools.cached_property
    def parameterGrammar(self):
        """the context grammar for the job parameters dlargs and serviceid.

        It is parsed on first access and then cached on the instance.
        """
        return base.parseFromString(svcs.ContextGrammar,
            """<contextGrammar>
            <inputKey name="dlargs" type="raw"
                description="All datalink parameters. These will only be validated on execution. See the datalink descriptor for the actual parameters usable here. You cannot pass this in."
                multiplicity="forced-single"/>
            <inputKey name="serviceid" type="text"
                description="The id of the service processing this. You cannot pass this in; it is overridden by the renderer."
                multiplicity="forced-single"/>
            </contextGrammar>""")

    def getURLForId(self, jobId):
        """returns a fully qualified URL for the job with jobId.
        """
        return f"{self.baseURL}/{jobId}"
# The module-level worker system; main() uses it to look up jobs and to
# push failed jobs into the ERROR phase.
DL_WORKER = DLUWS()


####################### twisted.web simulation
# This is so we can handle t.w resources coming back from datalink.
# Factor this out?  This is essentially stolen from trialhelpers,
# and we might just put that somewhere where it's useful.

import warnings

from twisted.internet import defer
from twisted.internet import reactor


def _requestDone(result, request):
    """writes a render result to request, fires its deferred, and stops
    the reactor.

    This is a helper for our t.w simulation.  Non-empty str or bytes
    results are written to request.  Empty ones are ignored.  For anything
    else, a warning is emitted and nothing is written.  In all cases,
    request.deferred is fired with request.accumulator and the reactor is
    stopped.

    Returns the pair (request.accumulator, request).
    """
    # twisted.web render methods produce bytes on Python 3; accept str
    # as well for backward compatibility.
    if isinstance(result, (bytes, str)):
        if result:
            request.write(result)
    else:
        warnings.warn("Unsupported async datalink render result: %s"%repr(result))

    request.deferred.callback(request.accumulator)
    reactor.stop()
    return request.accumulator, request
class WritingFakeRequest(testtricks.FakeRequest):
    """A stand-in for a twisted.web request that sends its output to a file.

    UWS result files are rendered by the same code that renders to web
    requests, so we need an object that behaves like a request.  Something
    simpler than testtricks.FakeRequest would probably do, but that code
    exists anyway and may come in handy for fancier things later.  The one
    change relative to testtricks is that writes go to destFile.
    """
    def __init__(self, destFile):
        self.destFile = destFile
        super().__init__("")

    def write(self, stuff):
        """passes stuff on to the destination file.
        """
        self.destFile.write(stuff)

    def finish(self):
        """fires the request's deferred, closes the destination file and
        stops the reactor (which writeResultTo is running).
        """
        self.deferred.callback(None)
        self.destFile.close()
        reactor.stop()
def writeResultTo(page, destFile):
    """arranges for the result of rendering the twisted.web resource page
    to be written to destFile.

    This uses a very simple simulation of t.w rendering, so a few tricks
    are possible.  Also, it actually runs a reactor to do its magic.
    Do not run this in a running DaCHS server; it has its own reactor
    running.  This is only for dachs dlrun code.

    Returns the WritingFakeRequest used, so callers can inspect, e.g.,
    its response headers.

    If page.render, or writing its immediate result, raises an exception,
    the request is finished (which stops the reactor), and the exception
    is re-raised from this function.  Failures inside a resource that
    returned NOT_DONE_YET are not captured here.

    TODO: There's probably code for this in t.w.
    """
    # Exceptions escaping a callWhenRunning callback are only logged by
    # Twisted; they do not come out of reactor.run().  Hence, collect them
    # here and re-raise once the reactor has stopped.
    failures = []

    def _renderInto(render, req):
        try:
            res = render(req)
        except Exception as ex:
            failures.append(ex)
            req.finish()
            return

        if res==server.NOT_DONE_YET:
            # resource will finish the request itself later
            return

        try:
            if res:
                req.write(res)
        except Exception as ex:
            failures.append(ex)
        finally:
            req.finish()

    request = WritingFakeRequest(destFile)
    reactor.callWhenRunning(_renderInto, page.render, request)
    reactor.run()

    if failures:
        raise failures[0]
    return request
####################### CLI
def parseCommandLine():
    """parses sys.argv for the dlrun subcommand.

    Returns an argparse namespace whose only attribute, jobId, is the UWS id
    of the datalink job to run.
    """
    import argparse

    argParser = argparse.ArgumentParser(
        description="Run an asynchronous datalink job (used internally)")
    argParser.add_argument("jobId",
        type=str,
        help="UWS id of the job to run")

    return argParser.parse_args()
def _writeDatalinkResult(job, data):
    """writes data, the result of a datalink service run, to job's
    "result" file.

    Unfortunately, datalink cores can in principle return all kinds of
    messy things that may not even be representable in plain files (e.g.,
    t.w resources returning redirects).  We hence only handle
    (mime, payload) tuples, (certain) Product instances and objects with
    a render method (t.w resources).  For anything else, we raise a
    NotImplementedError.
    """
    if isinstance(data, tuple):
        mime, payload = data
        with job.openResult(mime, "result") as destF:
            destF.write(payload)

    elif isinstance(data, products.ProductBase):
        # We could run render and grab the content-type from there
        # (which probably would be better all around).  For now, don't
        # care.
        with job.openResult("application/octet-stream", "result") as destF:
            for chunk in data.iterData():
                destF.write(chunk)

    elif hasattr(data, "render"):
        # These are t.w. resources.  writeResultTo runs a reactor so they
        # properly work.  The real media type is only known after
        # rendering, so open the result with a generic placeholder type
        # and fix it from the response headers afterwards.
        with job.openResult("application/octet-stream", "result") as destF:
            req = writeResultTo(data, destF)

        contentTypes = req.responseHeaders.getRawHeaders("content-type")
        if contentTypes:
            job.fixTypeForResultName("result", contentTypes[0])

    else:
        raise NotImplementedError("Cannot handle a service %s result yet."%
            repr(data))


def main():
    """runs the asynchronous datalink job named on the command line.

    The job is put into EXECUTING, its service is run with the stored
    datalink arguments, the result is written to the job's "result" file,
    and the job is then marked COMPLETED.

    SystemExit is swallowed, and a non-existing job is only reported.  Any
    other exception is reported, the job is pushed into ERROR (best
    effort), and the exception is re-raised.
    """
    jobId = parseCommandLine().jobId

    try:
        job = DL_WORKER.getJob(jobId)
        with job.getWritable() as wjob:
            wjob.change(phase=uws.EXECUTING,
                startTime=datetime.datetime.utcnow())

        dlArgs = job.parameters["dlargs"]
        service = base.resolveCrossId(job.parameters["serviceid"])
        data = service.run("dlget", dlArgs, svcs.emptyQueryMeta)
        _writeDatalinkResult(job, data)

        with job.getWritable() as wjob:
            wjob.change(phase=uws.COMPLETED)

    except SystemExit:
        pass
    except uws.JobNotFound:
        base.ui.notifyInfo("Giving up non-existing datalink job %s."%jobId)
    except Exception as ex:
        base.ui.notifyError("Datalink runner %s major failure"%jobId)
        # try to push job into the error state -- this may well fail given
        # that we're quite hosed, but it's worth the try
        DL_WORKER.changeToPhase(jobId, uws.ERROR, ex)
        raise
if __name__=="__main__": # pragma: no cover
    # silly manual test code for writeResultTo, not normally reached.
    # (defer is the twisted.internet module imported at module level.)
    from twisted.web import resource
    import os

    class _Foo(resource.Resource):
        # A toy resource: _Foo("booga") renders to b"abc".  With any other
        # stuff, render returns a Deferred (from defer.maybeDeferred) that
        # wraps a new _Foo("booga") instance and passes through cleanup.
        def __init__(self, stuff):
            self.stuff = stuff

        def render(self, request):
            if self.stuff=="booga":
                return b"abc"
            else:
                return defer.maybeDeferred(_Foo, "booga").addBoth(self.cleanup)

        def cleanup(self, res):
            # prints a marker and passes the deferred's result on unchanged
            print("cleaning up")
            return res

    # NOTE(review): _Foo("ork").render returns a Deferred, which writeResultTo
    # passes straight to req.write (it only special-cases NOT_DONE_YET).
    # Also, "bla" is opened in text mode.  Presumably this does not end up
    # writing "abc" to bla -- verify before relying on this test.
    with open("bla", "w") as f:
        writeResultTo(_Foo("ork"), f)
    with open("bla") as f:
        print(f.read())
    os.unlink("bla")