| Home | Trees | Indices | Help |
|
|---|
|
|
1 """
2 Manipulating UWS jobs through a REST interface.
3
4 The result documents are defined through the schema uws-1.0.xsd.
5
6 Instead of returning XML, they can also raise WebRedirect exceptions.
7 However, these are caught in JobResource._redirectAsNecessary and appended
8 to the base URL auf the TAP service, so you must only give URIs relative
9 to the TAP service's root URL.
10
11 This UWS system should adapt to concrete UWSes; the UWS in use is passed
12 into the top-level functions (doJobAction , getJobList)
13 """
14
15 #c Copyright 2008-2019, the GAVO project
16 #c
17 #c This program is free software, covered by the GNU GPL. See the
18 #c COPYING file in the source distribution.
19
20
21 import os
22
23 from twisted.internet import reactor
24 from twisted.internet import defer
25
26 from nevow import inevow
27 from nevow import rend
28 from nevow import static
29
30 from gavo import base
31 from gavo import svcs
32 from gavo import utils
33 from gavo.protocols import uws
34 from gavo.utils import stanxml
35 from gavo.votable import V
36
37
38 UWSNamespace = 'http://www.ivoa.net/xml/UWS/v1.0'
39 XlinkNamespace = "http://www.w3.org/1999/xlink"
40 stanxml.registerPrefix("uws", UWSNamespace,
41 stanxml.schemaURL("UWS-1.1.xsd"))
42 stanxml.registerPrefix("xlink", XlinkNamespace,
43 stanxml.schemaURL("xlink.xsd"))
44
45
46 # Sadly, TAP protocol keys need to be case insensitive (spec, 2.3.10)
47 # The code here assumes all keys to be in lowercase, and this function
48 # forces this. You should call it as soon as possible when processing
49 # requests.
50 #
51 # Note that non-protocol keys are not case-normalized, since there's always
52 # the hope for sane protocols that don't have crazy case-folding rules.
53 # UWS parameters are lower-cased, too, right now, though (in
54 # set/getSerializedPar, by a different mechanism).
55 #
56 # XXX TODO: there are TAP keys in here, too. Come up with a way
57 # to have worker systems say which keys they want case insensitive
58 _CASE_INSENSITIVE_KEYS = set(["request", "version", "lang", "query",
59 "format", "maxrec", "runid", "upload", "action", "phase",
60 "executionduration", "destruction", "responseformat"])
63 for key in args:
64 if key.lower()==key:
65 continue
66 if key.lower() in _CASE_INSENSITIVE_KEYS:
67 content = args.pop(key)
68 args[key.lower()] = content
69
72 """the container for elements from the uws namespace.
73 """
75 _prefix = "uws"
76
77 @staticmethod
82
84 _a_version = "1.1"
85
89
91
104
106 _a_href = None
107 _a_type = None
108 _name_a_href = "xlink:href"
109 _name_a_type = "xlink:type"
110
114
116 _additionalPrefixes = frozenset(["xlink"])
117 _a_id = None
118 _a_href = None
119 _a_type = None
120 _name_a_href = "xlink:href"
121 _name_a_type = "xlink:type"
122
128
130 _additionalPrefixes = frozenset(["xlink"])
131 _mayBeEmpty = True
132 _a_id = None
133 _a_href = None
134 _a_type = None
135 _name_a_href = "xlink:href"
136 _name_a_type = "xlink:type"
137
139 _mayBeEmpty = True
140
141
142 -def getJobList(workerSystem,
143 forOwner=None,
144 phase=None,
145 last=None,
146 after=None):
147 result = UWS.jobs()
148 for jobId, phase in workerSystem.getIdsAndPhases(
149 forOwner, phase, last, after):
150 result[
151 UWS.jobref(id=jobId, href=workerSystem.getURLForId(jobId))[
152 UWS.phase[phase]]]
153 return stanxml.xmlrender(result, workerSystem.joblistPreamble)
154
157 # all our errors are fatal, and for now .../error yields the same thing
158 # as we include here, so we hardcode the attributes.
159 errDesc = job.error
160 if not errDesc:
161 return None
162 msg = errDesc["msg"]
163 if errDesc["hint"]:
164 msg = msg+"\n\n -- Hint: "+errDesc["hint"]
165 return UWS.errorSummary(type="fatal", hasDetail="false")[
166 UWS.message[msg]]
167
170 """returns a UWS.parameters element for job.
171 """
172 res = UWS.parameters()
173 for key, value in job.iterSerializedPars():
174 if isinstance(value, uws.ParameterRef):
175 res[UWS.parameter(id=key, byReference=True)[value.url]]
176 else:
177 res[UWS.parameter(id=key)[value]]
178 return res
179
182 """A collection of "actions" performed on UWS jobs.
183
184 Their names are the names of the child resources of UWS jobs. The basic UWS
185 actions are built in. When constructing those, you can pass in as many
186 additional JobAction subclasses as you want. Set their names to
187 one of UWS standard actions to override UWS behaviour if you think
188 that's wise.
189 """
190 _standardActions = {}
191
193 self.actions = {}
194 self.actions.update(self._standardActions)
195 for actionClass in additionalActions:
196 self.actions[actionClass.name] = actionClass()
197
198 @classmethod
201
203 if job.owner:
204 if request.getUser()!=job.owner:
205 raise svcs.Authenticate()
206
207 try:
208 resFactory = self.actions[action]
209 except KeyError:
210 raise base.ui.logOldExc(
211 svcs.UnknownURI("Invalid UWS action '%s'"%action))
212 request.setHeader("content-type", resFactory.mime)
213 return resFactory.getResource(job, request, segments)
214
217 """an action done to a job.
218
219 It defines methods do<METHOD> that are dispatched through JobActions.
220
221 It must have a name corresponding to the child resource names from
222 the UWS spec.
223 """
224 name = None
225 mime = "text/xml"
226
235
238 """A TAP error message.
239
240 These are constructed with errInfo, which is either an exception or
241 a dictionary containing at least type, msg, and hint keys. Optionally,
242 you can give a numeric httpStatus.
243 """
245 if isinstance(errInfo, Exception):
246 errInfo = {
247 "msg": unicode(errInfo),
248 "type": errInfo.__class__.__name__,
249 "hint": getattr(errInfo, "hint", None)}
250 if errInfo["type"]=="JobNotFound":
251 httpStatus = 404
252 self.errMsg, self.httpStatus = errInfo["msg"], httpStatus
253 self.hint = errInfo["hint"]
254
256 request = inevow.IRequest(ctx)
257 request.setHeader("content-type", "text/xml")
258 request.setResponseCode(self.httpStatus)
259 doc = V.VOTABLE[
260 V.RESOURCE(type="results") [
261 V.INFO(name="QUERY_STATUS", value="ERROR")[
262 self.errMsg]]]
263 if self.hint:
264 doc[V.INFO(name="HINT", value="HINT")[
265 self.hint]]
266 return doc.render()
267
279 JobActions.addStandardAction(ErrorAction)
283 # This an extension over plain UWS allowing users to retrieve when
284 # their job started. In the DaCHS' TAP implementation, this lets
285 # you discern whether the taprunner is already processing an EXECUTING
286 # job (startTime!=NULL) or whether it's still coming up (else)
287 name = "startTime"
288 mime = "text/plain"
289
291 if job.startTime is None:
292 return "NULL"
293 else:
294 return utils.formatISODT(job.startTime)
295
296 doPOST = doGET
297 JobActions.addStandardAction(StartTimeAction)
301 name = "parameters"
302
304 request.setHeader("content-type", "text/xml")
305 return UWS.makeRoot(getParametersElement(job))
306
308 if job.phase!=uws.PENDING:
309 raise base.ValidationError(
310 "Parameters cannot be changed in phase %s"%job.phase, "phase")
311 job.setParamsFromRequest(request)
312 raise svcs.WebRedirect(job.jobId)
313
314 JobActions.addStandardAction(ParameterAction)
317 name = "phase"
318 mime = "text/plain"
319 timeout = 10 # this is here for testing
320
322 newPhase = utils.getfirst(request.args, "phase", None)
323 if newPhase=="RUN":
324 job.uws.changeToPhase(job.jobId, uws.QUEUED, timeout=self.timeout)
325 elif newPhase=="ABORT":
326 job.uws.changeToPhase(job.jobId, uws.ABORTED, timeout=self.timeout)
327 else:
328 raise base.ValidationError("Bad phase: %s"%newPhase, "phase")
329 raise svcs.WebRedirect(job.jobId)
330
334 JobActions.addStandardAction(PhaseAction)
338 """Abstract base for ExecDAction and DestructionAction.
339 """
340 mime = "text/plain"
341
343 raw = utils.getfirst(request.args, self.name.lower(), None)
344 if raw is None: # with no parameter, fall back to GET
345 return self.doGET(job, request)
346 try:
347 val = self.deserializeValue(raw)
348 except ValueError:
349 raise base.ui.logOldExc(uws.UWSError("Invalid %s value: %s."%(
350 self.name.upper(), repr(raw)), job.jobId))
351 with job.getWritable() as wjob:
352 args = {self.attName: val}
353 wjob.change(**args)
354 raise svcs.WebRedirect(job.jobId)
355
357 request.setHeader("content-type", "text/plain")
358 return self.serializeValue(getattr(job, self.attName))
359
362 name = "executionduration"
363 attName = 'executionDuration'
364 serializeValue = str
365 deserializeValue = float
366 JobActions.addStandardAction(ExecDAction)
370 name = "destruction"
371 attName = "destructionTime"
372 serializeValue = staticmethod(utils.formatISODT)
373 deserializeValue = staticmethod(utils.parseISODT)
374 JobActions.addStandardAction(DestructionAction)
388
389 JobActions.addStandardAction(QuoteAction)
403
404 JobActions.addStandardAction(OwnerAction)
408 baseURL = job.getURL()+"/results/"
409 return UWS.results[[
410 UWS.result(id=res["resultName"], href=baseURL+res["resultName"])
411 for res in job.getResults()]]
412
415 """Access result (Extension: and other) files in job directory.
416 """
417 name = "results"
418
420 if not segments:
421 return JobAction.getResource(self, job, request, segments)
422
423 # first try a "real" UWS result from the job
424 if len(segments)==1:
425 try:
426 fName, resultType = job.getResult(segments[0])
427 res = static.File(fName)
428 res.type = str(resultType)
429 res.encoding = None
430 return res
431 except base.NotFoundError: # segments[0] does not name a result
432 pass # fall through to other files
433
434 # if that doesn't work, try to return some other file from the
435 # job directory. This is so we can deliver uploads.
436 filePath = os.path.join(job.getWD(), *segments)
437 if not os.path.abspath(filePath).startswith(
438 os.path.abspath(job.getWD())):
439 raise svcs.ForbiddenURI("Not serving files outside of job directory.")
440
441 if not os.path.exists(filePath):
442 raise svcs.UnknownURI("File not found")
443 return static.File(filePath, defaultType="application/octet-stream")
444
446 return _getResultsElement(job)
447
448 JobActions.addStandardAction(ResultsAction)
455
458 """Actions for async/jobId.
459 """
460 name = ""
461
462 @utils.memoized
464 """returns an InputTable to parse the arguments of the UWS1.1 polling.
465 """
466 return base.parseFromString(svcs.InputTD,
467 """
468 <inputTable id="jobresource_args">
469 <inputKey name="PHASE" type="text" multiplicity="single"
470 description="Return immediately unless job is in this phase.">
471 <values>
472 <!-- we reject polling against PENDING, too, since it doesn't make
473 much sense -->
474 <option>QUEUED</option>
475 <option>EXECUTING</option>
476 </values>
477 </inputKey>
478 <inputKey name="WAIT" type="integer" multiplicity="single"
479 description="Seconds to wait with an answer if no change occurred."/>
480 </inputTable>""")
481
483 """Implements DELETE on a job resource.
484
485 This is the UWS-compliant way to delete a job.
486 """
487 job.uws.destroy(job.jobId)
488 raise svcs.WebRedirect("")
489
491 """Implements POST on a job resource.
492
493 This is a DaCHS extension to UWS in order to let web browsers
494 delete jobs by passing action=DELETE.
495 """
496 if utils.getfirst(request.args, "action", None)=="DELETE":
497 self.doDELETE(wjob, request)
498 else:
499 raise svcs.BadMethod("POST")
500
502 """delays if request has UWS 1.1 "slow poll" arguments, returns None
503 otherwise.
504
505 This is a helper for doGET.
506 """
507 # This is the implemenation of UWS 1.1 "slow polling".
508 # We still do polling internally rather than use postgres'
509 # LISTEN/NOTIFY since the overhead seems rather moderate and
510 # the added complexity of setting up notifcations appeared not
511 # proportional to saving it.
512 args = svcs.CoreArgs.fromRawArgs(
513 self.getJobInputTD(),
514 request.args).args
515
516 if args["WAIT"] is None:
517 return
518 if args["WAIT"]==-1:
519 args["WAIT"] = base.getConfig("async", "maxslowpollwait")
520
521 if args["PHASE"] is not None and args["PHASE"]!=job.phase:
522 return
523
524 if job.phase not in ["QUEUED", "EXECUTING"]:
525 return
526
527 d = defer.Deferred().addCallback(
528 self._recheck, job.uws, job.jobId, args["WAIT"]-1, job.phase)
529 reactor.callLater(1, d.callback, request)
530 return d
531
534 """the callback for doing slow polls.
535 """
536 job = workerSystem.getJob(jobId)
537 if originalPhase!=job.phase or remainingWait<=0:
538 request.args = {}
539 return self.doGET(job, request)
540
541 d = defer.Deferred().addCallback(
542 self._recheck, workerSystem, jobId, remainingWait-1, originalPhase)
543 reactor.callLater(1, d.callback, request)
544 return d
545
546
548 """Implements GET on a job resource: return the current job metadata.
549 """
550 delay = self._delayIfWAIT(job, request)
551 if delay:
552 return delay
553
554 tree = UWS.makeRoot(UWS.job[
555 UWS.jobId[job.jobId],
556 UWS.runId[job.runId],
557 UWS.ownerId[job.owner],
558 UWS.phase[job.phase],
559 UWS.quote[utils.formatISODT(job.quote)],
560 UWS.creationTime[utils.formatISODT(job.creationTime)],
561 _serializeTime(UWS.startTime, job.startTime),
562 _serializeTime(UWS.endTime, job.endTime),
563 UWS.executionDuration[str(job.executionDuration)],
564 UWS.destruction[utils.formatISODT(job.destructionTime)],
565 getParametersElement(job),
566 _getResultsElement(job),
567 getErrorSummary(job)])
568 return stanxml.xmlrender(tree, job.uws.jobdocPreamble)
569
570
571 JobActions.addStandardAction(RootAction)
575 """handles the async UI of UWS.
576
577 Depending on method and segments, it will return various XML strings
578 and may cause certain actions.
579
580 Segments must be a tuple with at least one element, the job id.
581 """
582 jobId, segments = segments[0], segments[1:]
583 if not segments:
584 action = ""
585 else:
586 action, segments = segments[0], segments[1:]
587 return workerSystem.jobActions.dispatch(action,
588 workerSystem.getJob(jobId), request, segments)
589
| Home | Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1 on Thu May 2 07:29:09 2019 | http://epydoc.sourceforge.net |