Source code for gavo.web.asyncrender

"""
Renderers and helpers for asynchronous services.

For TAP (which was the first prototype of these), there's a separate
module using some of this; in the long run, it should probably be
integrated here.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import functools

from twisted.internet import defer
from twisted.python import failure
from twisted.web import resource
from twisted.web import server

from gavo import base
from gavo import svcs
from gavo import utils
from gavo.protocols import dali
from gavo.protocols import dlasync
from gavo.protocols import uws
from gavo.protocols import uwsactions
from gavo.web import grend
from gavo.web import weberrors


def redirectUWS(baseURL, location):
    """raises a UWS-compatible (303) redirection.

    This never returns; it always raises svcs.SeeOther.

    location is interpreted relative to baseURL unless it already is
    an absolute http or https URL.  An empty location redirects to baseURL
    itself.  baseURL essentially has to be the full absolute URL of the
    endpoint (i.e., service/renderer).

    As a special service, "/async" is appended to a baseURL ending in "tap"
    as long as the TAP renderer isn't fixed to not do dispatching.

    This essentially just mogrifies SeeOther exceptions rendered elsewhere.
    It should therefore go at some point (probably when TAP uses the
    async renderer).
    """
    # TODO: Temporary hack as long as TAP isn't modernized to use
    # an async renderer: fix the redirect to TAP's async endpoint if
    # baseURL is the TAP renderer:
    if baseURL.endswith("tap"):
        baseURL = baseURL+"/async"

    if not location:
        # nothing to append: go to the endpoint itself
        target = str(baseURL)
    elif location.startswith(("http://", "https://")):
        # already absolute, pass through unchanged
        target = str(location)
    else:
        target = str("%s/%s"%(baseURL, location))

    raise svcs.SeeOther(target)
class UWSResource(resource.Resource):
    """a resource dealing with a UWS.

    It is constructed with a worker system, a concrete renderer, and the
    service that executes requests.

    It also delivers errors in UWS (well, TAP, actually) style.
    """

    def __init__(self, workerSystem, renderer, service):
        resource.Resource.__init__(self)
        self.workerSystem = workerSystem
        self.renderer = renderer
        self.service = service

    def _deliverError(self, flr, request, httpCode=200):
        """renders the failure flr to request.

        Authentication requests and redirects go through
        weberrors.renderDCErrorPage; everything else is served through
        dali.serveDALIError with httpCode, except that uws.JobNotFound
        always yields a 404.  Failures other than JobNotFound are
        reported through base.ui.notifyFailure first.
        """
        error = flr.value

        # auth requests and redirects handled by normal dc methods.
        if isinstance(error, (svcs.Authenticate, svcs.SeeOther)):
            return weberrors.renderDCErrorPage(flr, request)

        if isinstance(error, uws.JobNotFound):
            httpCode = 404
        else:
            base.ui.notifyFailure(flr)
        return dali.serveDALIError(request, error, httpCode)

    def render(self, request):
        """renders through twisted's method dispatch, turning any exception
        into an error document delivered with HTTP code 400 (subject to
        the special cases in _deliverError).
        """
        try:
            return resource.Resource.render(self, request)
        except Exception as ex:
            wrapped = failure.Failure(ex)
            return self._deliverError(wrapped, request, httpCode=400)
class JoblistResource(UWSResource):
    """The web resource corresponding to async root.

    GET yields a job list, POST creates a job.

    There's an extra hack not in UWS: if get with something like
    dachs_authenticate=anything and haven't passed a user, this will ask
    for credentials.
    """

    @staticmethod
    @functools.lru_cache(1)
    def _parseJoblistInputTD():
        """returns the (process-wide cached) input table definition for
        the job list filters PHASE, AFTER, and LAST.

        This deliberately takes no arguments: caching on a method taking
        self would key the cache on the instance, and since a new resource
        is made for each request, the definition would be re-parsed every
        time while the cache kept the previous resource alive.
        """
        return base.parseFromString(svcs.InputTD, """
            <inputTable>
                <inputKey name="PHASE" type="text" multiplicity="single"
                    description="Restrict result to jobs in this phase">
                    <values>
                        <option>PENDING</option>
                        <option>QUEUED</option>
                        <option>EXECUTING</option>
                        <option>COMPLETED</option>
                        <option>ERROR</option>
                        <option>ABORTED</option>
                        <option>UNKNOWN</option>
                        <option>HELD</option>
                        <option>SUSPENDED</option>
                        <option>ARCHIVED</option>
                    </values>
                </inputKey>
                <inputKey name="AFTER" type="timestamp" multiplicity="single"
                    description="Restrict result to jobs created after this point in time"/>
                <inputKey name="LAST" type="integer" multiplicity="single"
                    description="Restrict output to this many records, and choose the most recent ones"/>
            </inputTable>""")

    def getJoblistInputTD(self):
        """returns the svcs.InputTD used to parse job list query parameters.

        The definition is parsed once and then shared between all
        instances.
        """
        return self._parseJoblistInputTD()

    def render_GET(self, request):
        """returns the job list, filtered by the PHASE, LAST, and AFTER
        request parameters.

        Raises svcs.Authenticate if dachs_authenticate is among the
        request arguments and request.getUser() returns nothing true.
        """
        if "dachs_authenticate" in request.strargs and not request.getUser():
            raise svcs.Authenticate()

        request.setHeader("content-type", "text/xml")
        args = svcs.CoreArgs.fromRawArgs(
            self.getJoblistInputTD(), request.strargs).args
        return uwsactions.getJobList(self.workerSystem,
            request.getAuthUser() or None,
            phase=args["PHASE"],
            last=args["LAST"],
            after=args["AFTER"])

    def render_POST(self, request):
        """creates a new job from request and redirects to it.

        This never returns normally; redirectUWS raises svcs.SeeOther.
        """
        jobId = self.workerSystem.getNewIdFromRequest(request)
        redirectUWS(self.service.getURL(self.renderer), str(jobId))

    def _deliverResult(self, res, request):
        """declares res as text/xml on request and passes it through."""
        request.setHeader("content-type", "text/xml")
        return res
class JobResource(UWSResource):
    """The web resource corresponding to async requests for jobs.

    This currently uses a custom hack for resource resolution and method
    dispatch.  Let's move it to using twisted resources one day.

    It is constructed like UWSResource, with the additional segments
    argument that is handed on to uwsactions.doJobAction.
    """

    def __init__(self, workerSystem, renderer, service, segments):
        # go through the base constructor so twisted's Resource state
        # is initialised just as for the other UWS resources
        UWSResource.__init__(self, workerSystem, renderer, service)
        self.segments = segments

    def render(self, request):
        """starts the job action and always returns server.NOT_DONE_YET.

        The actual response is produced by the callbacks: _deliverResult on
        success, _redirectAsNecessary for svcs.SeeOther, and _deliverError
        for everything else (including whatever redirectUWS raises).
        """
        defer.maybeDeferred(
            uwsactions.doJobAction,
            self.workerSystem,
            request,
            self.segments
        ).addCallback(self._deliverResult, request
        ).addErrback(self._redirectAsNecessary, request
        ).addErrback(self._deliverError, request)
        return server.NOT_DONE_YET

    def _redirectAsNecessary(self, flr, request):
        """turns an svcs.SeeOther failure into a UWS redirect relative to
        this endpoint; other failures are passed on by flr.trap.
        """
        flr.trap(svcs.SeeOther)
        redirectUWS(self.service.getURL(self.renderer), flr.value.rawDest)

    def _deliverResult(self, result, request):
        """writes result, the outcome of the job action, to request."""
        if result is server.NOT_DONE_YET:
            # the job action is rendering itself -- this is where we'd like
            # to go for non-XML replies.
            return result

        elif isinstance(result, resource.Resource):
            # nevow-style returned resource -- we'd like to get rid of that
            base.ui.notifyWarning("UWS job resource returned a resource rather than"
                " rendering itself.  We'd like to stop this.")
            # NOTE(review): whatever render returns here is not written to
            # request by this method; presumably such resources write and
            # finish the request themselves -- confirm.
            return result.render(request)

        else:
            # convenience function: result must be a stan tree we can just
            # render.  the content-type is set by uwsaction._JobActions.dispatch
            request.write(utils.xmlrender(result))
            request.finish()
def getAsyncResource(
        request, workerSystem, renderer, service, firstSegment):
    """returns a UWS-compliant resource for request.

    With no segments left, that is a JoblistResource; otherwise, it is a
    JobResource for the remaining segments.  A single empty segment (i.e.,
    a trailing slash) raises svcs.Found for the URI without its last
    character.

    Note: This expects that the renderer has already called
    uws.prepareRequest.
    """
    segments = request.popSegments(firstSegment)

    if not segments:
        return JoblistResource(workerSystem, renderer, service)

    if segments==[""]:
        # redirect async/ to async so our style sheets work
        raise svcs.Found(request.uri[:-1])

    return JobResource(workerSystem, renderer, service, segments)
class AsyncRendererBase(grend.ServiceBasedPage):
    """An abstract renderer for things running in a UWS.

    To make these concrete, they need a name and a workerSystem attribute.
    """
    parameterStyle = "pql"

    def render(self, request):
        """renders request through the resource getChild produces.

        If that fails, the problem is logged and a DALI error document
        is rendered instead, so the caller always receives a proper
        render result rather than None.
        """
        # We don't do anything ourselves -- everything has to go through
        # getAsyncResource and hence getChild
        try:
            return self.getChild(None, request).render(request)
        except Exception as ex:
            base.ui.notifyError(f"UWS root render failed: {ex}")
            # report the problem to the client the same way getChild does
            return dali.DALIErrorResource(ex).render(request)

    def getChild(self, name, request):
        """returns the UWS resource responsible for request.

        A trailing slash in the request path raises svcs.Found for the
        path without it.  Any exception raised while preparing the request
        or constructing the resource is logged and returned wrapped in a
        dali.DALIErrorResource.
        """
        # NOTE(review): getAsyncResource is looked up through the module
        # object rather than by its local name -- presumably so it is
        # bound late; confirm before simplifying.
        from gavo.web import asyncrender

        if request.prepath[-1]==b"":
            # trailing slash: redirect away so our XSLT works properly
            raise svcs.Found(b"/".join(request.prepath[:-1]))

        try:
            uws.prepareRequest(request, self.service)
            return asyncrender.getAsyncResource(request,
                self.workerSystem, self.name, self.service, name)
        except Exception as ex:
            base.ui.notifyError(f"UWS child construction failed: {ex}")
            return dali.DALIErrorResource(ex)
class DatalinkAsyncRenderer(AsyncRendererBase):
    """A renderer for asynchronous datalink.
    """
    # we need a special case here because this needs to put the
    # id of the calling service into strargs.  Note that this
    # cannot do any uploads in this form, as we are deferring all
    # inspection of the arguments to when the job actually runs.
    name = "dlasync"
    workerSystem = dlasync.DL_WORKER

    def render(self, request):
        """re-packs the request arguments into dlargs, adds the id of the
        service as serviceid, and then renders as AsyncRendererBase does.
        """
        # This sort of parameter re-packing quite certainly isn't what
        # we should be doing in the long run.  Let's see what we come
        # up with the next time we revisit this.
        repacked = {
            "dlargs": request.strargs,
            "serviceid": [self.service.getFullId()]}
        request._strargs = repacked
        return AsyncRendererBase.render(self, request)
class DALIAsyncRenderer(AsyncRendererBase):
    """A renderer speaking UWS.

    This is for asynchronous execution of larger jobs.  This is what
    is executed by the async renderer.  It requests the worker system
    required from the service, which in turn obtains it from the core;
    these must hence cooperate with this to allow async operation.

    See `Custom UWSes`_ for how to use this with your own cores.
    """
    name = "async"
    aliases = frozenset(("uws.xml",))

    @property
    def workerSystem(self):
        # the worker system is whatever the service hands out on each access
        service = self.service
        return service.getUWS()