Source code for gavo.protocols.useruws

"""
Support for UWSes defined in user RDs.

To understand this, start at makeUWSForService.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import datetime
import weakref

from gavo import base
from gavo import formats
from gavo import rsc
from gavo import rscdesc #noflake: for registration
from gavo import svcs
from gavo.protocols import uws
from gavo.protocols import uwsactions


[docs]class UserUWSTransitions(uws.ProcessBasedUWSTransitions): """The transition function for user-defined UWSes. """ def __init__(self): uws.ProcessBasedUWSTransitions.__init__(self, "User")
[docs] def queueJob(self, newState, wjob, ignored): """puts a job on the queue. """ uws.ProcessBasedUWSTransitions.queueJob(self, newState, wjob, ignored) wjob.uws.scheduleProcessQueueCheck()
[docs] def getCommandLine(self, wjob): args = ["dachs", "uwsrun", "--", str(wjob.jobId)] if base.DEBUG: args[1:1] = ["--debug", "--traceback"] return "gavo", args
[docs]class UserUWSJobBase(uws.UWSJobWithWD): """The base class for the service-specific user UWS jobs. (i.e., the things that the UserUWSJobFactory spits out) """ _transitions = UserUWSTransitions() _jobsTDId = "//uws#userjobs"
[docs]def makeUserUWSJobClass(service): """returns a class object for representing UWS jobs processing requests for service """ class UserUWSJob(UserUWSJobBase): pass UserUWSJob._default_jobClass = classmethod( lambda _, v=service.getFullId(): v) return UserUWSJob
[docs]class UserUWS(uws.UWSWithQueueing): """A UWS for "user jobs", i.e., generic things an a core. These dynamically create job classes based on the processing core's parameters. To make this happen, we'll need to override some of the common UWS functions. Note: For async operation (without a custom UWS), you can only have uploads in the upload parameter. """ joblistPreamble = ("<?xml-stylesheet href='/static" "/xsl/useruws-joblist-to-html.xsl' type='text/xsl'?>") jobdocPreamble = ("<?xml-stylesheet href='/static/xsl/" "useruws-job-to-html.xsl' type='text/xsl'?>") def __init__(self, service, jobActions, parameterGrammar): self.runcountGoal = base.getConfig("async", "maxUserUWSRunningDefault") self.service = weakref.proxy(service) uws.UWSWithQueueing.__init__(self, makeUserUWSJobClass(service), jobActions) self.parameterGrammar = parameterGrammar
[docs] def getURLForId(self, jobId): return self.service.getURL("uws.xml")+"/"+jobId
def _getJob(self, jobId, conn, writable=False): """returns the named job as uws.UWS._getJob. However, in a user UWS, there can be jobs from multiple services. It would be nonsense to load another UWS's job's parameters into our job class. To prevent this, we redirect if we find the new job's class isn't ours. On the web interface, that should do the trick. Everywhere else, this may not be entirely clear but still prevent major confusion. This is repeating code from uws.UWS._getJob; some refactoring at some point would be nice. """ statementId = 'getById' if writable: statementId = 'getByIdEx' res = self.runCanned(statementId, {"jobId": jobId}, conn) if len(res)!=1: raise uws.JobNotFound(jobId) if res[0]["jobClass"]!=self.service.getFullId(): raise svcs.WebRedirect( base.resolveCrossId(res[0]["jobClass"]).getUWS().getURLForId(jobId)) return self.jobClass(res[0], self, writable)
[docs] def getIdsAndPhases(self, *args, **kwargs): # for user UWSes, we only want jobs from our service in the job # list resource. We insert extra conditions to the basic queries. # getById and getAllIds don't change, though, as they're used internally # and could influence, e.g., queueing and such. return uws.UWSWithQueueing.getIdsAndPhases(self, *args, initFragments=["jobClass=%(jobClass)s"], initPars={"jobClass": self.service.getFullId()}, **kwargs)
[docs]def makeUWSForService(service): """returns a UserUWS instance tailored to service. All these share a jobs table, but the all have different job classes with the parameters custom-made for the service's core. A drawback of this is that each UWS created in this way runs the job table purger again. That shouldn't be a problem per se but may become cumbersome at some point. We can always introduce a class Attribute on UserUWS to keep additional UWSes from starting cron jobs of their own. """ return UserUWS( service, uwsactions.JobActions(), base.makeStruct(svcs.ContextGrammar, inputTD=service.core.inputTable))
####################### CLI
[docs]def parseCommandLine(): import argparse parser = argparse.ArgumentParser(description="Run an asynchronous" " generic job (used internally)") parser.add_argument("jobId", type=str, help="UWS id of the job to run") return parser.parse_args()
[docs]def main(): args = parseCommandLine() jobId = args.jobId with base.getTableConn() as conn: svcId = list( conn.query("SELECT jobclass FROM uws.userjobs WHERE jobId=%(jobId)s", {"jobId": jobId}))[0][0] service = base.resolveCrossId(svcId) # we're the only ones running, so we're safe using this. queryMeta = svcs.emptyQueryMeta try: job = service.getUWS().getJob(jobId) with job.getWritable() as wjob: wjob.change(phase=uws.EXECUTING, startTime=datetime.datetime.utcnow()) service = base.resolveCrossId(job.jobClass) inputTable = svcs.CoreArgs(service.core.inputTable, job.parameters, job.parameters) inputTable.job = job data = service._runWithInputTable( service.core, inputTable, queryMeta) # Our cores either return a table, a pair of mime and data, # or None (in which case they added the results themselves) if isinstance(data, tuple): mime, payload = data with job.openResult(mime, "result") as destF: destF.write(payload) elif isinstance(data, rsc.Data): destFmt = inputTable.getParam("responseformat" ) or "application/x-votable+xml" with job.openResult(destFmt, "result") as destF: formats.formatData(destFmt, data, destF, False) elif data is None: pass else: raise NotImplementedError("Cannot handle a service %s result yet."% repr(data)) with job.getWritable() as wjob: wjob.change(phase=uws.COMPLETED) except SystemExit: pass except uws.JobNotFound: base.ui.notifyInfo("Giving up non-existing UWS job %s."%jobId) except Exception as ex: base.ui.notifyError("UWS runner %s major failure"%jobId) # try to push job into the error state -- this may well fail given # that we're quite hosed, but it's worth the try service.getUWS().changeToPhase(jobId, uws.ERROR, ex) raise