gavo.protocols.uws module

Support classes for the universal worker service.

class gavo.protocols.uws.BaseUWSJob(props, uws, writable=False)

Bases: object

An abstract UWS job.

UWS jobs are always instantiated with a row from the associated jobs table (i.e. a dictionary giving all the uws properties). You can read the properties as attributes. UWSJobs also keep a (weak) reference to the UWS that made them.

Make sure you don’t confuse the UWS properties with the JCL parameters (which are what the core receives). The latter are in the parameters property.

To alter uws properties, use the change method. This will fail unless the job was created giving writable=True.

To make it concrete, you need to define:

  • a _jobsTDid attribute giving the (cross) id of the UWS jobs table for this kind of job

  • a _transitions attribute giving a UWSTransitions instance that defines what to do on transitions

  • as needed, class methods _default_<parName> if you need to default job parameters in newly created jobs

  • as needed, methods _decode_<parName> and _encode_<parName> to convert uws parameters (i.e., everything that has a DB column) from their DB representation to Python values and back.
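To illustrate the naming conventions in the list above, here is a minimal, self-contained sketch. ToyJob and MyJob are hypothetical stand-ins, not the DaCHS classes: they only show how hooks named _default_<parName>, _encode_<parName> and _decode_<parName> can be found by name. Storing destructionTime as an ISO 8601 string is likewise an assumption.

```python
from datetime import datetime


class ToyJob:
    """Hypothetical stand-in mimicking BaseUWSJob's hook conventions."""

    @classmethod
    def collectDefaults(cls, names):
        # Call _default_<name> for every name that has such a classmethod.
        return {name: getattr(cls, "_default_" + name)()
            for name in names if hasattr(cls, "_default_" + name)}

    def encode(self, name, value):
        # Python value -> DB representation; pass through without a hook.
        encoder = getattr(self, "_encode_" + name, None)
        return encoder(value) if encoder else value

    def decode(self, name, dbValue):
        # DB representation -> Python value; pass through without a hook.
        decoder = getattr(self, "_decode_" + name, None)
        return decoder(dbValue) if decoder else dbValue


class MyJob(ToyJob):
    @classmethod
    def _default_executionDuration(cls):
        return 3600  # hypothetical default: one hour

    def _encode_destructionTime(self, value):
        # Assumption: the DB column holds an ISO 8601 string.
        return value.isoformat()

    def _decode_destructionTime(self, dbValue):
        return datetime.fromisoformat(dbValue)
```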

You may want to override:

  • a class method getNewId(uws, writableConn) -> str, a method allocating a unique id for a new job and returning it. Beware of races here; if your allocation is through the database table, you’ll have to lock it and write a preliminary record with your new id. The default implementation does this, but if you want something in the file system, you probably don’t want to use that.

  • a method prepareForDestruction() to do extra cleanup before a job is destroyed.
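If you allocate ids in the file system rather than through the jobs table, the race mentioned for getNewId can be avoided by letting the operating system create the directory atomically. The following sketch uses a hypothetical allocate_job_id helper built on tempfile.mkdtemp; it is not the default DaCHS implementation.

```python
import os
import tempfile


def allocate_job_id(wdRoot):
    """Hypothetical file-system id allocator (not the DaCHS default).

    tempfile.mkdtemp creates a uniquely named directory without race
    conditions, retrying on collisions, so concurrent callers can never
    receive the same id.
    """
    jobDir = tempfile.mkdtemp(dir=wdRoot)
    # The directory name doubles as the job id.
    return os.path.basename(jobDir)
```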

The job parameters – those eventually passed to the core when the job runs – are held in a dictionary parameters. You should in general not do any parsing of them at the UWS level, as that’s really the job of the service/core, as in the sync case.

However, certain job parameters need handling at the async level, in particular UPLOAD. For those, define _preprocess_<parName> methods that receive the value as parsed by the grammar and return something that can be serialised to the database.

Note that when you assign to job.parameters yourself, these changes will be ignored. Always use setPar to modify job.parameters. Reading job.parameters is fine.
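One way to picture why direct assignments to job.parameters are lost is a property that hands out a copy, with setPar as the only way in. ParamHolder and UploadJob below are hypothetical models, not DaCHS code; the "name,url" input mirrors TAP's UPLOAD syntax for a single table, but the serialised form is made up.

```python
class ParamHolder:
    """Hypothetical model of parameters/setPar (not DaCHS code)."""

    def __init__(self):
        self._pars = {}

    @property
    def parameters(self):
        # Hand out a copy: mutating it does not touch the stored state.
        return dict(self._pars)

    def setPar(self, parName, parValue):
        # Run a _preprocess_<parName> hook first, if the class defines one.
        hook = getattr(self, "_preprocess_" + parName, None)
        if hook is not None:
            parValue = hook(parValue)
        self._pars[parName] = parValue


class UploadJob(ParamHolder):
    def _preprocess_upload(self, value):
        # Hypothetical: turn "name,url" into a DB-serialisable string.
        name, url = value.split(",", 1)
        return f"{name.strip()} {url.strip()}"
```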

If you need to clean up before the job is torn down, redefine the prepareForDestruction method.

change(**kwargs)

changes the property values to what’s given by the keyword arguments.

It is an AttributeError to try and change a property that is not defined.
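The contract of change can be modelled as follows. PropsJob is a hypothetical stand-in; in particular, raising TypeError for a non-writable job is an assumption, since the text above only says the call will fail.

```python
class PropsJob:
    """Hypothetical model of change(): writable-only, known properties only."""

    def __init__(self, props, writable=False):
        self._props = dict(props)
        self._writable = writable

    def __getattr__(self, name):
        # Only called for names not found normally; exposes UWS properties.
        try:
            return self.__dict__["_props"][name]
        except KeyError:
            raise AttributeError(name) from None

    def change(self, **kwargs):
        if not self._writable:
            # Assumption: the real error type may differ.
            raise TypeError("job is not writable")
        unknown = set(kwargs) - set(self._props)
        if unknown:
            raise AttributeError(f"undefined properties: {sorted(unknown)}")
        self._props.update(kwargs)
```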

completeParams()

completes self’s parameters from the worker’s context grammar.

Job must be writable for this to work.

This is necessary because in UWS operations, actions for missing parameters are never executed; missing parameters could still be provided later.

Worker systems hence must arrange for this to be called when queueing a job; SimpleUWSTransitions.queueJob already does that. This may raise an exception if parameters are missing.

classmethod getDefaults(conn)

returns a dictionary suitable for inserting into a jobsTD table.

classmethod getNewId(uws, conn)

getProperties()

returns the properties of the job as they are stored in the database.

Use attribute access to read them and change() to modify them. Do not read values from the returned dictionary, and do not modify it.

getTransitionTo(newPhase)

returns the action prescribed to push self to newPhase.

A ValidationError is raised if no such transition is defined.

getURL()

returns the UWS URL for this job.

getWritable()

a context manager for a writeable version of the job.

Changes will be written back at the end, and the job object itself will be updated from the database.

If self already is writable, it is returned unchanged, and changes are only persisted when the enclosing controlling block finishes.
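The re-entrant behaviour of getWritable, where an already writable job is returned unchanged and only the outermost block persists, can be sketched with contextlib. Store and WJob are hypothetical in-memory stand-ins for the jobs table and the job class.

```python
from contextlib import contextmanager


class Store:
    """Hypothetical in-memory stand-in for the jobs table."""
    def __init__(self):
        self.rows = {}


class WJob:
    def __init__(self, store, jobId, writable=False):
        self.store, self.jobId, self.writable = store, jobId, writable
        self.props = dict(store.rows[jobId])

    @contextmanager
    def getWritable(self):
        if self.writable:
            # Already writable: hand back self; the outer block persists.
            yield self
            return
        wjob = WJob(self.store, self.jobId, writable=True)
        yield wjob
        # Reached only if the block did not raise: write back, then
        # refresh this read-only copy from the "database".
        self.store.rows[self.jobId] = dict(wjob.props)
        self.props = dict(self.store.rows[self.jobId])
```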

iterSerializedPars()

iterates over the serialized versions of the parameters.

This is really meant for uwsactions.getParametersElement and uses a secret handshake to let it create ByReference links.

prepareForDestruction()

is called before the job’s database row is torn down.

Self is writable at this point.

property quote

Always returns None.

Override this if you have queue management.

setPar(parName, parValue)

enters parName:parValue into self.parameters.

setParamsFromDict(argDict)

sets our parameters from a dictionary of parsed parameters.

self must be writeable for this to work.

argDict should in general be the result of a contextgrammar, but, really, any dict will do.

We force-lowercase everything here. If that’s a problem for your protocol, you deserve no mercy.

setParamsFromRawDict(strargs)

sets our parameters from a dictionary of string lists (i.e., request.strargs).

The job must be writeable when you call this.

The arguments do not have to be complete with respect to the parameterGrammar; this also means that no defaults are inserted. Use job.completeParams() for that (it’s typically called when queuing a job).
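The split between setParamsFromRawDict (no defaults) and completeParams (defaults and required checks) can be sketched with a toy grammar. GRAMMAR, REQUIRED, params_from_raw and complete_params are hypothetical; the real code uses the UWS's context grammar. The lowercasing is borrowed from setParamsFromDict, and the "last value wins" rule is an assumption made here.

```python
# Hypothetical grammar: parameter name -> (parser, default or REQUIRED).
REQUIRED = object()
GRAMMAR = {
    "query": (str, REQUIRED),
    "maxrec": (int, 2000),
}


def params_from_raw(strargs):
    """Parse request.strargs-style {name: [str, ...]}; no defaults added."""
    pars = {}
    for name, values in strargs.items():
        key = name.lower()
        if key in GRAMMAR and values:
            parser, _ = GRAMMAR[key]
            pars[key] = parser(values[-1])  # assumption: last value wins
    return pars


def complete_params(pars):
    """Fill in defaults; fail if a required parameter is still missing."""
    completed = dict(pars)
    for key, (_, default) in GRAMMAR.items():
        if key not in completed:
            if default is REQUIRED:
                raise ValueError(f"missing parameter: {key}")
            completed[key] = default
    return completed
```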

update()

fetches a new copy of the job props from the DB.

You should in general not need this, since UWSJob objects are intended to be short-lived (“for the duration of an async request”). Still, for testing and similar, it’s convenient to be able to update a UWS job from the database.

exception gavo.protocols.uws.JobNotFound(jobId)

Bases: NotFoundError, UWSError

class gavo.protocols.uws.LocalFile(jobId, wd, fileName)

Bases: object

A sentinel class representing a file within a job work directory (as resulting from an upload).

getURL()

returns the URL the file is retrievable under for the lifetime of the job.

class gavo.protocols.uws.ParameterRef(url, local=False)

Bases: object

A UWS parameter that is (in effect) a URL.

This always contains a URL. In the case of uploads, the TAP renderer makes sure the upload is placed into the upload directory and generates a URL; in that case, local is True.

You need this class when you want the byReference attribute in the UWS.parameter element to be true.

class gavo.protocols.uws.ProcessBasedUWSTransitions(name)

Bases: SimpleUWSTransitions

A SimpleUWSTransitions that processes its stuff in a child process.

Inheriting classes must implement the getCommandLine(wjob) method – it must return a command (suitable for reactor.spawnProcess and os.execlp) and a list of arguments suitable for reactor.spawnProcess.

They must also implement some sort of queue management. In the simplest case, override queueJob and start the job from there (but set it to QUEUED in there anyway).

getCommandLine(wjob)

killJob(newState, wjob, ignored)

tries to kill/abort job.

Actually, there are two different scenarios here: Either the job has a non-NULL startTime. In that case, the child job is in control and will manage the state itself. Then kill -INT will do the right thing.

However, if startTime is NULL, the child is still starting up. Sending a kill -INT may do many things, and most of them we don’t want. So, in this case we kill -TERM the child, do state management ourselves and hope for the best.
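The signal choice described for killJob comes down to a small decision on startTime. This hypothetical kill_child function takes the kill function as a parameter so the logic can be tried without real child processes; the real method also has to update the job’s state when it takes over.

```python
import os
import signal


def kill_child(pid, startTime, kill=os.kill):
    """Hypothetical sketch of the signal choice described for killJob.

    startTime is None while the child is still starting up. Returns True
    if the caller must do the state management itself.
    """
    if startTime is not None:
        # The child is in control of its state; SIGINT lets it abort.
        kill(pid, signal.SIGINT)
        return False
    # The child is still starting up: terminate it and take over.
    kill(pid, signal.SIGTERM)
    return True
```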

startJob(newState, wjob, ignored)

causes a process to be started that executes the job.

This dispatches according to whether or not we are within a twisted event loop, mostly for testing support.

trial_forceTwisted = False

class gavo.protocols.uws.SimpleUWSTransitions(name)

Bases: UWSTransitions

A UWSTransitions with sensible transitions pre-defined.

See the source for what we consider sensible.

The idea here is that you simply override (and usually up-call) the methods queueJob, markAborted, startJob, completeJob, killJob, errorOutJob, and ignoreAndLog.

You will have to define startJob and provide some way to execute startJob on QUEUED jobs (there’s nothing wrong with immediately calling self.startJob(…) if you don’t mind the DoS danger).

Once you have startJob, you’ll probably want to define killJob as well.

completeJob(newPhase, wjob, ignored)

pushes a job into the completed state.

errorOutJob(newPhase, wjob, exception)

pushes a job to an error state.

This is called by a worker; leaving the error message itself is part of the worker’s duty; here, exception will just be logged.

ignoreAndLog(newState, wjob, exc)

logs an attempt to transition when it’s impossible but shouldn’t result in an error.

This is mainly so COMPLETED things don’t fail just because of some mishap.

killJob(newPhase, wjob, ignored)

should abort a job.

There’s really not much we can do here, so this is a no-op.

Do not up-call here, you’ll get a (then spurious) warning if you do.

markAborted(newState, wjob, ignored)

simply marks job as aborted.

This is what happens if you abort a job from QUEUED or PENDING.

queueJob(newState, wjob, ignored)

puts a job on the queue.

class gavo.protocols.uws.UWS(jobClass, jobActions)

Bases: object

a facade for a universal worker service (UWS).

You must construct it with the job class (see UWSJob) and a uwsactions.JobActions instance.

The UWS then provides methods to access the jobs table, create jobs, and deserialize jobs from the jobs table.

It also has a context grammar that parses the JCL parameters from request.strargs. In this base class, that parses no parameters at all, and you will almost certainly have to override this.

We have a “statements cache” in here, where we used the UWS table definition to create query strings we can later pass to the database. Don’t worry about this any more. Just write text queries when adding features. It’s more readable and just about as stable against code evolution.

You must override the getURLForId(jobId) method in your concrete implementation.

You should also override jobdocPreamble and joblistPreamble. This is raw XML that is prepended to job and list documents. This is primarily for processing instructions (PIs) giving stylesheets, but since we don’t use doctypes, you could provide internal subsets there, too. Anyway, see the TAP UWS runner for examples.

changeToPhase(jobId, newPhase, input=None, timeout=0.1)

changeableJob(jobId, timeout=0.1)

a context manager for job manipulation.

This is done such that any changes to the job’s properties within the controlled section get propagated to the database. As long as you are in the controlled section, nobody else can change the job.
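In DaCHS, the exclusivity of changeableJob comes from a lock on the job’s database row. The sketch below imitates that guarantee with one threading.Lock per job id; changeable_job and the TimeoutError it raises are assumptions for illustration, not the real API.

```python
import threading
from contextlib import contextmanager

_locks = {}
_locksGuard = threading.Lock()


@contextmanager
def changeable_job(jobId, timeout=0.1):
    """Hypothetical per-job lock standing in for a DB row lock."""
    with _locksGuard:
        lock = _locks.setdefault(jobId, threading.Lock())
    # Give up after timeout seconds rather than blocking forever.
    if not lock.acquire(timeout=timeout):
        raise TimeoutError(f"could not lock job {jobId}")
    try:
        yield jobId
    finally:
        lock.release()
```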

cleanupInterval = 43200

cleanupJobsTable(includeFailed=False, includeCompleted=False, includeAll=False, includeForgotten=False)

removes expired jobs from the UWS jobs table.

The constructor arranges for this to be called periodically (see the cleanupInterval class attribute, defaulting to 12*3600 seconds).

The functionality is also exposed through gavo admin cleanuws; this also lets you use the includeFailed and includeCompleted flags. These should not be used on production services since you’d probably nuke jobs still interesting to your users.

countQueuedJobs()

returns the number of QUEUED jobs in jobsTable.

countRunningJobs()

returns the number of EXECUTING jobs in the jobsTable.

destroy(jobId)

removes the job with jobId from the UWS.

This calls the job’s prepareForDestruction method while the job is writable.

getIdsAndPhases(owner=None, phase=None, last=None, after=None, initFragments=None, initPars=None)

returns pairs of id and phase for all jobs in the UWS.

phase, last, after are the respective parameters from UWS 1.1.
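For reference, the UWS 1.1 filters behave roughly as in the following in-memory sketch: PHASE selects by phase, AFTER keeps jobs created after a timestamp, and LAST keeps the N most recently created jobs, newest first. filter_jobs and its tuple layout are hypothetical, and owner, initFragments and initPars are left out.

```python
from datetime import datetime


def filter_jobs(jobs, phase=None, last=None, after=None):
    """Hypothetical in-memory version of UWS 1.1 job-list filtering.

    jobs is a list of (jobId, phase, creationTime) tuples; phase is a
    collection of phase names, after a datetime, last an int.
    """
    selected = [j for j in jobs
        if (phase is None or j[1] in phase)
            and (after is None or j[2] > after)]
    if last is not None:
        # LAST: the N most recently created jobs, newest first.
        selected = sorted(selected, key=lambda j: j[2], reverse=True)[:last]
    return [(jobId, ph) for jobId, ph, _ in selected]
```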

getJob(jobId)

returns a read-only UWSJob for jobId.

Note that by the time you do something with the information here, the “true” state in the database may already be different. There should be no way to write whatever information you have in here, so any “racing” here shouldn’t hurt.

getJobIds()

returns a list of all currently existing job ids.

getNewIdFromArgs(uwsArgs, paramDict, user=None)

returns a new UWS id for a job processing paramDict.

paramDict needs to be parsed already (typically, it’s a CoreArgs.getParamDict() result) using the workerSystem’s parameterGrammar.

Pass user to restrict the job to that user. No authentication is done here, so make sure you actually authenticate user (cf. getNewIdFromRequest for how this could be done).

getNewIdFromRequest(request)

returns the id of a new TAP job created from request.

Request has to be a twisted.web request or similar, but preprocessed by prepareRequest below to have the parsed UWS parameters in uwsArgs.

getNewJob(**kws)

creates a new job and returns a read-only instance for it.

getNewJobId(**kws)

creates a new job and returns its id.

kws can be properties of the new job or the special key timeout giving after how many seconds we should give up trying to lock the db.

getURLForId(jobId)

returns the handling URL for the job with jobId.

You must override this in deriving classes.

jobdocPreamble = ''

joblistPreamble = ''

parameterGrammar = <gavo.svcs.inputdef.ContextGrammar object>

runCanned(statementId, args, conn)

runs the canned statement statementId with args through the DB connection conn.

This will return row dictionaries of the result if there is a result.

exception gavo.protocols.uws.UWSError(msg, jobId=None, sourceEx=None, hint=None)

Bases: Error

UWS-related errors, mainly to communicate with web renderers.

UWSErrors are constructed with a displayable message (may be None to autogenerate one), a jobId (give one; the default None is only there to avoid breaking legacy code) and optionally a source exception and a hint.

class gavo.protocols.uws.UWSJobType

Bases: type

The metaclass for UWS jobs.

We have the metaclass primarily because we want to delay loading the actual definition until it is actually needed (otherwise we might get interesting chicken-and-egg problems with rscdesc at some point).

A welcome side effect is that we can do custom constructors and similar cosmetic deviltry.

property jobsTD

class gavo.protocols.uws.UWSJobWithWD(props, uws, writable=False)

Bases: BaseUWSJob

A UWS job with a working directory.

This generates ids from directory names in a directory (the uwsWD) shared by all UWSes on the system.

It also adds methods

  • getWD() -> str returning the working directory

  • addResult(self, source, mimeType, name=None) to add a new result

  • openResult(self, mimeType, name) -> file to get an open file in the WD to write to in order to generate a result

  • getResult(self, resName) -> (str, str) to get the file name and media type of a result with resName

  • getResults(self) -> list-of-dicts to get dicts describing all results available

  • openFile(self, name, mode='rb') -> file to get a file letting you read an existing result.

addResult(source, mimeType, name=None)

adds a result, with data taken from source.

source may be a file-like object, a string, or bytes.

If no name is passed, a name is auto-generated.
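A helper that accepts the three kinds of source listed for addResult could normalise everything to a binary stream and copy it in chunks. add_result is hypothetical: it requires an explicit name (no auto-generation), expects file-like sources to be opened in binary mode, and assumes UTF-8 for strings.

```python
import io
import os
import shutil


def add_result(wd, source, name):
    """Hypothetical helper writing a result from a file, str or bytes."""
    if isinstance(source, str):
        source = source.encode("utf-8")  # assumption: UTF-8 for text
    if isinstance(source, bytes):
        source = io.BytesIO(source)
    # Copy in chunks so large results need not fit into memory.
    with open(os.path.join(wd, name), "wb") as dest:
        shutil.copyfileobj(source, dest)
```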

fixTypeForResultName(resultName, mediaType)

sets the media type for result resultName.

It is not an error if no result with resultName exists.

classmethod getNewId(uws, conn)

getResult(resName)

returns a pair of file name and mime type for a named job result.

If the result does not exist, a NotFoundError is raised.

getResults()

returns a list of this job’s results.

The list contains dictionaries having at least resultName and resultType keys.

getWD()

openFile(name, mode='rb')

returns an open file object for a file within the job’s work directory.

This will only use the last path component; everything else is discarded.
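Keeping only the last path component is what stops a name such as ../../etc/passwd from escaping the work directory. The hypothetical open_in_wd below shows the idea; note that os.path.basename alone still lets ".." through, so the sketch rejects that explicitly.

```python
import os


def open_in_wd(wd, name, mode="rb"):
    """Hypothetical sketch: open name inside wd, ignoring its directories."""
    safeName = os.path.basename(name)
    # basename("..") is "..", which would still escape wd.
    if safeName in ("", ".", ".."):
        raise ValueError(f"no usable file name in {name!r}")
    return open(os.path.join(wd, safeName), mode)
```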

openResult(mimeType, name)

returns a writable file that adds a result.

prepareForDestruction()

is called before the job’s database row is torn down.

Self is writable at this point.

class gavo.protocols.uws.UWSTransitions(name, vertices)

Bases: object

An abstract base for classes defining the behaviour of a UWS.

This basically is the definition of a finite state machine with arbitrary input (which is to say: the input “alphabet” is up to the transitions).

A UWSTransitions instance is in the _transitions attribute of a job class.

The main interface to UWSTransitions is getTransition(p1, p2) -> callable. It either returns a callable that should push the automaton from phase p1 to phase p2 or raises a ValidationError for the phase field.

The callable has the signature f(desiredPhase, wjob, input) -> None. It must alter the uwsJob object as appropriate. input is some object defined by the transition. The job passed is a changeable job, so the handlers actually hold locks to the job row. Thus, be brief.

The transitions are implemented as simple methods having the signature of the callables returned by getTransition.

To link transitions and methods, pass a vertices list to the constructor. This list consists of 3-tuples of strings (from, to, method-name). From and to are phase names (use the symbols from this module to ward against typos).
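The vertex list can be pictured as a lookup table from (from, to) pairs to bound methods. ToyTransitions, the ValidationError stand-in and the phase constants below are hypothetical simplifications (the job is a plain dict here); the real class and the module’s phase symbols may differ in detail.

```python
# Hypothetical stand-ins for the module's phase symbols.
PENDING, QUEUED, COMPLETED = "PENDING", "QUEUED", "COMPLETED"


class ValidationError(Exception):
    """Stand-in for DaCHS's ValidationError (assumed signature)."""
    def __init__(self, msg, colName):
        super().__init__(msg)
        self.colName = colName


class ToyTransitions:
    """Hypothetical reduction of UWSTransitions to its vertex table."""

    def __init__(self, name, vertices):
        self.name = name
        # Map (fromPhase, toPhase) to the bound method named in the tuple.
        self.transitions = {(src, dst): getattr(self, methodName)
            for src, dst, methodName in vertices}

    def getTransition(self, fromPhase, toPhase):
        try:
            return self.transitions[fromPhase, toPhase]
        except KeyError:
            raise ValidationError(
                f"No transition from {fromPhase} to {toPhase} defined"
                f" for {self.name} jobs", "phase") from None

    def noOp(self, newPhase, job, ignored):
        # Just set the new phase, like the noOp action described below.
        job["phase"] = newPhase
```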

flagError(newPhase, wjob, exception)

the default action when transitioning to an error: dump the exception and mark the phase as ERROR.

getTransition(fromPhase, toPhase)

noOp(newPhase, job, ignored)

a sample action just setting the new phase.

This is a no-op baseline sometimes useful in user code.

noteEndTime(newPhase, wjob, ignored)

class gavo.protocols.uws.UWSWithQueueing(jobClass, actions)

Bases: UWS

A UWS with support for queueing.

Queuing is done at the UWS level rather than in the transitions. With a plain UWS, if something is put on the queue, it must be started by the Transition’s queueJob method.

With UWSWithQueueing, you just mark the job queued, and the rest is taken care of by the UWS itself.

changeToPhase(jobId, newPhase, input=None, timeout=10)

overridden here to hook in queue management.

checkProcessQueue()

sees if any QUEUED process can be made EXECUTING.

This must be called while you’re not holding any changeableJob.

queueCheckInterval = 60

runcountGoal = 1

scheduleProcessQueueCheck()

tells TAP UWS to try and dequeue jobs next time checkProcessQueue is called.

This function exists since during the TAPTransitions there’s a writable job, and processing the queue might deadlock. So, rather than processing right away, we just note that something may need to be done.
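The interplay of scheduleProcessQueueCheck, checkProcessQueue and runcountGoal can be modelled without a database. ToyQueue is hypothetical: jobs live in a dict in queueing order, and, as an assumption of this sketch, checkProcessQueue does nothing unless a check was scheduled.

```python
class ToyQueue:
    """Hypothetical model of UWSWithQueueing's dequeuing logic."""
    runcountGoal = 1

    def __init__(self):
        self.phases = {}  # jobId -> phase, in queueing order
        self._checkPending = False

    def scheduleProcessQueueCheck(self):
        # Safe while a job is locked: only note that work may be due.
        self._checkPending = True

    def checkProcessQueue(self):
        # Call only while no job is held for changing.
        if not self._checkPending:
            return
        self._checkPending = False
        running = sum(1 for p in self.phases.values() if p == "EXECUTING")
        for jobId, phase in self.phases.items():
            if running >= self.runcountGoal:
                break
            if phase == "QUEUED":
                self.phases[jobId] = "EXECUTING"
                running += 1
```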

gavo.protocols.uws.prepareRequest(request, service)

prepares request for processing UWS requests.

In UWS, the service in general doesn’t run, and hence there’s no context grammar running. Also, there are the UWS-specific parameters, which the core shouldn’t be seeing.

These latter ones are parsed into request.uwsArgs by this function. This is what drives uwsactions. The function removes the corresponding keys from strargs so the core in question is not confused by them.
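Splitting the UWS-specific parameters off strargs might look like this. split_uws_args and the UWS_KEYS set are hypothetical: the actual set of keys DaCHS treats as UWS parameters, and the parsing it applies to their values, are not shown here.

```python
# Assumption: an illustrative set of UWS-level parameter names.
UWS_KEYS = {"phase", "executionduration", "destruction", "runid"}


def split_uws_args(strargs):
    """Hypothetical sketch: move UWS keys from strargs into uwsArgs.

    strargs maps names to lists of strings and is modified in place,
    so the core never sees the UWS-level parameters.
    """
    uwsArgs = {}
    for key in list(strargs):
        if key.lower() in UWS_KEYS:
            uwsArgs[key.lower()] = strargs.pop(key)
    return uwsArgs
```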