Package gavo :: Package protocols :: Module uwsactions
[frames] | no frames]

Source Code for Module gavo.protocols.uwsactions

  1  """ 
  2  Manipulating UWS jobs through a REST interface. 
  3   
  4  The result documents are defined through the schema UWS-1.1.xsd. 
  5   
  6  Instead of returning XML, they can also raise WebRedirect exceptions. 
  7  However, these are caught in JobResource._redirectAsNecessary and appended 
  8  to the base URL of the TAP service, so you must only give URIs relative 
  9  to the TAP service's root URL. 
 10   
 11  This UWS system should adapt to concrete UWSes; the UWS in use is passed 
 12  into the top-level functions (doJobAction, getJobList). 
 13  """ 
 14   
 15  #c Copyright 2008-2019, the GAVO project 
 16  #c 
 17  #c This program is free software, covered by the GNU GPL.  See the 
 18  #c COPYING file in the source distribution. 
 19   
 20   
 21  import os 
 22   
 23  from twisted.internet import reactor 
 24  from twisted.internet import defer 
 25   
 26  from nevow import inevow 
 27  from nevow import rend 
 28  from nevow import static 
 29   
 30  from gavo import base 
 31  from gavo import svcs 
 32  from gavo import utils 
 33  from gavo.protocols import uws 
 34  from gavo.utils import stanxml 
 35  from gavo.votable import V 
 36   
 37   
# XML namespace URIs used in the documents this module produces.
# NOTE(review): the UWS URI ends in v1.0 while the schema registered below
# is UWS-1.1.xsd (and UWS.job/UWS.jobs declare version 1.1); UWS 1.1 is
# understood to have kept the 1.0 namespace URI -- confirm before changing.
UWSNamespace = 'http://www.ivoa.net/xml/UWS/v1.0'
XlinkNamespace = "http://www.w3.org/1999/xlink"
# Register the "uws" and "xlink" prefixes with stanxml at import time,
# each together with a schema URL built by stanxml.schemaURL (presumably
# used for schemaLocation declarations -- verify in stanxml).
stanxml.registerPrefix("uws", UWSNamespace,
        stanxml.schemaURL("UWS-1.1.xsd"))
stanxml.registerPrefix("xlink", XlinkNamespace,
        stanxml.schemaURL("xlink.xsd"))
 44   
 45   
 46  # Sadly, TAP protocol keys need to be case insensitive (spec, 2.3.10) 
 47  # The code here assumes all keys to be in lowercase, and this function 
 48  # forces this.  You should call it as soon as possible when processing 
 49  # requests. 
 50  # 
 51  # Note that non-protocol keys are not case-normalized, since there's always 
 52  # the hope for sane protocols that don't have crazy case-folding rules. 
 53  # UWS parameters are lower-cased, too, right now, though (in  
 54  # set/getSerializedPar, by a different mechanism). 
 55  # 
 56  # XXX TODO: there are TAP keys in here, too.  Come up with a way 
 57  # to have worker systems say which keys they want case insensitive 
 58  _CASE_INSENSITIVE_KEYS = set(["request", "version", "lang", "query",  
 59          "format", "maxrec", "runid", "upload", "action", "phase", 
 60          "executionduration", "destruction", "responseformat"]) 
def lowercaseProtocolArgs(args, caseInsensitiveKeys=None):
    """lowercases the protocol keys of the request argument dict args in place.

    Every key whose lowercased form is in caseInsensitiveKeys (by default
    _CASE_INSENSITIVE_KEYS) is moved to that lowercased key; an entry
    already present under the lowercase key is overwritten.  All other keys
    are left untouched.  Returns None.

    caseInsensitiveKeys lets worker systems supply their own set of
    case-insensitive keys.
    """
    if caseInsensitiveKeys is None:
        caseInsensitiveKeys = _CASE_INSENSITIVE_KEYS

    # Iterate over a snapshot of the keys: popping from and inserting into
    # the dict being iterated can skip keys or raise RuntimeError.
    for key in list(args):
        lowerKey = key.lower()
        if lowerKey == key or lowerKey not in caseInsensitiveKeys:
            continue
        args[lowerKey] = args.pop(key)
69
class UWS(object):
    """the container for elements from the uws namespace.

    Each nested class is a stanxml element type rendered with the "uws"
    prefix (via UWSElement).  In stanxml, class attributes named _a_<name>
    presumably declare XML attributes and _name_a_<name> their rendered
    names (e.g. "xlink:href") -- confirm in stanxml.
    """
    class UWSElement(stanxml.Element):
        # common base: puts every element below into the "uws" prefix
        _prefix = "uws"

    @staticmethod
    def makeRoot(ob):
        # Prepares ob (modified in place and returned) for use as a document
        # root: replaces its additional prefixes with stanxml.xsiPrefix and
        # allows it to be empty.
        ob._additionalPrefixes = stanxml.xsiPrefix
        ob._mayBeEmpty = True
        return ob

    class job(UWSElement):
        _a_version = "1.1"

    class jobs(UWSElement):
        # a job list without any jobs is still a valid document
        _mayBeEmpty = True
        _a_version = "1.1"

    class parameters(UWSElement): pass

    # Simple content elements.  Those mixing in stanxml.NillableMixin are
    # the ones callers may leave empty (presumably rendered as xsi:nil).
    class destruction(UWSElement): pass
    class endTime(stanxml.NillableMixin, UWSElement): pass
    class creationTime(UWSElement): pass
    class executionDuration(UWSElement): pass
    class jobId(UWSElement): pass
    class jobInfo(UWSElement): pass
    class message(UWSElement): pass
    class ownerId(stanxml.NillableMixin, UWSElement): pass
    class phase(UWSElement): pass
    class quote(stanxml.NillableMixin, UWSElement): pass
    class runId(UWSElement): pass
    class startTime(stanxml.NillableMixin, UWSElement): pass

    class detail(UWSElement):
        # NOTE(review): uses xlink:* attribute names but, unlike jobref and
        # result, does not declare _additionalPrefixes = frozenset(["xlink"])
        # -- check whether the xlink prefix gets declared when this is used.
        _a_href = None
        _a_type = None
        _name_a_href = "xlink:href"
        _name_a_type = "xlink:type"

    class errorSummary(UWSElement):
        _a_type = None # transient | fatal
        _a_hasDetail = None

    class jobref(UWSElement):
        # entry of a job list, linking to the job via xlink:href
        _additionalPrefixes = frozenset(["xlink"])
        _a_id = None
        _a_href = None
        _a_type = None
        _name_a_href = "xlink:href"
        _name_a_type = "xlink:type"

    class parameter(UWSElement):
        _mayBeEmpty = True
        _a_byReference = None
        _a_id = None
        _a_isPost = None

    class result(UWSElement):
        # a job result, linked via xlink:href
        _additionalPrefixes = frozenset(["xlink"])
        _mayBeEmpty = True
        _a_id = None
        _a_href = None
        _a_type = None
        _name_a_href = "xlink:href"
        _name_a_type = "xlink:type"

    class results(UWSElement):
        _mayBeEmpty = True
140
def getJobList(workerSystem,
        forOwner=None,
        phase=None,
        last=None,
        after=None):
    """returns the rendered UWS job list of workerSystem.

    forOwner, phase, last, and after are handed on unchanged to
    workerSystem.getIdsAndPhases.  Every (jobId, phase) pair it yields
    becomes a jobref element pointing to workerSystem.getURLForId(jobId).
    The document is rendered using workerSystem.joblistPreamble.
    """
    jobList = UWS.jobs()
    matchingJobs = workerSystem.getIdsAndPhases(forOwner, phase, last, after)
    for jobId, jobPhase in matchingJobs:
        ref = UWS.jobref(id=jobId, href=workerSystem.getURLForId(jobId))
        jobList[ref[UWS.phase[jobPhase]]]
    return stanxml.xmlrender(jobList, workerSystem.joblistPreamble)
154
def getErrorSummary(job):
    """returns a UWS.errorSummary element for job's error, or None if job.error
    is empty.

    The message is job.error["msg"]; if job.error["hint"] is non-empty, it
    is appended after a blank line, introduced by " -- Hint: ".
    """
    # All our errors are fatal, and for now .../error yields the same
    # thing as we include here, so the attributes are hard-coded.
    errDesc = job.error
    if not errDesc:
        return None

    text = errDesc["msg"]
    hint = errDesc["hint"]
    if hint:
        text = text+"\n\n -- Hint: "+hint

    summary = UWS.errorSummary(type="fatal", hasDetail="false")
    return summary[UWS.message[text]]
167
def getParametersElement(job):
    """returns a UWS.parameters element for job.

    Every (key, value) pair from job.iterSerializedPars() becomes a
    UWS.parameter with id=key.  Values that are uws.ParameterRef instances
    are given by reference (value.url as content, byReference="true");
    all other values are used as the element content directly.
    """
    res = UWS.parameters()
    for key, value in job.iterSerializedPars():
        if isinstance(value, uws.ParameterRef):
            # xs:boolean only admits true/false/1/0; pass the lexical form
            # explicitly (as getErrorSummary does for hasDetail) instead of
            # Python's True, which stringifies to "True".
            res[UWS.parameter(id=key, byReference="true")[value.url]]
        else:
            res[UWS.parameter(id=key)[value]]
    return res
179
class JobActions(object):
    """A registry of the "actions" that can be performed on UWS jobs.

    Action names are the names of the child resources of UWS jobs.  The
    basic UWS actions are registered class-wide through addStandardAction.
    An instance can additionally be given any number of JobAction
    subclasses; these are instantiated and registered under their names.
    Giving one of them the name of a standard action overrides the UWS
    behaviour -- only do that if you think it's wise.
    """
    # name -> action instance; one dict shared by the class and its users.
    _standardActions = {}

    def __init__(self, *additionalActions):
        self.actions = dict(self._standardActions)
        for actionClass in additionalActions:
            action = actionClass()
            self.actions[actionClass.name] = action

    @classmethod
    def addStandardAction(cls, actionClass):
        """instantiates actionClass and registers it as a standard action.
        """
        cls._standardActions[actionClass.name] = actionClass()

    def dispatch(self, action, job, request, segments):
        """returns what the action named action produces for job.

        If the job has an owner, the requesting user must be that owner,
        otherwise svcs.Authenticate is raised.  Unknown action names raise
        svcs.UnknownURI.  The response content type is set to the action's
        mime before its getResource is called.
        """
        if job.owner and request.getUser()!=job.owner:
            raise svcs.Authenticate()

        try:
            handler = self.actions[action]
        except KeyError:
            raise base.ui.logOldExc(
                svcs.UnknownURI("Invalid UWS action '%s'"%action))

        request.setHeader("content-type", handler.mime)
        return handler.getResource(job, request, segments)
214
class JobAction(object):
    """an action done to a job.

    Subclasses implement do<METHOD> methods (doGET, doPOST, ...);
    getResource picks the one matching the request method.

    name must be the corresponding child resource name from the UWS spec;
    mime is the content type JobActions.dispatch sets for the response.
    """
    name = None
    mime = "text/xml"

    def getResource(self, job, request, segments):
        """returns the result of the do<request.method> handler for job.

        Raises svcs.UnknownURI if segments is non-empty and svcs.BadMethod
        if there is no handler for the request method.
        """
        if segments:
            raise svcs.UnknownURI("Too many segments")

        try:
            handler = getattr(self, "do"+request.method)
        except AttributeError:
            raise base.ui.logOldExc(svcs.BadMethod(request.method))
        return handler(job, request)
235
class ErrorResource(rend.Page):
    """A TAP error message, delivered as a VOTable.

    errInfo is either an exception or a dictionary with at least the keys
    type, msg, and hint.  httpStatus (default 400) becomes the response
    code, except that errors of type JobNotFound always yield 404.
    """
    def __init__(self, errInfo, httpStatus=400):
        if isinstance(errInfo, Exception):
            exc = errInfo
            errInfo = {
                "msg": unicode(exc),
                "type": exc.__class__.__name__,
                "hint": getattr(exc, "hint", None)}

        if errInfo["type"]=="JobNotFound":
            httpStatus = 404

        self.errMsg = errInfo["msg"]
        self.httpStatus = httpStatus
        self.hint = errInfo["hint"]

    def renderHTTP(self, ctx):
        """sets XML content type and status, returns the rendered VOTable.

        The message goes into a QUERY_STATUS=ERROR INFO within the results
        RESOURCE; a hint, if present, is appended as a HINT INFO.
        """
        request = inevow.IRequest(ctx)
        request.setHeader("content-type", "text/xml")
        request.setResponseCode(self.httpStatus)

        doc = V.VOTABLE[
            V.RESOURCE(type="results")[
                V.INFO(name="QUERY_STATUS", value="ERROR")[self.errMsg]]]
        if self.hint:
            doc[V.INFO(name="HINT", value="HINT")[self.hint]]
        return doc.render()
267
class ErrorAction(JobAction):
    """the error child resource of a job.

    Returns an empty string if job.error is None, else an ErrorResource for
    it with HTTP status 200.  POST behaves exactly like GET.
    """
    name = "error"
    mime = "text/plain"

    def doGET(self, job, request):
        if job.error is not None:
            return ErrorResource(job.error, httpStatus=200)
        return ""

    doPOST = doGET

JobActions.addStandardAction(ErrorAction)
class StartTimeAction(JobAction):
    """the startTime child resource of a job, as plain text.

    This is an extension over plain UWS letting users retrieve when their
    job started.  In DaCHS' TAP implementation, this lets you discern
    whether the taprunner is already processing an EXECUTING job
    (startTime is not NULL) or whether it's still coming up (NULL).
    POST behaves exactly like GET.
    """
    name = "startTime"
    mime = "text/plain"

    def doGET(self, job, request):
        if job.startTime is not None:
            return utils.formatISODT(job.startTime)
        return "NULL"

    doPOST = doGET

JobActions.addStandardAction(StartTimeAction)
class ParameterAction(JobAction):
    """the parameters child resource of a job.

    GET returns the job's parameters as a UWS document; POST sets
    parameters from the request, which is only allowed while the job is
    PENDING.
    """
    name = "parameters"

    def doGET(self, job, request):
        request.setHeader("content-type", "text/xml")
        parsElement = getParametersElement(job)
        return UWS.makeRoot(parsElement)

    def doPOST(self, job, request):
        if job.phase==uws.PENDING:
            job.setParamsFromRequest(request)
            raise svcs.WebRedirect(job.jobId)
        raise base.ValidationError(
            "Parameters cannot be changed in phase %s"%job.phase, "phase")

JobActions.addStandardAction(ParameterAction)
class PhaseAction(JobAction):
    """the phase child resource of a job.

    GET returns the current phase as plain text.  POST with phase=RUN
    moves the job to QUEUED, phase=ABORT to ABORTED; anything else is
    rejected with a ValidationError.
    """
    name = "phase"
    mime = "text/plain"
    timeout = 10  # this is here for testing

    def doPOST(self, job, request):
        requested = utils.getfirst(request.args, "phase", None)
        if requested=="RUN":
            target = uws.QUEUED
        elif requested=="ABORT":
            target = uws.ABORTED
        else:
            raise base.ValidationError("Bad phase: %s"%requested, "phase")

        job.uws.changeToPhase(job.jobId, target, timeout=self.timeout)
        raise svcs.WebRedirect(job.jobId)

    def doGET(self, job, request):
        request.setHeader("content-type", "text/plain")
        return job.phase

JobActions.addStandardAction(PhaseAction)
class _SettableAction(JobAction):
    """Abstract base for ExecDAction and DestructionAction.

    Subclasses define name (the child resource name; its lowercase form
    is the request argument read on POST), attName (the job attribute
    read and changed), and the serializeValue/deserializeValue converters.
    """
    mime = "text/plain"

    def doPOST(self, job, request):
        raw = utils.getfirst(request.args, self.name.lower(), None)
        if raw is None:
            # without a parameter, answer as for GET
            return self.doGET(job, request)

        try:
            newValue = self.deserializeValue(raw)
        except ValueError:
            raise base.ui.logOldExc(uws.UWSError("Invalid %s value: %s."%(
                self.name.upper(), repr(raw)), job.jobId))

        with job.getWritable() as wjob:
            wjob.change(**{self.attName: newValue})
        raise svcs.WebRedirect(job.jobId)

    def doGET(self, job, request):
        request.setHeader("content-type", "text/plain")
        serialize = self.serializeValue
        return serialize(getattr(job, self.attName))
class ExecDAction(_SettableAction):
    """the executionduration child resource (job attribute executionDuration).

    Incoming values are converted with float, outgoing ones with str.
    """
    name = "executionduration"
    attName = 'executionDuration'
    # builtin types, so no staticmethod wrapping is needed
    serializeValue = str
    deserializeValue = float

JobActions.addStandardAction(ExecDAction)
class DestructionAction(_SettableAction):
    """the destruction child resource (job attribute destructionTime).

    Values are converted with utils.parseISODT/utils.formatISODT
    (presumably ISO 8601 timestamps, going by the helper names).
    """
    name = "destruction"
    attName = "destructionTime"
    # staticmethod keeps the plain functions from being bound as methods
    serializeValue = staticmethod(utils.formatISODT)
    deserializeValue = staticmethod(utils.parseISODT)

JobActions.addStandardAction(DestructionAction)
class QuoteAction(JobAction):
    """the quote child resource: the job's quote as plain text.

    An empty string is returned when the job has no quote.
    """
    name = "quote"
    mime = "text/plain"

    def doGET(self, job, request):
        request.setHeader("content-type", "text/plain")
        if job.quote is not None:
            return utils.formatISODT(job.quote)
        return ""

JobActions.addStandardAction(QuoteAction)
class OwnerAction(JobAction):
    """the owner child resource: the job owner, or NULL, as plain text.
    """
    name = "owner"
    mime = "text/plain"

    def doGET(self, job, request):
        request.setHeader("content-type", "text/plain")
        # the owner is written to the request directly; the method itself
        # returns an empty string
        request.write("NULL" if job.owner is None else job.owner)
        return ""

JobActions.addStandardAction(OwnerAction)
def _getResultsElement(job):
    """returns a UWS.results element with one UWS.result per job result.

    Each result's id is its resultName; its href is job.getURL() followed
    by "/results/" and the resultName.
    """
    resultsURL = job.getURL()+"/results/"
    children = []
    for res in job.getResults():
        resultName = res["resultName"]
        children.append(UWS.result(id=resultName, href=resultsURL+resultName))
    return UWS.results[children]
412
class ResultsAction(JobAction):
    """Access result (Extension: and other) files in job directory.
    """
    name = "results"

    def getResource(self, job, request, segments):
        """returns a resource for a job result or another file in the job's
        working directory.

        Without segments, the request is handled by doGET (the results
        list).  A single segment naming a result yields that result file
        with its result type.  Otherwise, segments are taken as a path
        relative to job.getWD(); paths outside that directory raise
        svcs.ForbiddenURI, nonexisting ones svcs.UnknownURI.
        """
        if not segments:
            return JobAction.getResource(self, job, request, segments)

        # first try a "real" UWS result from the job
        if len(segments)==1:
            try:
                fName, resultType = job.getResult(segments[0])
            except base.NotFoundError:  # segments[0] does not name a result
                pass  # fall through to other files
            else:
                res = static.File(fName)
                res.type = str(resultType)
                res.encoding = None
                return res

        # If that doesn't work, try to return some other file from the
        # job directory.  This is so we can deliver uploads.
        filePath = os.path.join(job.getWD(), *segments)
        jobDir = os.path.abspath(job.getWD())
        absPath = os.path.abspath(filePath)
        # A bare startswith(jobDir) would also admit siblings whose names
        # merely begin with the job directory's name (../<jobId>x/...), so
        # require a path separator right after the directory prefix.
        dirPrefix = jobDir if jobDir.endswith(os.sep) else jobDir+os.sep
        if absPath!=jobDir and not absPath.startswith(dirPrefix):
            raise svcs.ForbiddenURI("Not serving files outside of job directory.")

        if not os.path.exists(filePath):
            raise svcs.UnknownURI("File not found")
        return static.File(filePath, defaultType="application/octet-stream")

    def doGET(self, job, request):
        return _getResultsElement(job)

JobActions.addStandardAction(ResultsAction)
449 450 451 -def _serializeTime(element, dt):
452 if dt is None: 453 return element() 454 return element[utils.formatISODT(dt)]
455
class RootAction(JobAction):
    """Actions for async/jobId.
    """
    name = ""

    @utils.memoized
    def getJobInputTD(self):
        """returns an InputTable to parse the arguments of the UWS1.1 polling.
        """
        return base.parseFromString(svcs.InputTD,
            """
            <inputTable id="jobresource_args">
            <inputKey name="PHASE" type="text" multiplicity="single"
                description="Return immediately unless job is in this phase.">
                <values>
                    <!-- we reject polling against PENDING, too, since it doesn't make
                        much sense -->
                    <option>QUEUED</option>
                    <option>EXECUTING</option>
                </values>
            </inputKey>
            <inputKey name="WAIT" type="integer" multiplicity="single"
                description="Seconds to wait with an answer if no change occurred."/>
            </inputTable>""")

    def doDELETE(self, job, request):
        """Implements DELETE on a job resource.

        This is the UWS-compliant way to delete a job.  It destroys the job
        and redirects to the job list.
        """
        job.uws.destroy(job.jobId)
        raise svcs.WebRedirect("")

    def doPOST(self, wjob, request):
        """Implements POST on a job resource.

        This is a DaCHS extension to UWS in order to let web browsers
        delete jobs by passing action=DELETE.  Any other POST raises
        svcs.BadMethod.
        """
        # (wjob is the job itself; the name is kept for compatibility)
        if utils.getfirst(request.args, "action", None)=="DELETE":
            self.doDELETE(wjob, request)
        else:
            raise svcs.BadMethod("POST")

    def _delayIfWAIT(self, job, request):
        """returns a Deferred for a UWS 1.1 "slow poll", None if the request
        should be answered immediately.

        This is a helper for doGET.  A delay happens only if WAIT is given,
        the job is QUEUED or EXECUTING and, if PHASE is given, the job is in
        that phase.  WAIT=-1 and WAIT values larger than the configured
        [async]maxslowpollwait are capped to that maximum; other WAIT values
        of 0 or less lead to an immediate answer.  The Deferred fires with
        the job document once the phase changes or the wait time is used up.
        """
        # This is the implementation of UWS 1.1 "slow polling".
        # We still do polling internally rather than use postgres'
        # LISTEN/NOTIFY since the overhead seems rather moderate and
        # the added complexity of setting up notifications appeared not
        # proportional to saving it.
        args = svcs.CoreArgs.fromRawArgs(
            self.getJobInputTD(),
            request.args).args

        wait = args["WAIT"]
        if wait is None:
            return

        # Never let a client hold a connection open longer than the
        # operator allows; -1 means "as long as the service permits".
        maxWait = base.getConfig("async", "maxslowpollwait")
        if wait==-1 or wait>maxWait:
            wait = maxWait
        if wait<=0:
            return

        if args["PHASE"] is not None and args["PHASE"]!=job.phase:
            return

        if job.phase not in ["QUEUED", "EXECUTING"]:
            return

        d = defer.Deferred().addCallback(
            self._recheck, job.uws, job.jobId, wait-1, job.phase)
        reactor.callLater(1, d.callback, request)
        return d

    def _recheck(self,
            request, workerSystem, jobId, remainingWait, originalPhase):
        """the callback for doing slow polls.

        Re-reads the job; if its phase has changed or the wait time is
        used up, returns the job document (clearing request.args so doGET
        does not delay again), else schedules another check in a second.
        """
        job = workerSystem.getJob(jobId)
        if originalPhase!=job.phase or remainingWait<=0:
            request.args = {}
            return self.doGET(job, request)

        d = defer.Deferred().addCallback(
            self._recheck, workerSystem, jobId, remainingWait-1, originalPhase)
        reactor.callLater(1, d.callback, request)
        return d

    def doGET(self, job, request):
        """Implements GET on a job resource: return the current job metadata.

        If the request asks for a slow poll, a Deferred is returned instead.
        """
        delay = self._delayIfWAIT(job, request)
        if delay:
            return delay

        tree = UWS.makeRoot(UWS.job[
            UWS.jobId[job.jobId],
            UWS.runId[job.runId],
            UWS.ownerId[job.owner],
            UWS.phase[job.phase],
            # quote is nillable and may be None (cf. QuoteAction), so
            # serialize it like the other optional times.
            _serializeTime(UWS.quote, job.quote),
            UWS.creationTime[utils.formatISODT(job.creationTime)],
            _serializeTime(UWS.startTime, job.startTime),
            _serializeTime(UWS.endTime, job.endTime),
            UWS.executionDuration[str(job.executionDuration)],
            UWS.destruction[utils.formatISODT(job.destructionTime)],
            getParametersElement(job),
            _getResultsElement(job),
            getErrorSummary(job)])
        return stanxml.xmlrender(tree, job.uws.jobdocPreamble)


JobActions.addStandardAction(RootAction)
def doJobAction(workerSystem, request, segments):
    """handles the async UI of UWS.

    segments must be a sequence whose first element is the job id.  The
    optional second element names the action (the empty string, i.e. the
    job root, if absent); any further elements are passed on to that
    action.  Returns whatever workerSystem.jobActions.dispatch returns.
    """
    jobId, rest = segments[0], segments[1:]
    if rest:
        action, rest = rest[0], rest[1:]
    else:
        action = ""

    dispatch = workerSystem.jobActions.dispatch
    return dispatch(action, workerSystem.getJob(jobId), request, rest)
589