
"""
Support classes for the universal worker service.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib
import datetime
import functools
import io
import os
import pickle
import shutil
import signal
import tempfile
import threading
import weakref

from twisted.internet import protocol
from twisted.internet import reactor

from gavo import base
from gavo import rsc
from gavo import rscdef
from gavo import svcs
from gavo import utils
from gavo.base import cron
from gavo.protocols import dali

# Ward against typos
from gavo.votable.tapquery import ( #noflake: exported names
	PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, UNKNOWN)

END_STATES = set([COMPLETED, ERROR, ABORTED])

# used in the computation of quote
EST_TIME_PER_JOB = datetime.timedelta(minutes=10)

_DEFAULT_LOCK_TIMEOUT = 0.1


class UWSError(base.Error):
	"""UWS-related errors, mainly to communicate with web renderers.

	UWSErrors are constructed with a displayable message (may be None to
	autogenerate one), a jobId (give one; the default None is only there
	to avoid breaking legacy code) and optionally a source exception and
	a hint.
	"""
	def __init__(self, msg, jobId=None, sourceEx=None, hint=None):
		base.Error.__init__(self, msg, hint=hint)
		self.msg = msg
		self.jobId = jobId
		self.sourceEx = sourceEx
		self.args = [self.msg, self.jobId, self.sourceEx, self.hint]

	def __str__(self):
		if self.msg:
			return self.msg
		elif self.sourceEx:
			return "UWS on job %s operation failed (%s, %s)"%(
				self.jobId,
				self.sourceEx.__class__.__name__,
				str(self.sourceEx))
		else:
			return "Unspecified UWS related error (id: %s)"%self.jobId

class JobNotFound(base.NotFoundError, UWSError):
	def __init__(self, jobId):
		base.NotFoundError.__init__(self, str(jobId), "UWS job", "jobs table")

	def __str__(self):
		return base.NotFoundError.__str__(self)

class UWS(object):
	"""a facade for a universal worker service (UWS).

	You must construct it with the job class (see UWSJob) and a
	uwsactions.JobActions instance.

	The UWS then provides methods to access the jobs table, create jobs,
	and deserialize jobs from the jobs table.

	It also has a context grammar that parses the JCL parameters from
	request.strargs.  In this base class, that grammar parses no
	parameters at all, and you will almost certainly have to override it.

	We have a "statements cache" in here, where we use the UWS table
	definition to create query strings we can later pass to the database.
	Don't worry about this any more.  Just write text queries when adding
	features.  It's more readable and just about as stable against code
	evolution.

	You must override the getURLForId(jobId) method in your concrete
	implementation.

	You should also override jobdocPreamble and joblistPreamble.  This is
	raw XML that is prepended to job and list documents.  This is
	primarily for PIs giving stylesheets, but given we don't use doctypes
	you could provide internal subsets there, too.  Anyway, see the TAP
	UWS runner for examples.
	"""
	# how often should we check for jobs that wait for destruction?
	cleanupInterval = 3600*12

	# raw XML to prepend to joblist documents
	joblistPreamble = ""
	# raw XML to prepend to job documents
	jobdocPreamble = ""

	# the grammar to parse incoming parameters.  This must be idempotent,
	# because before starting the core, the parameters already parsed
	# will be filtered through the grammar.  That's necessary to catch
	# incomplete parameter sets (which we can't do during individual
	# posts).
	parameterGrammar = base.makeStruct(svcs.ContextGrammar)

	def __init__(self, jobClass, jobActions):
		self.jobClass = jobClass
		self.jobActions = jobActions
		self._statementsCache = None
		cron.runEvery(-self.cleanupInterval,
			"UWS %s jobs table reaper"%str(self),
			self.cleanupJobsTable)

	def _makeMoreStatements(self, statements, jobsTable):
		"""adds custom statements to the canned query dict in derived
		classes.
		"""
		pass

	def _makeStatementsCache(self):
		"""returns a dictionary containing canned queries to manipulate
		the jobs table for this UWS.
		"""
		res = {}
		td = self.jobClass.jobsTD
		with base.getTableConn() as conn:
			jobsTable = rsc.TableForDef(td, connection=conn, exclusive=True)
			res["getByIdEx"] = jobsTable.getQuery(td, "jobId=%(jobId)s",
				{"jobId": 0})
			res["feedToIdEx"] = None, jobsTable.addCommand, None
			res["deleteByIdEx"] = None, jobsTable.getDeleteQuery(
				"jobId=%(jobId)s")[0], None

			jobsTable = rsc.TableForDef(td, connection=conn)
			res["getById"] = jobsTable.getQuery(td, "jobId=%(jobId)s",
				{"jobId": 0})
			res["getAllIds"] = jobsTable.getQuery(
				[td.getColumnByName("jobId")], "")

			countField = base.makeStruct(
				svcs.OutputField, name="count", type="integer",
				select="count(*)", required=True)
			res["countRunning"] = jobsTable.getQuery([countField],
				"phase='EXECUTING'")
			res["countQueued"] = jobsTable.getQuery([countField],
				"phase='QUEUED'")

			self._makeMoreStatements(res, jobsTable)

		return res

	@property
	def _statements(self):
		"""returns a dictionary of canned statements manipulating the jobs
		table.
		"""
		if self._statementsCache is None:
			self._statementsCache = self._makeStatementsCache()
		return self._statementsCache

	def runCanned(self, statementId, args, conn):
		"""runs the canned statement statementId with args through the
		DB connection conn.

		This will return row dictionaries of the result if there is a
		result.
		"""
		resultTableDef, statement, _ = self._statements[statementId]
		cursor = conn.cursor()

		try:
			cursor.execute(statement, args)
		except base.QueryCanceledError:
			conn.rollback()
			raise base.ReportableError("Could not access the jobs table."
				" This probably means there is a stale lock on it.  Please"
				" notify the service operators.")

		res = None
		if resultTableDef:
			res = [resultTableDef.makeRowFromTuple(row)
				for row in cursor]
		cursor.close()

		return res

	def _serializeProps(self, props, writableConn):
		"""inserts (or overwrites) the job properties props through
		writableConn.
		"""
		self.runCanned("feedToIdEx", props, writableConn)

	def getNewJobId(self, **kws):
		"""creates a new job and returns its id.

		kws can be properties of the new job or the special key timeout
		giving after how many seconds we should give up trying to lock
		the db.
		"""
		timeout = kws.pop("timeout", _DEFAULT_LOCK_TIMEOUT)

		try:
			with base.getWritableTableConn() as conn:
				# We fire off a quick pointless query to catch server restarts;
				# if this fails, the connection pools are cleared and the next
				# queries will run again.
				conn.execute("SELECT table_name FROM TAP_SCHEMA.tables LIMIT 1")
		except base.DBError:
			pass

		with base.getWritableTableConn() as conn:
			with conn.parameters([
					("statement_timeout", "%s ms"%int(timeout*1000))]):
				props = self.jobClass.getDefaults(conn)
				props["jobId"] = self.jobClass.getNewId(self, conn)
				job = self.jobClass(props, self, writable=True)
				job.change(**kws)
				self._serializeProps(job.getProperties(), conn)

		return job.jobId

	def getNewIdFromArgs(self, uwsArgs, paramDict, user=None):
		"""returns a new UWS id for a job processing paramDict.

		paramDict needs to be parsed already (typically, it's a
		CoreArgs.getParamDict() result) using the workerSystem's
		parameterGrammar.

		Pass user to restrict the job to that user.  No authentication is
		done here, so make sure you actually authenticate user (cf.
		getNewIdFromRequest for how this could be done).
		"""
		jobId = self.getNewJobId()
		try:
			with self.changeableJob(jobId) as wjob:
				wjob.setParamsFromDict(paramDict)
				if user is not None:
					wjob.change(owner=user)
		except:
			# something went wrong while setting up the request; do away
			# with it again
			self.destroy(jobId)
			raise

		return jobId

	def getNewIdFromRequest(self, request):
		"""returns the id of a new TAP job created from request.

		Request has to be a t.w request or similar, but preprocessed by
		prepareRequest below to have the parsed UWS parameters in uwsArgs.
		"""
		return self.getNewIdFromArgs(
			request.uwsArgs,
			self.parameterGrammar.parseStrargs(
				request.strargs, ensureRequired=False),
			request.getUser() if request.getAuthUser() else None)

	def _getJob(self, jobId, conn, writable=False):
		"""helps getJob and getNewJob.
		"""
		# Caution: this code is copied in useruws.UserUWS._getJob
		# If you find you need to change this, this may be a good
		# time to figure out how to refactor this method.
		statementId = 'getById'
		if writable:
			statementId = 'getByIdEx'

		res = self.runCanned(statementId, {"jobId": jobId}, conn)
		if len(res)!=1:
			raise JobNotFound(jobId)
		return self.jobClass(res[0], self, writable)

	def getJob(self, jobId):
		"""returns a read-only UWSJob for jobId.

		Note that by the time you do something with the information here,
		the "true" state in the database may already be different.  There
		should be no way to write whatever information you have in here,
		so any "racing" here shouldn't hurt.
		"""
		with base.getTableConn() as conn:
			return self._getJob(jobId, conn)

	def getNewJob(self, **kws):
		"""creates a new job and returns a read-only instance for it.
		"""
		newId = self.getNewJobId(**kws)
		return self.getJob(newId)

	@contextlib.contextmanager
	def changeableJob(self, jobId, timeout=_DEFAULT_LOCK_TIMEOUT):
		"""a context manager for job manipulation.

		This is done such that any changes to the job's properties within
		the controlled section get propagated to the database.  As long
		as you are in the controlled section, nobody else can change the
		job.
		"""
		with base.getWritableTableConn() as conn:
			with conn.parameters([
					("statement_timeout", "%s ms"%int(timeout*1000))]):
				job = self._getJob(jobId, conn, writable=True)
				try:
					yield job
				except:
					conn.rollback()
					raise
				else:
					self._serializeProps(job.getProperties(), conn)
					conn.commit()

	def changeToPhase(self, jobId, newPhase, input=None,
			timeout=_DEFAULT_LOCK_TIMEOUT):
		with self.changeableJob(jobId, timeout=timeout) as wjob:
			try:
				transition = wjob.getTransitionTo(newPhase)
				return transition(newPhase, wjob, input)
			except Exception as exception:
				# transition to error if possible.  If that fails as well,
				# blindly set error and give up.
				try:
					if newPhase!=ERROR:
						return wjob.getTransitionTo(ERROR)(ERROR, wjob, exception)
				except:
					wjob.change(phase=ERROR, error=exception)
				raise

	def destroy(self, jobId):
		"""removes the job with jobId from the UWS.

		This calls the job's prepareForDestruction method while the job
		is writable.
		"""
		try:
			try:
				with self.changeableJob(jobId) as job:
					job.prepareForDestruction()
			except Exception as exc:
				base.ui.notifyWarning(
					"Ignored error while destroying UWS job %s: %s"%(jobId, exc))
		finally:
			with base.getWritableTableConn() as conn:
				self.runCanned("deleteByIdEx", locals(), conn)

	def _countUsingCanned(self, statementId):
		"""helps the count* methods.
		"""
		with base.getTableConn() as conn:
			return self.runCanned(statementId, {}, conn)[0]["count"]

	def countRunningJobs(self):
		"""returns the number of EXECUTING jobs in the jobsTable.
		"""
		return self._countUsingCanned('countRunning')

	def countQueuedJobs(self):
		"""returns the number of QUEUED jobs in jobsTable.
		"""
		return self._countUsingCanned('countQueued')

	def getJobIds(self):
		"""returns a list of all currently existing job ids.
		"""
		with base.getTableConn() as conn:
			return [r["jobId"]
				for r in self.runCanned('getAllIds', {}, conn)]

	def getIdsAndPhases(self, owner=None, phase=None, last=None,
			after=None, initFragments=None, initPars=None):
		"""returns pairs of id and phase for all jobs in the UWS.

		phase, last, after are the respective parameters from UWS 1.1.
		"""
		pars = locals()
		fragments = initFragments or []
		pars.update(initPars or {})
		limits = None

		if owner is not None:
			fragments.append("owner=%(owner)s")

		if phase is not None:
			fragments.append("phase=%(phase)s")

		if last is not None:
			limits = "ORDER BY creationTime DESC LIMIT %(limit)s"
			pars['limit'] = last

		if after is not None:
			fragments.append("creationTime>%(after)s")

		td = self.jobClass.jobsTD
		with base.getTableConn() as conn:
			return conn.query(
				td.getSimpleQuery(["jobId", "phase"],
					fragments=base.joinOperatorExpr("AND", fragments),
					postfix=limits),
				pars)

	def cleanupJobsTable(self, includeFailed=False, includeCompleted=False,
			includeAll=False, includeForgotten=False):
		"""removes expired jobs from the UWS jobs table.

		The constructor arranged for this to be called now and then
		(cleanupInterval class attribute, defaulting to 12*3600).

		The functionality is also exposed through gavo admin cleanuws;
		this also lets you use the includeFailed and includeCompleted
		flags.  These should not be used on production services since
		you'd probably nuke jobs still interesting to your users.
		"""
		with base.AdhocQuerier() as q:
			if not q.getTableType(self.jobClass.jobsTD.getQName()):
				# no jobs table, nothing to clean up
				return

		phasesToKill = set()
		if includeFailed or includeAll:
			phasesToKill.add(ERROR)
			phasesToKill.add(ABORTED)
		if includeCompleted or includeAll:
			phasesToKill.add(COMPLETED)
		if includeAll:
			phasesToKill.add(PENDING)
			phasesToKill.add(QUEUED)
		if includeForgotten:
			phasesToKill.add(PENDING)

		fragments = "destructionTime<%(now)s"
		if phasesToKill:
			fragments = "destructionTime<%(now)s or phase in %(ptk)s"

		for row in self.jobClass.jobsTD.doSimpleQuery(
				['jobId'],
				fragments,
				{"now": datetime.datetime.utcnow(), "ptk": phasesToKill}):
			jobId = row["jobId"]
			try:
				self.destroy(jobId)
			except base.QueryCanceledError:
				# job locked by something, don't hang
				base.ui.notifyWarning("Postponing destruction of %s: Locked"%
					jobId)
			except JobNotFound:
				# Someone else has cleaned up -- that's ok
				pass

	def getURLForId(self, jobId): # pragma: no cover
		"""returns the handling URL for the job with jobId.

		You must override this in deriving classes.
		"""
		raise NotImplementedError("Incomplete UWS (getURLForId not overridden).")

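
# Illustrative sketch (editor's addition, not part of the original module):
# a concrete worker system only needs to name its job class and say where
# its jobs live on the web.  The URL scheme below is an assumption; real
# systems (e.g. the TAP UWS) derive it from their service's path.
class _ExampleUWS(UWS):
	def getURLForId(self, jobId):
		return "/myservice/async/%s"%jobId
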
class UWSWithQueueing(UWS):
	"""A UWS with support for queueing.

	Queueing is done on UWS level rather than at transitions.  With a
	plain UWS, if something is put on the queue, it must be started by
	the Transition's queueJob method.  With UWSWithQueueing, you just
	mark the job queued and the rest is taken care of by the UWS itself.
	"""
	# _processQueueDirty is managed through scheduleProcessQueueCheck
	_processQueueDirty = False

	# How many jobs will the UWS (try to) run at the same time?
	runcountGoal = 1

	# how often should we do an extra pass over the queue to see if we
	# can run something?  (Under normal circumstances, the queue is
	# checked when a job goes out of executing more or less immediately;
	# this is just a last-resort thing -- and it seeds the queue runner
	# after a restart with UWS jobs.)
	queueCheckInterval = 60

	def __init__(self, jobClass, actions):
		# processQueue shouldn't strictly need a lock.  The lock mainly
		# protects against running more unqueuers than necessary
		self._processQueueLock = threading.Lock()

		UWS.__init__(self, jobClass, actions)

		cron.runEvery(-self.queueCheckInterval,
			"UWS %s timed queue processor"%str(self),
			self._processQueue)

	def _makeMoreStatements(self, statements, jobsTable):
		UWS._makeMoreStatements(self, statements, jobsTable)
		td = jobsTable.tableDef

		countField = base.makeStruct(
			svcs.OutputField, name="count", type="integer",
			select="count(*)", required=True)

		statements["countQueuedBefore"] = jobsTable.getQuery(
			[countField],
			"phase='QUEUED' and destructionTime<=%(dt)s",
			{"dt": None})

		statements["getIdsScheduledNext"] = jobsTable.getQuery(
			[jobsTable.tableDef.getColumnByName("jobId")],
			"phase='QUEUED'",
			limits=('ORDER BY destructionTime ASC', {}))

		statements["getHungCandidates"] = jobsTable.getQuery([
				td.getColumnByName("jobId"),
				td.getColumnByName("pid")],
			"phase='EXECUTING'")

	def scheduleProcessQueueCheck(self):
		"""tells the UWS to try and dequeue jobs next time
		checkProcessQueue is called.

		This function exists since during the TAPTransitions there's a
		writable job and processing the queue might deadlock.  So, rather
		than processing right away, we just note something may need to be
		done.
		"""
		self._processQueueDirty = True

	def checkProcessQueue(self):
		"""sees if any QUEUED process can be made EXECUTING.

		This must be called while you're not holding any changeableJob.
		"""
		if self._processQueueDirty:
			self._processQueueDirty = False
			self._processQueue()

	def _processQueue(self):
		"""tries to take jobs from the queue.

		This function is called from checkProcessQueue when we think we
		are transitioning from EXECUTING to something else.

		Currently, the jobs with the earliest destructionTime are
		processed first.  That's, of course, completely ad-hoc.
		"""
		if not self._processQueueLock.acquire(False):
			# There's already an unqueuer running, don't need a second one.
			# Note that other processes (e.g., taprunner) might still be
			# manipulating the jobs table, so don't rely on the tables not
			# changing here.
			return
		else:
			try:
				if self.countQueuedJobs()==0:
					return

				try:
					started = 0
					with base.getTableConn() as conn:
						toStart = [row["jobId"] for row in
							self.runCanned('getIdsScheduledNext', {}, conn)]

					while toStart:
						if self.countRunningJobs()>=self.runcountGoal:
							break
						self.changeToPhase(toStart.pop(0), EXECUTING)
						started += 1

					if started==0:
						# No jobs could be started.  This may be fine when
						# long-running jobs block job submission, but for
						# catastrophic slave failures we want to make sure all
						# jobs we think are executing actually are.  If they've
						# silently died, we log that and push them to error.
						# We only want to do that if we're the server; and it
						# shouldn't really happen in normal operation, so we
						# only run this if zombies might block proper jobs.
						if base.IS_DACHS_SERVER:
							self._ensureJobsAreRunning()
				except Exception:
					base.ui.notifyError("Error during queue processing,"
						" the UWS %s is probably botched now."%
						self.__class__.__name__)
			finally:
				self._processQueueLock.release()

	def _ensureJobsAreRunning(self):
		"""pushes all executing slave jobs that silently died to ERROR.
		"""
		with base.getTableConn() as conn:
			for row in self.runCanned("getHungCandidates", {}, conn):
				jobId, pid = row["jobId"], row["pid"]

				if pid is None:
					self.changeToPhase(jobId, "ERROR",
						UWSError("EXECUTING job %s had no pid."%jobId, jobId))
					base.ui.notifyError("Stillborn async slave %s"%jobId)

				elif pid<2:
					# these are jobs run within the server -- we can't do
					# anything about them using process manipulation (if at
					# all, they may be realised as threads).  Skip them here.
					continue

				else:
					# Guard against jobs not cleaning up behind themselves;
					# if they're kill -9-ed, segfault or anything else, they
					# won't fix their phase.  We also can't be sure they are
					# our children, as the server may have been restarted.
					# Simple, somewhat ad-hoc solution: let's see if they're
					# visible in /proc and push them to error otherwise.
					# There are various ways in which this could go wrong
					# (pid reuse, races), but it's still a lot better than
					# nothing.
					if not os.path.exists(f"/proc/{pid}"):
						# the following doesn't hurt if the job has gone to
						# COMPLETED in the meantime -- we don't transition
						# *from* COMPLETED.
						self.changeToPhase(jobId, "ERROR",
							UWSError("EXECUTING job %s has silently died."%jobId,
								jobId))
						base.ui.notifyError("Zombie UWS runner: %s"%jobId)

	def changeToPhase(self, jobId, newPhase, input=None, timeout=10):
		"""overridden here to hook in queue management.
		"""
		UWS.changeToPhase(self, jobId, newPhase, input, timeout)
		self.checkProcessQueue()

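
# Illustrative sketch (editor's addition): with a UWSWithQueueing-based
# worker system, client code never starts jobs itself -- it only marks them
# QUEUED; _processQueue then moves them to EXECUTING as runcountGoal permits.
# workerSystem and paramDict are assumed to be supplied by the caller.
def _exampleSubmit(workerSystem, paramDict):
	jobId = workerSystem.getNewIdFromArgs(None, paramDict)
	workerSystem.changeToPhase(jobId, QUEUED)
	return jobId
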
class ParameterRef(object):
	"""A UWS parameter that is (in effect) a URL.

	This always contains a URL.  In case of uploads, the tap renderer
	makes sure the upload is placed into the upload directory and
	generates a URL; in that case, local is True.

	You need this class when you want the byReference attribute in the
	UWS.parameter element to be true.
	"""
	def __init__(self, url, local=False):
		self.local = local
		self.url = url

class UWSJobType(type):
	"""The metaclass for UWS jobs.

	We have the metaclass primarily because we want to delay loading the
	actual definition until it is actually needed (otherwise we might get
	interesting chicken-egg-problems with rscdesc at some point).

	A welcome side effect is that we can do custom constructors and
	similar cosmetic deviltry.
	"""
	@property
	def jobsTD(cls):
		try:
			return cls._jobsTDCache
		except AttributeError:
			cls._jobsTDCache = base.resolveCrossId(cls._jobsTDId,
				rscdef.TableDef)
			return cls._jobsTDCache

class BaseUWSJob(object, metaclass=UWSJobType):
	"""An abstract UWS job.

	UWS jobs are always instantiated with a row from the associated jobs
	table (i.e. a dictionary giving all the uws properties).  You can
	read the properties as attributes.  UWSJobs also keep a (weak)
	reference to the UWS that made them.

	Make sure you don't confuse the UWS properties with the JCL
	parameters (which are what the core receives).  The latter are in the
	parameters property.

	To alter uws properties, use the change method.  This will fail
	unless the job was created giving writable=True.

	To make it concrete, you need to define:

	- a _jobsTDId attribute giving the (cross) id of the UWS jobs table
	  for this kind of job
	- a _transitions attribute giving a UWSTransitions instance that
	  defines what to do on transitions
	- as needed, class methods _default_<parName> if you need to default
	  job parameters in newly created jobs
	- as needed, methods _decode_<parName> and _encode_<parName> to bring
	  uws parameters (i.e., everything that has a DB column) from and to
	  the DB representation from *python* values.

	You may want to override:

	- a class method getNewId(uws, writableConn) -> str, a method
	  allocating a unique id for a new job and returning it.  Beware of
	  races here; if your allocation is through the database table,
	  you'll have to lock it *and* write a preliminary record with your
	  new id.  The default implementation does this, but if you want
	  something in the file system, you probably don't want to use that.
	- a method prepareForDestruction() to do extra cleanup before a job
	  is destroyed.

	The job parameters -- those eventually passed to the core when the
	job runs -- are held in a dictionary parameters.  You should in
	general not do any parsing of them at the UWS level, as that's really
	the job of the service/core, as in the sync case.  However, certain
	job parameters need handling at the async level, in particular
	UPLOAD.  In that case, define _preprocess_<parName> methods; these
	receive the value as parsed by the grammar and have to return
	something that can be serialised to the database.

	Note that when you assign to job.parameters yourself, these changes
	will be ignored.  Always use setPar to modify job.parameters.
	Reading job.parameters is fine.

	If you need to clean up before the job is torn down, redefine the
	prepareForDestruction method.
	"""
	def __init__(self, props, uws, writable=False):
		object.__setattr__(self, "_props", props)
		self.writable = writable
		self.uws = weakref.proxy(uws)

	def __getattr__(self, name):
		if name in self._props:
			return getattr(self, "_decode_"+name, utils.identity)(
				self._props[name])
		raise AttributeError("%s objects have no attribute '%s'"%(
			self.__class__.__name__, name))

	def __setattr__(self, name, value):
		# ward against tempting bugs, disallow assigning to names in _props:
		if name in self._props:
			raise TypeError("Use the change method to change the %s"
				" attribute."%name)
		object.__setattr__(self, name, value)

	@property
	def quote(self):
		"""Always returns None.

		Override if you have a queue management.
		"""
		return None

	@classmethod
	def getNewId(cls, uws, conn):
		cursor = conn.cursor()
		tableName = cls.jobsTD.getQName()
		cursor.execute("LOCK TABLE %s IN ACCESS SHARE MODE"%tableName)
		try:
			while True:
				newId = utils.getRandomString(10)
				cursor.execute(
					"SELECT * FROM %s WHERE jobId=%%(jobId)s"%tableName,
					{"jobId": newId})
				if not list(cursor):
					cursor.execute(
						"INSERT INTO %s (jobId) VALUES (%%(jobId)s)"%tableName,
						{"jobId": newId})
					break
			cursor.close()
			conn.commit()
		except:
			conn.rollback()
			raise
		return newId

	@classmethod
	def getDefaults(cls, conn):
		"""returns a dictionary suitable for inserting into a jobsTD table.
		"""
		res = {}
		for column in cls.jobsTD:
			name = column.name
			res[name] = getattr(cls, "_default_"+name, lambda: None)()
		return res

	@classmethod
	def _default_phase(cls):
		return PENDING

	@classmethod
	def _default_executionDuration(cls):
		return base.getConfig("async", "defaultExecTime")

	@classmethod
	def _default_creationTime(cls):
		return datetime.datetime.utcnow()

	@classmethod
	def _default_destructionTime(cls):
		return datetime.datetime.utcnow()+datetime.timedelta(
			seconds=base.getConfig("async", "defaultLifetime"))

	def _encode_error(self, value):
		"""returns a pickled dictionary with error information.

		value can either be an exception object or a dictionary as
		generated here.
		"""
		if value is None:
			return None
		if not isinstance(value, dict):
			value = {
				"type": value.__class__.__name__,
				"msg": str(value),
				"hint": getattr(value, "hint", None),
			}
		return utils.getCleanBytes(pickle.dumps(value)).decode("ascii")

	def _decode_error(self, value):
		"""returns the unpickled three-item dictionary from the database
		string.
		"""
		if value is None:
			return None
		return pickle.loads(utils.getDirtyBytes(value))

	@classmethod
	def _default_parameters(cls):
		return utils.getCleanBytes(
			pickle.dumps({}, protocol=2)).decode("ascii")

	def _encode_parameters(self, value):
		"""(db format for parameters is a pickle)
		"""
		return utils.getCleanBytes(
			pickle.dumps(value, protocol=2)).decode("ascii")

	def _decode_parameters(self, value):
		"""(db format for parameters is a pickle)
		"""
		return pickle.loads(utils.getDirtyBytes(value))

	def setPar(self, parName, parValue):
		"""enters parName:parValue into self.parameters.
		"""
		parName = parName.lower()
		if parValue is None and self.parameters.get(parName):
			# don't nuke a parameter that already exists.
			return

		if hasattr(self, "_preprocess_"+parName):
			parValue = getattr(self, "_preprocess_"+parName)(parValue)

		newPars = self.parameters.copy()

		# We'll get DALI uploads as 2-tuples with a BytesIO in the
		# second item.  Save that before serialisation.
		# Perhaps we should re-use this for TAP (where the analogous
		# thing happens in _preprocess_upload).
		if isinstance(parValue, tuple):
			try:
				if isinstance(parValue[1], io.BytesIO):
					parValue = (parName,
						str(self._saveUpload(parName, parValue[1])))
			except IndexError:
				# parValue is too short, hence it's no upload
				pass

		# we had better check the multiplicity of the inputKey to decide
		# between extending and overwriting; but for now that's about
		# equivalent.
		if isinstance(newPars.get(parName), list):
			newPars[parName].extend(parValue)
		elif isinstance(newPars.get(parName), dict):
			# that's a temporary hack for dlasync until we get to properly
			# review how to really do that kind of thing
			newPars[parName].update(parValue)
		else:
			newPars[parName] = parValue

		self.change(parameters=newPars)

	def iterSerializedPars(self):
		"""iterates over the serialized versions of the parameters.

		This is, really, for uwsactions.getParametersElement, and uses a
		secret handshake to let it create ByReference links.
		"""
		for key, value in self.parameters.items():
			if (isinstance(value, list)
					and len(value)>0
					and isinstance(value[0], tuple)
					and len(value[0])==2
					and value[0][1].startswith("file://")):
				# this quite certainly is one of our uploaded files
				for upload in value:
					yield key, ParameterRef(
						LocalFile(self, self.getWD(),
							upload[1].split("/")[-1]).getURL())

			elif (isinstance(value, tuple)
					and len(value)==2
					and value[1].startswith("file://")):
				# same thing for DALI uploads
				yield key, ParameterRef(
					LocalFile(self, self.getWD(),
						value[1].split("/")[-1]).getURL())

			else:
				yield key, "" if value is None else str(value)

	def setParamsFromDict(self, argDict):
		"""sets our parameters from a dictionary of parsed parameters.

		self must be writeable for this to work.

		argDict should in general be the result of a contextgrammar, but,
		really, any dict will do.  We force-lowercase everything here.
		If that's a problem for your protocol, you deserve no mercy.
		"""
		for key, value in argDict.items():
			self.setPar(key.lower(), value)

	def setParamsFromRawDict(self, strargs):
		"""sets our parameters from a dictionary of string lists (i.e.,
		requests.strargs).

		The job must be writeable when you call this.

		The arguments don't have to be complete by the parameterGrammar's
		standards; this also means that no defaults are inserted.  Use
		job.completeParams() for that (it's typically called when queuing
		a job).
		"""
		self.setParamsFromDict(
			self.uws.parameterGrammar.parseStrargs(strargs,
				ensureRequired=False))

	def completeParams(self):
		"""completes self's parameters from the worker's context grammar.

		The job must be writable for this to work.

		This is necessary because in UWS operations, actions for missing
		parameters are never executed; missing parameters could still be
		provided later.  Worker systems hence must arrange for this to be
		called when queueing a job; SimpleUWSTransitions.queueJob already
		does that.

		This may raise an exception if parameters are missing.
		"""
		pars = self.parameters
		for ik in self.uws.parameterGrammar.iterInputKeys():
			key = ik.name.lower()
			val = pars.get(key)
			if val is None or val==[]:
				pars[key] = ik.computeCoreArgValue([])
		self.change(parameters=pars)

	def getTransitionTo(self, newPhase):
		"""returns the action prescribed to push self to newPhase.

		A ValidationError is raised if no such transition is defined.
		"""
		return self._transitions.getTransition(self.phase, newPhase)

	def change(self, **kwargs):
		"""changes the property values to what's given by the keyword
		arguments.

		It is an AttributeError to try and change a property that is not
		defined.
		"""
		if not self.writable:
			raise TypeError("Read-only UWS job (you can only change UWSJobs"
				" obtained through changeableJob).")

		for propName, propValue in kwargs.items():
			if propName not in self._props:
				raise AttributeError("%ss have no attribute %s"%(
					self.__class__.__name__, propName))
			self._props[propName] = getattr(self, "_encode_"+propName,
				utils.identity)(propValue)

	def getProperties(self):
		"""returns the properties of the job as they are stored in the
		database.

		Use attribute access to read them and change to change them.
		Do *not* get values from the dictionary you get and do *not*
		change the dictionary.
		"""
		return self._props

	def update(self):
		"""fetches a new copy of the job props from the DB.

		You should in general not need this, since UWSJob objects are
		intended to be short-lived ("for the duration of an async
		request").  Still, for testing and similar, it's convenient to be
		able to update a UWS job from the database.
		"""
		self._props = self.uws.getJob(self.jobId)._props
		return self

	def prepareForDestruction(self):
		"""is called before the job's database row is torn down.

		Self is writable at this point.
		"""

	def getURL(self):
		"""returns the UWS URL for this job.
		"""
		return self.uws.getURLForId(self.jobId)

	@contextlib.contextmanager
	def getWritable(self):
		"""a context manager for a writeable version of the job.

		Changes will be written back at the end, and the job object
		itself will be updated from the database.

		If self already is writable, it is returned unchanged, and
		changes are only persisted when the enclosing controlling block
		finishes.
		"""
		if self.writable:
			yield self
			return

		with self.uws.changeableJob(self.jobId) as wjob:
			yield wjob
		self.update()

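
# Illustrative sketch (editor's addition): properties are read through plain
# attribute access on a read-only job, while any modification goes through a
# writable job obtained from the worker system.  workerSystem and jobId are
# assumed to be supplied by the caller.
def _examplePokeJob(workerSystem, jobId):
	job = workerSystem.getJob(jobId)
	base.ui.notifyInfo("Job %s is %s"%(job.jobId, job.phase))

	with workerSystem.changeableJob(jobId) as wjob:
		wjob.setPar("runid", "test-run")
		wjob.change(executionDuration=3600)
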
class LocalFile(object):
	"""A sentinel class representing a file within a job work directory
	(as resulting from an upload).
	"""
	def __init__(self, job, wd, fileName):
		self.job, self.fileName = weakref.proxy(job), fileName
		self.fullPath = os.path.join(wd, fileName)

	def __str__(self):
		# This is mainly for serialisation of the upload parameter in the
		# UWS job.
		return f"file://{self.fileName}"

	def getURL(self):
		"""returns the URL the file is retrievable under for the lifetime
		of the job.
		"""
		return self.job.getURL()+f"/results/{self.fileName}"

class UWSJobWithWD(BaseUWSJob):
	"""A UWS job with a working directory.

	This generates ids from directory names in a directory (the uwsWD)
	shared for all UWSes on the system.

	It also adds methods

	- getWD() -> str returning the working directory
	- addResult(self, source, mimeType, name=None) to add a new result
	- openResult(self, mimeType, name) -> file to get an open file in the
	  WD to write to in order to generate a result
	- getResult(self, resName) -> str to get the *path* of a result with
	  resName
	- getResults(self) -> list-of-dicts to get dicts describing all
	  results available
	- openFile(self) -> file to get a file letting you read an existing
	  result.
	"""

	@classmethod
	def getNewId(cls, uws, conn):
		# our id is the base name of the job's temporary directory.
		uwsWD = base.getConfig("uwsWD")
		utils.ensureDir(uwsWD, mode=0o775, setGroupTo=base.getGroupId())
		jobDir = tempfile.mkdtemp("", "", dir=uwsWD)
		return os.path.basename(jobDir)

	def getWD(self):
		return os.path.join(base.getConfig("uwsWD"), self.jobId)

	def prepareForDestruction(self):
		shutil.rmtree(self.getWD())

	# results management: We use a pickled list in the jobs dir to manage
	# the results.  I once had a table of those in the DB and it just
	# wasn't worth it.  One issue, though: this potentially races if two
	# different processes/threads were to update the results at the same
	# time.  This could be worked around by writing the results pickle
	# only from within changeableJobs.
	#
	# The list contains dictionaries having resultName and resultType keys.
	@property
	def _resultsDirName(self):
		return os.path.join(self.getWD(), "__RESULTS__")

	def _loadResults(self):
		try:
			with open(self._resultsDirName, "rb") as f:
				return pickle.load(f)
		except IOError:
			return []

	def _saveResults(self, results):
		handle, srcName = tempfile.mkstemp(dir=self.getWD())
		with os.fdopen(handle, "wb") as f:
			pickle.dump(results, f)
		# The following operation will bomb on windows when the second
		# result is saved.  Tough luck.
		os.rename(srcName, self._resultsDirName)

	def _addResultInJobDir(self, mimeType, name):
		resTable = self._loadResults()
		newRec = {'resultName': name, 'resultType': mimeType}

		for index, res in enumerate(resTable):
			if res["resultName"]==name:
				resTable[index] = newRec
				break
		else:
			resTable.append(
				{'resultName': name, 'resultType': mimeType})

		self._saveResults(resTable)

	def _preprocess_upload(self, uploadParam):
		"""picks non-saved inline uploads from uploadParam and saves them.

		This is done so we can serialise the parameters into the
		database; we wouldn't want to do that with the uploaded data
		itself.

		Note that this is called by setPar("upload") by virtue of its
		magic name.
		"""
		if not uploadParam:
			return None

		processed = []
		for destTable, target in uploadParam:
			if isinstance(target, tuple):
				target = self._saveUpload(destTable, target[1])
			processed.append((destTable, str(target)))
		return processed

	def _saveUpload(self, localName, uploadFile):
		"""helps _preprocess_upload.
		"""
		with self.openFile(localName, "wb") as f:
			f.write(uploadFile.read())
		return LocalFile(self, self.getWD(), localName)

	def fixTypeForResultName(self, resultName, mediaType):
		"""sets the media type for result resultName.

		It is not an error if no result with resultName exists.
		"""
		resTable = self._loadResults()
		for row in resTable:
			if row["resultName"]==resultName:
				row["resultType"] = mediaType
		self._saveResults(resTable)

	def addResult(self, source, mimeType, name=None):
		"""adds a result, with data taken from source.

		source may be a file-like object, a string, or bytes.

		If no name is passed, a name is auto-generated.
		"""
		if name is None:
			name = utils.intToFunnyName(id(source))

		with open(os.path.join(self.getWD(), name), "wb") as destF:
			if isinstance(source, str):
				source = utils.bytify(source)
			if isinstance(source, bytes):
				destF.write(source)
			else:
				utils.cat(source, destF)

		self._addResultInJobDir(mimeType, name)

	def openResult(self, mimeType, name):
		"""returns a writable file that adds a result.
		"""
		self._addResultInJobDir(mimeType, name)
		return open(os.path.join(self.getWD(), name), "wb")

	def getResult(self, resName):
		"""returns a pair of file name and mime type for a named job
		result.

		If the result does not exist, a NotFoundError is raised.
		"""
		res = [r for r in self._loadResults() if resName==r["resultName"]]
		if not res:
			raise base.NotFoundError(resName, "job result",
				"uws job %s"%self.jobId)
		res = res[0]
		return os.path.join(self.getWD(), res["resultName"]), res["resultType"]

	def getResults(self):
		"""returns a list of this service's results.

		The list contains dictionaries having at least resultName and
		resultType keys.
		"""
		return self._loadResults()

	def openFile(self, name, mode="rb"):
		"""returns an open file object for a file within the job's work
		directory.

		This will only use the last path component; everything else is
		discarded.
		"""
		name = name.split("/")[-1]
		return open(os.path.join(self.getWD(), name), mode)

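
# Illustrative sketch (editor's addition): how a worker typically writes and
# retrieves results within a UWSJobWithWD's working directory.  The result
# name and media type are arbitrary examples.
def _exampleWriteResult(wjob):
	wjob.addResult(b"some payload", "text/plain", name="log.txt")
	path, mediaType = wjob.getResult("log.txt")
	with wjob.openFile("log.txt") as f:
		return path, mediaType, f.read()
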
class UWSTransitions(object):
	"""An abstract base for classes defining the behaviour of a UWS.

	This basically is the definition of a finite state machine with
	arbitrary input (which is to say: the input "alphabet" is up to the
	transitions).

	A UWSTransitions instance is in the transitions attribute of a job
	class.

	The main interface to UWSTransitions is getTransition(p1, p2) ->
	callable.  It returns a callable that should push the automaton from
	phase p1 to phase p2 or raise a ValidationError for the field phase.

	The callable has the signature f(desiredPhase, wjob, input) -> None.
	It must alter the uwsJob object as appropriate.  input is some object
	defined by the transition.  The job passed is a changeable job, so
	the handlers actually hold locks to the job row.  Thus, be brief.

	The transitions are implemented as simple methods having the
	signature of the callables returned by getTransition.

	To link transitions and methods, pass a vertices list to the
	constructor.  This list consists of 3-tuples of strings (from, to,
	method-name).  From and to are phase names (use the symbols from this
	module to ward against typos).
	"""
	def __init__(self, name, vertices):
		self.name = name
		self._buildTransitions(vertices)

	def _buildTransitions(self, vertices):
		self.transitions = {}

		# set some defaults
		for phase in [PENDING, QUEUED, EXECUTING, ERROR, ABORTED, COMPLETED]:
			self.transitions.setdefault(phase, {})[ERROR] = "flagError"
		self.transitions.setdefault(EXECUTING, {})[COMPLETED] = "noteEndTime"

		for fromPhase, toPhase, methodName in vertices:
			self.transitions.setdefault(fromPhase, {})[toPhase] = methodName

	def getTransition(self, fromPhase, toPhase):
		if (fromPhase==toPhase
				or fromPhase in END_STATES):
			# ignore null or ignorable transitions
			return lambda p, job, input: None

		try:
			methodName = self.transitions[fromPhase][toPhase]
		except KeyError:
			raise base.ui.logOldExc(
				base.ValidationError("No transition from %s to %s defined"
					" for %s jobs"%(fromPhase, toPhase, self.name),
					"phase",
					hint="This almost always points to an implementation error"))

		try:
			return getattr(self, methodName)
		except AttributeError:
			raise base.ui.logOldExc(
				base.ValidationError("%s transitions have no %s method"%(
						self.name, methodName),
					"phase",
					hint="This is an error in an internal protocol definition."
						"  There probably is nothing you can do but complain."))

	def noOp(self, newPhase, job, ignored):
		"""a sample action just setting the new phase.

		This is a no-op baseline sometimes useful in user code.
		"""
		job.change(phase=newPhase)

	def flagError(self, newPhase, wjob, exception):
		"""the default action when transitioning to an error: dump
		exception and mark the phase as ERROR.
		"""
		wjob.change(phase=ERROR)
		# Validation errors don't get logged -- for one, they probably
		# are the user's fault, and for a second, logging them upsets
		# trial during testing, since trial examines the log.
		if not isinstance(exception, base.ValidationError):
			base.ui.notifyError("Error during UWS execution of job %s"%
				wjob.jobId)
		wjob.change(error=exception)
		if wjob.endTime is None:
			wjob.change(endTime=datetime.datetime.utcnow())

	def noteEndTime(self, newPhase, wjob, ignored):
		wjob.change(endTime=datetime.datetime.utcnow())

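
# Illustrative sketch (editor's addition): the vertices list wiring phases to
# handler methods.  This hypothetical machine only knows how to queue and
# complete jobs; everything else falls back to the flagError/noteEndTime
# defaults set up in _buildTransitions.
class _ExampleTransitions(UWSTransitions):
	def __init__(self):
		UWSTransitions.__init__(self, "Example", [
			(PENDING, QUEUED, "queueJob"),
			(EXECUTING, COMPLETED, "completeJob")])

	def queueJob(self, newPhase, wjob, ignored):
		wjob.change(phase=QUEUED)

	def completeJob(self, newPhase, wjob, ignored):
		wjob.change(phase=newPhase, endTime=datetime.datetime.utcnow())
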
class SimpleUWSTransitions(UWSTransitions):
	"""A UWSTransitions with sensible transitions pre-defined.

	See the source for what we consider sensible.

	The idea here is that you simply override (and usually up-call) the
	methods queueJob, markAborted, startJob, completeJob, killJob,
	errorOutJob, and ignoreAndLog.

	You will have to define startJob and provide some way to execute
	startJob on QUEUED jobs (there's nothing wrong with immediately
	calling self.startJob(...) if you don't mind the DoS danger).

	Once you have startJob, you'll probably want to define killJob as
	well.
	"""
	def __init__(self, name):
		UWSTransitions.__init__(self, name, [
			(PENDING, QUEUED, "queueJob"),
			(PENDING, ABORTED, "markAborted"),
			(QUEUED, ABORTED, "markAborted"),
			(QUEUED, EXECUTING, "startJob"),
			(EXECUTING, COMPLETED, "completeJob"),
			(EXECUTING, ABORTED, "killJob"),
			(EXECUTING, ERROR, "errorOutJob"),
			(COMPLETED, ERROR, "ignoreAndLog"),
			])

	def queueJob(self, newState, wjob, ignored):
		"""puts a job on the queue.
		"""
		wjob.completeParams()
		wjob.change(phase=QUEUED)

	def markAborted(self, newState, wjob, ignored):
		"""simply marks job as aborted.

		This is what happens if you abort a job from QUEUED or PENDING.
		"""
		wjob.change(phase=ABORTED,
			endTime=datetime.datetime.utcnow())

	def ignoreAndLog(self, newState, wjob, exc):
		"""logs an attempt to transition when it's impossible but
		shouldn't result in an error.

		This is mainly so COMPLETED things don't fail just because of
		some mishap.
		"""
		base.ui.logErrorOccurred("Request to push %s job to ERROR: %s"%(
			wjob.phase, str(exc)))

	def errorOutJob(self, newPhase, wjob, exception):
		"""pushes a job to an error state.

		This is called by a worker; leaving the error message itself is
		part of the worker's duty; here, exception will just be logged.
		"""
		wjob.change(phase=newPhase,
			endTime=datetime.datetime.utcnow())
		self.flagError(newPhase, wjob, exception)

	def killJob(self, newPhase, wjob, ignored): # pragma: no cover
		"""should abort a job.

		There's really not much we can do here, so this is a no-op.

		Do not up-call here, you'll get a (then spurious) warning if you
		do.
		"""
		base.ui.notifyWarning("%s UWSes cannot kill jobs"%self.name)

	def completeJob(self, newPhase, wjob, ignored):
		"""pushes a job into the completed state.
		"""
		wjob.change(phase=newPhase,
			endTime=datetime.datetime.utcnow())

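
# Illustrative sketch (editor's addition): a minimal in-process worker built
# on SimpleUWSTransitions.  It runs jobs synchronously right from queueJob,
# which is only sensible for very cheap jobs (see the DoS remark in the class
# docstring); process-based systems use ProcessBasedUWSTransitions below.
class _ExampleSyncTransitions(SimpleUWSTransitions):
	def __init__(self):
		SimpleUWSTransitions.__init__(self, "ExampleSync")

	def queueJob(self, newState, wjob, ignored):
		SimpleUWSTransitions.queueJob(self, newState, wjob, ignored)
		self.startJob(EXECUTING, wjob, None)

	def startJob(self, newState, wjob, ignored):
		wjob.change(phase=EXECUTING)
		# ...do the actual work and add results here...
		self.completeJob(COMPLETED, wjob, None)
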
def _replaceFDs(inFName, outFName):
	# This is used for clean forking and doesn't actually belong here.
	# utils.ostricks should take this.
	"""closes all (findable) file descriptors and replaces stdin with
	inFName and stdout/err with outFName.
	"""
	for fd in range(255, -1, -1):
		try:
			os.close(fd)
		except os.error:
			pass
	inF, outF = open(inFName), open(outFName, "w")
	os.dup(inF.fileno())
	os.dup(outF.fileno())
	os.dup(outF.fileno())


class _UWSBackendProtocol(protocol.ProcessProtocol):
	"""The protocol used for taprunners when spawning them under a twisted
	reactor.
	"""
	def __init__(self, jobId, workerSystem):
		self.jobId = jobId
		self.workerSystem = workerSystem

	def outReceived(self, data):
		base.ui.notifyInfo("UWS worker %s produced output: %s"%(
			self.jobId, data))

	def errReceived(self, data):
		base.ui.notifyInfo("UWS worker %s produced an error message: %s"%(
			self.jobId, data))

	def processEnded(self, statusObject):
		"""tries to ensure the job is in an admitted end state.
		"""
		try:
			job = self.workerSystem.getJob(self.jobId)
			if job.phase==QUEUED or job.phase==EXECUTING:
				self.workerSystem.changeToPhase(self.jobId, ERROR,
					UWSError("Job hung in %s"%job.phase, job.jobId))
		except JobNotFound:
			# job already deleted
			pass

class ProcessBasedUWSTransitions(SimpleUWSTransitions):
	"""A SimpleUWSTransitions that processes its stuff in a child process.

	Inheriting classes must implement the getCommandLine(wjob) method --
	it must return a command (suitable for reactor.spawnProcess and
	os.execlp) and a list of arguments suitable for reactor.spawnProcess.

	They must also implement some sort of queue management.  In the
	simplest case, override queueJob and start the job from there (but
	set to QUEUED in there anyway).
	"""
	# 2019 trial (at least sometimes) starts the reactor too late for us
	# to notice it's running in startJob.  The following attribute lets
	# the testing code force the use of the twisted machinery.
	trial_forceTwisted = False

	def getCommandLine(self, wjob):
		raise NotImplementedError("%s transitions do not define how"
			" to get a command line"%self.__class__.__name__)

	def _startJobTwisted(self, wjob):
		"""starts a job by forking a new process when we're running
		within a twisted reactor.
		"""
		assert wjob.phase==QUEUED
		cmd, args = self.getCommandLine(wjob)
		pt = reactor.spawnProcess(
			_UWSBackendProtocol(wjob.jobId, wjob.uws),
			cmd, args=args, env=os.environ)
		wjob.change(pid=pt.pid, phase=EXECUTING)

	def _startJobNonTwisted(self, wjob):
		"""forks off a new process when (hopefully) a manual child reaper
		is in place.
		"""
		cmd, args = self.getCommandLine(wjob)

		pid = os.fork()
		if pid==0:
			_replaceFDs("/dev/zero", "/dev/null")
			os.execlp(cmd, *args)
		elif pid>0:
			wjob.change(pid=pid, phase=EXECUTING)
		else:
			raise Exception("Could not fork")

	def startJob(self, newState, wjob, ignored):
		"""causes a process to be started that executes job.

		This dispatches according to whether or not we are within a
		twisted event loop, mostly for testing support.
		"""
		# 2019 trial starts the reactor too late for us to notice we
		# should be using the twisted code; trial_forceTwisted lets the
		# test code force the issue.
		withinTwisted = self.trial_forceTwisted or reactor.running

		if withinTwisted:
			return self._startJobTwisted(wjob)
		else:
			return self._startJobNonTwisted(wjob)

	def killJob(self, newState, wjob, ignored):
		"""tries to kill/abort job.

		Actually, there are two different scenarios here: Either the job
		has a non-NULL startTime.  In that case, the child job is in
		control and will manage the state itself.  Then kill -INT will do
		the right thing.

		However, if startTime is NULL, the child is still starting up.
		Sending a kill -INT may do many things, and most of them we don't
		want.  So, in this case we kill -TERM the child, do state
		management ourselves and hope for the best.
		"""
		try:
			pid = wjob.pid
			if pid is None:
				raise UWSError("Job is not running", wjob.jobId)
			elif pid<2:
				raise UWSError("Job has unkillable PID", wjob.jobId)

			if wjob.startTime is None:
				# the child job is not up yet, kill it brutally and manage
				# state ourselves
				os.kill(pid, signal.SIGTERM)
				self.markAborted(ABORTED, wjob, ignored)
			else:
				# child job is up, can manage state itself
				os.kill(pid, signal.SIGINT)
		except UWSError:
			raise
		except Exception as ex:
			raise UWSError(None, wjob.jobId, sourceEx=ex)

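
# Illustrative sketch (editor's addition): what a concrete process-based
# transitions class has to supply.  The runner command and its arguments are
# assumptions (the real TAP implementation builds its own command line), and
# the queueJob override assumes the worker system is a UWSWithQueueing.
class _ExampleProcessTransitions(ProcessBasedUWSTransitions):
	def __init__(self):
		ProcessBasedUWSTransitions.__init__(self, "ExampleProcess")

	def getCommandLine(self, wjob):
		# hypothetical runner executable and arguments
		return "examplerunner", ["examplerunner", str(wjob.jobId)]

	def queueJob(self, newState, wjob, ignored):
		ProcessBasedUWSTransitions.queueJob(self, newState, wjob, ignored)
		wjob.uws.scheduleProcessQueueCheck()
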
@functools.lru_cache(None)
def _getUWSGrammar():
	"""returns the context grammar for the standard UWS parameters.
	"""
	inputTD = base.parseFromString(svcs.InputTD,
		"""<inputTable id="generic-uws-args">
			<inputKey name="upload" type="file" multiplicity="multiple"
				description="A DALI-type upload string"/>
			<inputKey name="runid" type="text"
				description="Ignored by this service."/>
			<inputKey name="action" type="text" multiplicity="single"
				description="On posts to the job URI, you can pass in DELETE
					here instead of using the standards-compliant DELETE
					method.  Ignored otherwise."/>
			<inputKey name="wait" type="text" unit="s" multiplicity="single"
				description="On certain operations, pass a timeout here to
					have the server delay its response until there is a change
					to report."/>
			<inputKey name="phase" type="text" multiplicity="single"
				description="A UWS PHASE; the interpretation is different
					across the UWS operations."/>
			<inputKey name="executionduration" type="text" unit="s"
				multiplicity="single"
				description="When posting to the job resource, set the maximum
					run time of the query to this."/>
			<inputKey name="destruction" type="text" multiplicity="single"
				description="When posting to the job resource, adjust the time
					at which the job will be thrown away to this."/>
		</inputTable>""")

	return base.makeStruct(svcs.ContextGrammar, inputTD=inputTD)

def prepareRequest(request, service):
	"""prepares request for processing UWS requests.

	In UWS, the service in general doesn't run, and hence there's no
	context grammar running.  Also, there are the UWS-specific
	parameters, which the core shouldn't be seeing.

	These latter ones are parsed into request.uwsArgs by this function.
	This is what drives uwsactions.  The function removes the
	corresponding keys from strargs so the core in question is not
	confused by them.
	"""
	uwsGrammar = _getUWSGrammar()
	request.uwsArgs = uwsGrammar.parseStrargs(request.strargs)
	uwsKeys = [ik.name for ik in uwsGrammar.iterInputKeys()]

	if service.core.name_=="tapCore":
		from gavo.protocols import tap
		tap.mangleUploads(request)
		# TAP needs to retain the upload parameter (in contrast to DALI,
		# where the keys already contain the uploads)
		uwsKeys.remove("upload")
	else:
		dali.mangleUploads(request)

	# now remove the UWS arguments so they don't confuse any actual cores.
	for uwsInputName in uwsKeys:
		if uwsInputName in request.strargs:
			del request.strargs[uwsInputName]

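
# Illustrative sketch (editor's addition): renderers are expected to run
# prepareRequest before handing a request to uwsactions, roughly like this.
# The workerSystem, service, and request names are assumptions standing in
# for what the async renderer actually passes around.
def _exampleHandleAsyncPost(workerSystem, service, request):
	prepareRequest(request, service)
	jobId = workerSystem.getNewIdFromRequest(request)
	return workerSystem.getURLForId(jobId)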