1 """
2 Manipulating UWS jobs through a REST interface.
3
4 The result documents are defined through the schema uws-1.0.xsd.
5
6 Instead of returning XML, they can also raise WebRedirect exceptions.
7 However, these are caught in JobResource._redirectAsNecessary and appended
8 to the base URL of the TAP service, so you must only give URIs relative
9 to the TAP service's root URL.
10
11 This UWS system should adapt to concrete UWSes; the UWS in use is passed
12 into the top-level functions (doJobAction, getJobList).
13 """
14
15
16
17
18
19
20
21 import os
22
23 from twisted.internet import reactor
24 from twisted.internet import defer
25
26 from nevow import inevow
27 from nevow import rend
28 from nevow import static
29
30 from gavo import base
31 from gavo import svcs
32 from gavo import utils
33 from gavo.protocols import uws
34 from gavo.utils import stanxml
35 from gavo.votable import V
36
37
# XML namespace URIs for the uws and xlink prefixes used in this module.
UWSNamespace = "http://www.ivoa.net/xml/UWS/v1.0"
XlinkNamespace = "http://www.w3.org/1999/xlink"

# Register both prefixes with stanxml; the third argument is whatever
# stanxml.schemaURL returns for the given schema file name.
stanxml.registerPrefix(
    "uws", UWSNamespace, stanxml.schemaURL("UWS-1.1.xsd"))
stanxml.registerPrefix(
    "xlink", XlinkNamespace, stanxml.schemaURL("xlink.xsd"))
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# Lower-cased parameter names.
# NOTE(review): presumably the request keys that are matched regardless
# of case -- the code using this set is not visible here; confirm.
_CASE_INSENSITIVE_KEYS = {
    "action",
    "destruction",
    "executionduration",
    "format",
    "lang",
    "maxrec",
    "phase",
    "query",
    "request",
    "responseformat",
    "runid",
    "upload",
    "version",
}
69
70
71 -class UWS(object):
72 """the container for elements from the uws namespace.
73 """
76
77 @staticmethod
82
83 - class job(UWSElement):
85
86 - class jobs(UWSElement):
89
91
93 - class endTime(stanxml.NillableMixin, UWSElement): pass
96 - class jobId(UWSElement): pass
99 - class ownerId(stanxml.NillableMixin, UWSElement): pass
100 - class phase(UWSElement): pass
101 - class quote(stanxml.NillableMixin, UWSElement): pass
102 - class runId(UWSElement): pass
103 - class startTime(stanxml.NillableMixin, UWSElement): pass
104
110
114
122
128
137
140
141
142 -def getJobList(workerSystem,
143 forOwner=None,
144 phase=None,
145 last=None,
146 after=None):
154
157
158
159 errDesc = job.error
160 if not errDesc:
161 return None
162 msg = errDesc["msg"]
163 if errDesc["hint"]:
164 msg = msg+"\n\n -- Hint: "+errDesc["hint"]
165 return UWS.errorSummary(type="fatal", hasDetail="false")[
166 UWS.message[msg]]
167
179
182 """A collection of "actions" performed on UWS jobs.
183
184 Their names are the names of the child resources of UWS jobs. The basic UWS
185 actions are built in. When constructing those, you can pass in as many
186 additional JobAction subclasses as you want. Set their names to
187 one of UWS standard actions to override UWS behaviour if you think
188 that's wise.
189 """
190 _standardActions = {}
191
def __init__(self, *additionalActions):
    """builds the per-instance table of actions.

    The table starts as a copy of the class-level _standardActions.
    Each class passed in additionalActions is then instantiated without
    arguments and stored under its name attribute; an entry with the
    same name that is already present gets replaced.
    """
    self.actions = dict(self._standardActions)
    for extension in additionalActions:
        instance = extension()
        self.actions[extension.name] = instance
197
198 @classmethod
201
202 - def dispatch(self, action, job, request, segments):
214
217 """an action done to a job.
218
219 It defines methods do<METHOD> that are dispatched through JobActions.
220
221 It must have a name corresponding to the child resource names from
222 the UWS spec.
223 """
224 name = None
225 mime = "text/xml"
226
235
238 """A TAP error message.
239
240 These are constructed with errInfo, which is either an exception or
241 a dictionary containing at least type, msg, and hint keys. Optionally,
242 you can give a numeric httpStatus.
243 """
def __init__(self, errInfo, httpStatus=400):
    """stores message, hint and HTTP status for an error.

    errInfo is either an exception or a dictionary with the keys msg,
    type, and hint.  An exception is first turned into such a dictionary:
    msg is its unicode representation, type its class name, and hint its
    hint attribute (None when there is none).

    A type of "JobNotFound" forces the status to 404 regardless of
    what was passed as httpStatus.
    """
    if isinstance(errInfo, Exception):
        # normalise exceptions to the dictionary form used below
        errInfo = dict(
            msg=unicode(errInfo),
            type=errInfo.__class__.__name__,
            hint=getattr(errInfo, "hint", None))

    status = 404 if errInfo["type"]=="JobNotFound" else httpStatus
    self.errMsg = errInfo["msg"]
    self.httpStatus = status
    self.hint = errInfo["hint"]
254
267
279 JobActions.addStandardAction(ErrorAction)
283
284
285
286
287 name = "startTime"
288 mime = "text/plain"
289
290 - def doGET(self, job, request):
295
296 doPOST = doGET
297 JobActions.addStandardAction(StartTimeAction)
313
314 JobActions.addStandardAction(ParameterAction)
317 name = "phase"
318 mime = "text/plain"
319 timeout = 10
320
def doPOST(self, job, request):
    """changes the phase of job as requested by the phase argument.

    A phase of RUN moves the job to uws.QUEUED, ABORT moves it to
    uws.ABORTED; both use self.timeout as the timeout.  Anything else,
    including a missing phase argument, raises a ValidationError for
    the phase parameter.

    On success this never returns normally; it raises a WebRedirect
    to the job id.
    """
    requested = utils.getfirst(request.args, "phase", None)
    if requested not in ("RUN", "ABORT"):
        raise base.ValidationError("Bad phase: %s"%requested, "phase")

    target = uws.QUEUED if requested=="RUN" else uws.ABORTED
    job.uws.changeToPhase(job.jobId, target, timeout=self.timeout)
    raise svcs.WebRedirect(job.jobId)
330
331 - def doGET(self, job, request):
334 JobActions.addStandardAction(PhaseAction)
359
366 JobActions.addStandardAction(ExecDAction)
374 JobActions.addStandardAction(DestructionAction)
378 name = "quote"
379 mime = "text/plain"
380
381 - def doGET(self, job, request):
388
389 JobActions.addStandardAction(QuoteAction)
393 name = "owner"
394 mime = "text/plain"
395
396 - def doGET(self, job, request):
403
404 JobActions.addStandardAction(OwnerAction)
412
415 """Access result (Extension: and other) files in job directory.
416 """
417 name = "results"
418
444
def doGET(self, job, request):
    """returns what _getResultsElement produces for job.

    request is not used.
    """
    resultsElement = _getResultsElement(job)
    return resultsElement
447
448 JobActions.addStandardAction(ResultsAction)
455
458 """Actions for async/jobId.
459 """
460 name = ""
461
462 @utils.memoized
481
489
def doPOST(self, wjob, request):
    """Implements POST on a job resource.

    Plain UWS has no POST here.  As a DaCHS extension, a request carrying
    action=DELETE is handed on to doDELETE so web browsers can delete
    jobs.  Every other POST raises BadMethod.
    """
    requestedAction = utils.getfirst(request.args, "action", None)
    if not requestedAction=="DELETE":
        raise svcs.BadMethod("POST")
    self.doDELETE(wjob, request)
500
502 """delays if request has UWS 1.1 "slow poll" arguments, returns None
503 otherwise.
504
505 This is a helper for doGET.
506 """
507
508
509
510
511
512 args = svcs.CoreArgs.fromRawArgs(
513 self.getJobInputTD(),
514 request.args).args
515
516 if args["WAIT"] is None:
517 return
518 if args["WAIT"]==-1:
519 args["WAIT"] = base.getConfig("async", "maxslowpollwait")
520
521 if args["PHASE"] is not None and args["PHASE"]!=job.phase:
522 return
523
524 if job.phase not in ["QUEUED", "EXECUTING"]:
525 return
526
527 d = defer.Deferred().addCallback(
528 self._recheck, job.uws, job.jobId, args["WAIT"]-1, job.phase)
529 reactor.callLater(1, d.callback, request)
530 return d
531
532 - def _recheck(self,
533 request, workerSystem, jobId, remainingWait, originalPhase):
545
546
def doGET(self, job, request):
    """Implements GET on a job resource: return the current job metadata.

    If _delayIfWAIT returns something true for this request, that is
    returned as is.  Otherwise, the job is rendered as a uws:job document
    using the job's UWS's jobdocPreamble.
    """
    pending = self._delayIfWAIT(job, request)
    if pending:
        return pending

    # keep this a tuple: that is what UWS.job[a, b, ...] hands to
    # the element
    jobChildren = (
        UWS.jobId[job.jobId],
        UWS.runId[job.runId],
        UWS.ownerId[job.owner],
        UWS.phase[job.phase],
        UWS.quote[utils.formatISODT(job.quote)],
        UWS.creationTime[utils.formatISODT(job.creationTime)],
        _serializeTime(UWS.startTime, job.startTime),
        _serializeTime(UWS.endTime, job.endTime),
        UWS.executionDuration[str(job.executionDuration)],
        UWS.destruction[utils.formatISODT(job.destructionTime)],
        getParametersElement(job),
        _getResultsElement(job),
        getErrorSummary(job))
    jobDocument = UWS.makeRoot(UWS.job[jobChildren])
    return stanxml.xmlrender(jobDocument, job.uws.jobdocPreamble)
569
570
571 JobActions.addStandardAction(RootAction)
572
573
def doJobAction(workerSystem, request, segments):
    """handles the async UI of UWS.

    segments must have at least one element.  The first element is the
    job id.  The second, if present, names the action; without it, the
    action is the empty string.  Whatever is left over is handed on to
    the action.

    The result is whatever workerSystem.jobActions.dispatch returns for
    the job obtained through workerSystem.getJob.
    """
    jobId = segments[0]
    action = ""
    remaining = segments[1:]
    if remaining:
        action, remaining = remaining[0], remaining[1:]

    dispatch = workerSystem.jobActions.dispatch
    job = workerSystem.getJob(jobId)
    return dispatch(action, job, request, remaining)
589