Source code for gavo.protocols.uwsactions

Manipulating UWS jobs through a REST interface.

The result documents are defined through the schema uws-1.0.xsd.

Instead of returning XML, they can also raise svcs.SeeOther exceptions.
However, these are caught in JobResource._redirectAsNecessary and appended
to the base URL auf the TAP service, so you must only give URIs relative
to the TAP service's root URL.

This UWS system should adapt to concrete UWSes; the UWS in use is passed
into the top-level functions (doJobAction , getJobList).

The actions pretty much resemble twisted.web Resources; perhaps this
should be refactored to use them.  For now, however, we don't pass around 
resources, we pass around callables that are used within renderers
in asyncrender, which lets asyncrender factor out a bit of common
functionality.  Hm.

#c Copyright 2008-2022, the GAVO project <>
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.

import os

from twisted.internet import reactor
from twisted.internet import defer
from twisted.web import server
from twisted.web import static

from gavo import base
from gavo import svcs
from gavo import utils
from gavo.protocols import dali
from gavo.protocols import uws
from gavo.utils import stanxml

UWSNamespace = ''
XlinkNamespace = ""
stanxml.registerPrefix("uws", UWSNamespace,
stanxml.registerPrefix("xlink", XlinkNamespace,

[docs]class UWS(object): """the container for elements from the uws namespace. """
[docs] class UWSElement(stanxml.Element): _prefix = "uws"
[docs] @staticmethod def makeRoot(ob): ob._additionalPrefixes = stanxml.xsiPrefix ob._mayBeEmpty = True return ob
[docs] class job(UWSElement): _a_version = "1.1"
[docs] class jobs(UWSElement): _mayBeEmpty = True _a_version = "1.1"
[docs] class parameters(UWSElement): pass
[docs] class destruction(UWSElement): pass
[docs] class endTime(stanxml.NillableMixin, UWSElement): pass
[docs] class creationTime(UWSElement): pass
[docs] class executionDuration(UWSElement): pass
[docs] class jobId(UWSElement): pass
[docs] class jobInfo(UWSElement): pass
[docs] class message(UWSElement): pass
[docs] class ownerId(stanxml.NillableMixin, UWSElement): pass
[docs] class phase(UWSElement): pass
[docs] class quote(stanxml.NillableMixin, UWSElement): pass
[docs] class runId(UWSElement): pass
[docs] class startTime(stanxml.NillableMixin, UWSElement): pass
[docs] class detail(UWSElement): _a_href = None _a_type = None _name_a_href = "xlink:href" _name_a_type = "xlink:type"
[docs] class errorSummary(UWSElement): _a_type = None # transient | fatal _a_hasDetail = None
[docs] class jobref(UWSElement): _additionalPrefixes = frozenset(["xlink"]) _a_id = None _a_href = None _a_type = None _name_a_href = "xlink:href" _name_a_type = "xlink:type"
[docs] class parameter(UWSElement): _mayBeEmpty = True _a_byReference = None _a_id = None _a_isPost = None
[docs] class result(UWSElement): _additionalPrefixes = frozenset(["xlink"]) _mayBeEmpty = True _a_id = None _a_href = None _a_type = None _name_a_href = "xlink:href" _name_a_type = "xlink:type"
[docs] class results(UWSElement): _mayBeEmpty = True
[docs]def getJobList(workerSystem, forOwner=None, phase=None, last=None, after=None): result = for jobId, phase in workerSystem.getIdsAndPhases( forOwner, phase, last, after): result[ UWS.jobref(id=jobId, href=workerSystem.getURLForId(jobId))[ UWS.phase[phase]]] return stanxml.xmlrender(result, workerSystem.joblistPreamble)
[docs]def getErrorSummary(job): # all our errors are fatal, and for now .../error yields the same thing # as we include here, so we hardcode the attributes. errDesc = job.error if not errDesc: return None msg = errDesc["msg"] if errDesc["hint"]: msg = msg+"\n\n -- Hint: "+errDesc["hint"] return UWS.errorSummary(type="fatal", hasDetail="false")[ UWS.message[msg]]
[docs]def getParametersElement(job): """returns a UWS.parameters element for job. """ res = UWS.parameters() for key, value in job.iterSerializedPars(): if isinstance(value, uws.ParameterRef): res[UWS.parameter(id=key, byReference=True)[value.url]] else: res[UWS.parameter(id=key)[str(value)]] return res
[docs]class JobActions(object): """A collection of "actions" performed on UWS jobs. Their names are the names of the child resources of UWS jobs. The basic UWS actions are built in. When constructing those, you can pass in as many additional JobAction subclasses as you want. Set their names to one of UWS standard actions to override UWS behaviour if you think that's wise. """ _standardActions = {} def __init__(self, *additionalActions): self.actions = {} self.actions.update(self._standardActions) for actionClass in additionalActions: self.actions[] = actionClass()
[docs] @classmethod def addStandardAction(cls, actionClass): cls._standardActions[] = actionClass()
[docs] def dispatch(self, action, job, request, segments): if job.owner: if request.getAuthUser()!=job.owner: raise svcs.Authenticate() try: resFactory = self.actions[action] except KeyError: raise base.ui.logOldExc( svcs.UnknownURI("Invalid UWS action '%s'"%action)) request.setHeader("content-type", resFactory.mime) return resFactory.getResource(job, request, segments)
[docs]class JobAction(object): """an action done to a job. It defines methods do<METHOD> that are dispatched through JobActions. It must have a name corresponding to the child resource names from the UWS spec. """ name = None mime = "text/xml"
[docs] def getResource(self, job, request, segments): if segments: raise svcs.UnknownURI("Too many segments") try: handler = getattr(self, "do"+request.method.decode("ascii")) except AttributeError: raise base.ui.logOldExc(svcs.BadMethod(request.method)) return handler(job, request)
[docs]class ErrorAction(JobAction): name = "error" mime = "text/plain"
[docs] def doGET(self, job, request): if job.error is None: return b"" return dali.serveDALIError(request, job.error)
doPOST = doGET
[docs]class StartTimeAction(JobAction): # This an extension over plain UWS allowing users to retrieve when # their job started. In the DaCHS' TAP implementation, this lets # you discern whether the taprunner is already processing an EXECUTING # job (startTime!=NULL) or whether it's still coming up (else) name = "startTime" mime = "text/plain"
[docs] def doGET(self, job, request): if job.startTime is None: return b"NULL" else: return utils.formatISODT(job.startTime).encode("ascii")
doPOST = doGET
[docs]class ParameterAction(JobAction): name = "parameters"
[docs] def doGET(self, job, request): request.setHeader("content-type", "text/xml") return UWS.makeRoot(getParametersElement(job))
[docs] def doPOST(self, job, request): if job.phase!=uws.PENDING: raise base.ValidationError( "Parameters cannot be changed in phase %s"%job.phase, "phase") with job.getWritable() as wjob: wjob.setParamsFromRawDict(request.strargs) raise svcs.SeeOther(job.jobId)
[docs]class PhaseAction(JobAction): name = "phase" mime = "text/plain" timeout = 10 # this is here for testing
[docs] def doPOST(self, job, request): newPhase = request.uwsArgs["phase"] if newPhase=="RUN": job.uws.changeToPhase(job.jobId, uws.QUEUED, timeout=self.timeout) elif newPhase=="ABORT": job.uws.changeToPhase(job.jobId, uws.ABORTED, timeout=self.timeout) else: raise base.ValidationError("Bad phase: %s"%newPhase, "phase") raise svcs.SeeOther(job.jobId)
[docs] def doGET(self, job, request): request.setHeader("content-type", "text/plain") return job.phase
JobActions.addStandardAction(PhaseAction) class _SettableAction(JobAction): """Abstract base for ExecDAction and DestructionAction. """ mime = "text/plain" def doPOST(self, job, request): raw = request.uwsArgs[] if raw is None: # with no parameter, fall back to GET return self.doGET(job, request) try: val = self.deserializeValue(raw) except ValueError: raise base.ui.logOldExc(uws.UWSError("Invalid %s value: %s."%(, repr(raw)), job.jobId)) with job.getWritable() as wjob: args = {self.attName: val} wjob.change(**args) raise svcs.SeeOther(job.jobId) def doGET(self, job, request): request.setHeader("content-type", "text/plain") return self.serializeValue(getattr(job, self.attName))
[docs]class ExecDAction(_SettableAction): name = "executionduration" attName = 'executionDuration' serializeValue = str deserializeValue = float
[docs]class DestructionAction(_SettableAction): name = "destruction" attName = "destructionTime" serializeValue = staticmethod(utils.formatISODT) deserializeValue = staticmethod(utils.parseISODT)
[docs]class QuoteAction(JobAction): name = "quote" mime = "text/plain"
[docs] def doGET(self, job, request): request.setHeader("content-type", "text/plain") if job.quote is None: quote = "" else: quote = utils.formatISODT(job.quote) return quote
[docs]class OwnerAction(JobAction): name = "owner" mime = "text/plain"
[docs] def doGET(self, job, request): request.setHeader("content-type", "text/plain") if job.owner is None: request.write("NULL") else: request.write(job.owner) return b""
JobActions.addStandardAction(OwnerAction) def _getResultsElement(job): baseURL = job.getURL()+"/results/" return UWS.results[[ UWS.result(id=res["resultName"], href=baseURL+res["resultName"]) for res in job.getResults()]]
[docs]class ResultsAction(JobAction): """Access result (Extension: and other) files in job directory. """ name = "results"
[docs] def getResource(self, job, request, segments): if not segments: return JobAction.getResource(self, job, request, segments) res = None # first try a "real" UWS result from the job if len(segments)==1: try: fName, resultType = job.getResult(segments[0]) res = static.File(fName, defaultType=str(resultType)) res.encoding = None except base.NotFoundError: # segments[0] does not name a result pass # fall through to other files if res is None: # if that doesn't work, try to return some other file from the # job directory. This is so we can deliver uploads. filePath = os.path.join(job.getWD(), *segments) if not os.path.abspath(filePath).startswith( os.path.abspath(job.getWD())): raise svcs.ForbiddenURI("Not serving files outside of job directory.") if not os.path.exists(filePath): raise svcs.UnknownURI("File not found") res = static.File(filePath, defaultType="application/octet-stream") # now render whatever we have found res.render(request) return server.NOT_DONE_YET
[docs] def doGET(self, job, request): return _getResultsElement(job)
JobActions.addStandardAction(ResultsAction) def _serializeTime(element, dt): if dt is None: return element() return element[utils.formatISODT(dt)]
[docs]class RootAction(JobAction): """Actions for async/jobId. """ name = ""
[docs] def doDELETE(self, job, request): """Implements DELETE on a job resource. This is the UWS-compliant way to delete a job. """ job.uws.destroy(job.jobId) raise svcs.SeeOther("")
[docs] def doPOST(self, wjob, request): """Implements POST on a job resource. This is a DaCHS extension to UWS in order to let web browsers delete jobs by passing action=DELETE. """ if request.uwsArgs.get("action")=="DELETE": self.doDELETE(wjob, request) else: raise svcs.BadMethod("POST")
def _stopWaiting(self, result, request): """a callback for notifyFinish that will stop our polling loop. """ request._REMOTE_HUNG_UP = True return result def _delayIfWAIT(self, job, request): """delays if request has UWS 1.1 "slow poll" arguments, returns None otherwise. This is a helper for doGET. """ # This is the implementation of UWS 1.1 "slow polling". # We still do polling internally rather than use postgres' # LISTEN/NOTIFY since the overhead seems rather moderate and # the added complexity of setting up notifications appeared not # proportional to saving it. args = request.uwsArgs if args.get("wait") is None: return waitFor = int(args["wait"]) if waitFor==-1: waitFor = base.getConfig("async", "maxslowpollwait") if args.get("phase") is not None and args["phase"]!=job.phase: return if job.phase not in ["PENDING", "QUEUED", "EXECUTING"]: return d = defer.Deferred().addCallback( self._recheck, job.uws, job.jobId, waitFor-1, job.phase) reactor.callLater(1, d.callback, request) request.notifyFinish().addBoth(self._stopWaiting, request) return d def _recheck(self, request, workerSystem, jobId, remainingWait, originalPhase): """the callback for doing slow polls. """ if hasattr(request, "_REMOTE_HUNG_UP"): # the other side has given up on us, all is done; return # NOT_DONE_YET to keep twisted from finishing the request again. return server.NOT_DONE_YET job = workerSystem.getJob(jobId) if originalPhase!=job.phase or remainingWait<=0: request.uwsArgs.pop("wait", None) request.uwsArgs.pop("phase", None) return self.doGET(job, request) d = defer.Deferred().addCallback( self._recheck, workerSystem, jobId, remainingWait-1, originalPhase) reactor.callLater(1, d.callback, request) return d
[docs] def doGET(self, job, request): """Implements GET on a job resource: return the current job metadata. """ delay = self._delayIfWAIT(job, request) if delay: return delay request.setHeader("content-type", "text/xml") tree = UWS.makeRoot(UWS.job[ UWS.jobId[job.jobId], UWS.runId[job.runId], UWS.ownerId[job.owner], UWS.phase[job.phase], UWS.quote[utils.formatISODT(job.quote)], UWS.creationTime[utils.formatISODT(job.creationTime)], _serializeTime(UWS.startTime, job.startTime), _serializeTime(UWS.endTime, job.endTime), UWS.executionDuration[str(job.executionDuration)], UWS.destruction[utils.formatISODT(job.destructionTime)], getParametersElement(job), _getResultsElement(job), getErrorSummary(job)]) return stanxml.xmlrender(tree, job.uws.jobdocPreamble)
[docs]def doJobAction(workerSystem, request, segments): """handles the async UI of UWS. Depending on method and segments, it will return various XML strings and may cause certain actions. Segments must be a tuple with at least one element, the job id. """ jobId, segments = segments[0], segments[1:] if not segments: action = "" else: action, segments = segments[0], segments[1:] return workerSystem.jobActions.dispatch(action, workerSystem.getJob(jobId), request, segments)