Package gavo :: Package protocols :: Module uws
[frames] | [no frames]

Source Code for Module gavo.protocols.uws

   1  """ 
   2  Support classes for the universal worker service. 
   3  """ 
   4   
   5  #c Copyright 2008-2019, the GAVO project 
   6  #c 
   7  #c This program is free software, covered by the GNU GPL.  See the 
   8  #c COPYING file in the source distribution. 
   9   
  10   
  11  import cPickle as pickle 
  12  import contextlib 
  13  import datetime 
  14  import os 
  15  import shutil 
  16  import signal 
  17  import tempfile 
  18  import threading 
  19  import weakref 
  20   
  21  from twisted.internet import protocol 
  22  from twisted.internet import reactor 
  23   
  24  from gavo import base 
  25  from gavo import rsc 
  26  from gavo import rscdef 
  27  from gavo import svcs 
  28  from gavo import utils 
  29  from gavo.base import cron 
  30  from gavo.protocols import dali 
  31   
  32  # Ward against typos 
  33  from gavo.votable.tapquery import ( #noflake: exported names 
  34          PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, UNKNOWN) 
  35   
  36  END_STATES = set([COMPLETED, ERROR, ABORTED]) 
  37   
  38  # used in the computation of quote 
  39  EST_TIME_PER_JOB = datetime.timedelta(minutes=10) 
  40   
  41  _DEFAULT_LOCK_TIMEOUT = 0.1 
  42   
  43   
  44  __docformat__ = "restructuredtext en" 
def strOrEmpty(aVal):
	"""returns aVal as a string, mapping None to the empty string.

	Anything else is simply passed through str().
	"""
	return "" if aVal is None else str(aVal)
55
class UWSError(base.Error):
	"""UWS-related errors, mainly to communicate with web renderers.

	UWSErrors are constructed with a displayable message (may be None to
	autogenerate one), a jobId (give one; the default None is only there
	to avoid breaking legacy code) and optionally a source exception and a hint.
	"""
	def __init__(self, msg, jobId=None, sourceEx=None, hint=None):
		base.Error.__init__(self, msg, hint=hint)
		self.msg = msg
		self.jobId = jobId
		self.sourceEx = sourceEx
		# keep args around so the exception survives pickling round-trips
		self.args = [self.msg, self.jobId, self.sourceEx, self.hint]

	def __str__(self):
		# prefer the explicit message; failing that, synthesise one from
		# the source exception; as a last resort, just name the job.
		if self.msg:
			return self.msg
		if self.sourceEx:
			return "UWS on job %s operation failed (%s, %s)"%(
				self.jobId,
				self.sourceEx.__class__.__name__,
				str(self.sourceEx))
		return "Unspecified UWS related error (id: %s)"%self.jobId
81
class JobNotFound(base.NotFoundError, UWSError):
	"""is raised when a job id cannot be found in the jobs table.
	"""
	def __init__(self, jobId):
		base.NotFoundError.__init__(self, str(jobId), "UWS job", "jobs table")

	def __str__(self):
		# make sure we render NotFoundError's message rather than
		# UWSError.__str__, which is also among our bases
		return base.NotFoundError.__str__(self)
89
class UWS(object):
	"""a facade for a universal worker service (UWS).

	You must construct it with the job class (see UWSJob) and a
	uwsactions.JobActions instance.

	The UWS then provides methods to access the jobs table,
	create jobs and deserialize jobs from the jobs table.

	We have a "statements cache" in here, where we used the UWS table
	definition to create query strings we can later pass to the database.
	Don't worry about this any more.  Just write text queries when adding
	features.  It's more readable and just about as stable against
	code evolution.

	You must override the getURLForId(jobId) method in your concrete
	implementation.

	You should also override jobdocPreamble and joblistPreamble.  This
	is raw XML that is prepended to job and list documents.  This is primarily
	for PIs giving stylesheets, but given we don't use doctypes you could
	provide internal subsets there, too.  Anyway, see the TAP UWS runner
	for examples.
	"""
	# how often should we check for jobs that wait for destruction? (seconds)
	cleanupInterval = 3600*12

	# raw XML to prepend to joblist documents
	joblistPreamble = ""
	# raw XML to prepend to job documents
	jobdocPreamble = ""

	def __init__(self, jobClass, jobActions):
		"""constructs the UWS and schedules the periodic jobs table reaper.
		"""
		self.jobClass = jobClass
		self.jobActions = jobActions
		# filled on first use by the _statements property
		self._statementsCache = None
		# NOTE(review): the negative interval presumably keeps cron from
		# running the reaper right at startup -- confirm against base.cron
		cron.runEvery(-self.cleanupInterval,
			"UWS %s jobs table reaper"%str(self),
			self.cleanupJobsTable)

	def _makeMoreStatements(self, statements, jobsTable):
		"""adds custom statements to the canned query dict in derived
		classes.

		This default implementation is a no-op; override it to add
		entries to statements (see UWSWithQueueing for an example).
		"""
		pass

	def _makeStatementsCache(self):
		"""returns a dictionary containing canned queries to manipulate
		the jobs table for this UWS.

		Each value is a triple; runCanned only uses its first two
		elements (result table definition or None, statement).  The
		*Ex keys work on an exclusively locked jobs table.
		"""
		res = {}
		td = self.jobClass.jobsTD
		with base.getTableConn() as conn:
			# statements operating on an exclusively locked jobs table
			jobsTable = rsc.TableForDef(td, connection=conn, exclusive=True)
			res["getByIdEx"] = jobsTable.getQuery(td, "jobId=%(jobId)s", {"jobId": 0})
			res["feedToIdEx"] = None, jobsTable.addCommand, None
			res["deleteByIdEx"] = None, jobsTable.getDeleteQuery(
				"jobId=%(jobId)s")[0], None

			# read-only statements
			jobsTable = rsc.TableForDef(td, connection=conn)
			res["getById"] = jobsTable.getQuery(td, "jobId=%(jobId)s", {"jobId": 0})
			res["getAllIds"] = jobsTable.getQuery(
				[td.getColumnByName("jobId")], "")

			countField = base.makeStruct(
				svcs.OutputField, name="count", type="integer", select="count(*)",
				required=True)
			res["countRunning"] = jobsTable.getQuery([countField],
				"phase='EXECUTING'")
			res["countQueued"] = jobsTable.getQuery([countField],
				"phase='QUEUED'")

			self._makeMoreStatements(res, jobsTable)
		return res

	@property
	def _statements(self):
		"""returns a dictionary of canned statements manipulating the jobs
		table.

		This is computed on first access and then cached.
		"""
		if self._statementsCache is None:
			self._statementsCache = self._makeStatementsCache()
		return self._statementsCache

	def runCanned(self, statementId, args, conn):
		"""runs the canned statement statementId with args through the
		DB connection conn.

		This will return row dictionaries of the result if there is a result.
		"""
		resultTableDef, statement, _ = self._statements[statementId]
		cursor = conn.cursor()

		try:
			cursor.execute(statement, args)
		except base.QueryCanceledError:
			# a timeout here almost always means another transaction is
			# sitting on a lock we need
			conn.rollback()
			raise base.ReportableError("Could not access the jobs table."
				" This probably means there is a stale lock on it. Please"
				" notify the service operators.")

		res = None
		if resultTableDef:
			res = [resultTableDef.makeRowFromTuple(row)
				for row in cursor]
		cursor.close()
		return res

	def _serializeProps(self, props, writableConn):
		"""inserts (or overwrites) the job properties props through
		writableConn.
		"""
		self.runCanned("feedToIdEx", props, writableConn)

	def getNewJobId(self, **kws):
		"""creates a new job and returns its id.

		kws can be properties of the new job or the special key timeout
		giving after how many seconds we should give up trying to lock
		the db.
		"""
		timeout = kws.pop("timeout", _DEFAULT_LOCK_TIMEOUT)

		try:
			with base.getWritableTableConn() as conn:
				# We fire off a quick pointless query to catch server restarts;
				# if this fails, the connection pools are cleared and the next
				# queries will run again.
				conn.execute("SELECT table_name FROM TAP_SCHEMA.tables LIMIT 1")
		except base.DBError:
			pass

		with base.getWritableTableConn() as conn:
			with base.connectionConfiguration(conn, timeout=timeout):
				props = self.jobClass.getDefaults(conn)
				props["jobId"] = self.jobClass.getNewId(self, conn)
				job = self.jobClass(props, self, writable=True)
				job.change(**kws)
				self._serializeProps(job.getProperties(), conn)
		return job.jobId

	def getNewIdFromRequest(self, request, service):
		"""returns the id of a new TAP job created from request.

		Request has to be a nevow request or similar, with request arguments in
		request.args.

		This calls setParamsFromRequest(wjob, request) to do the actual
		work; this latter method is what individual UWSes should override.
		"""
		jobId = self.getNewJobId()
		with self.changeableJob(jobId) as wjob:
			wjob.setParamsFromRequest(request)
			if request.getUser():
				wjob.change(owner=request.getUser())
		return jobId

	def _getJob(self, jobId, conn, writable=False):
		"""helps getJob and getNewJob.

		Raises JobNotFound unless exactly one row matches jobId.
		"""
		# Caution: this code is copied in useruws.UserUWS._getJob
		# If you find you need to change this, this may be a good
		# time to figure out how to refactor this method.
		statementId = 'getById'
		if writable:
			statementId = 'getByIdEx'
		res = self.runCanned(statementId, {"jobId": jobId}, conn)
		if len(res)!=1:
			raise JobNotFound(jobId)
		return self.jobClass(res[0], self, writable)

	def getJob(self, jobId):
		"""returns a read-only UWSJob for jobId.

		Note that by the time you do something with the information here,
		the "true" state in the database may already be different.  There
		should be no way to write whatever information you have in here,
		so any "racing" here shouldn't hurt.
		"""
		with base.getTableConn() as conn:
			return self._getJob(jobId, conn)

	def getNewJob(self, **kws):
		"""creates a new job and returns a read-only instance for it.
		"""
		newId = self.getNewJobId(**kws)
		return self.getJob(newId)

	@contextlib.contextmanager
	def changeableJob(self, jobId, timeout=_DEFAULT_LOCK_TIMEOUT):
		"""a context manager for job manipulation.

		This is done such that any changes to the job's properties
		within the controlled section get propagated to the database.
		As long as you are in the controlled section, nobody else
		can change the job.
		"""
		with base.getWritableTableConn() as conn:
			with base.connectionConfiguration(conn, timeout=timeout):
				job = self._getJob(jobId, conn, writable=True)
				try:
					yield job
				except:
					# whatever happened, don't write back the properties
					conn.rollback()
					raise
				else:
					self._serializeProps(job.getProperties(), conn)
					conn.commit()

	def changeToPhase(self, jobId, newPhase, input=None,
			timeout=_DEFAULT_LOCK_TIMEOUT):
		"""runs the phase transition machinery to try and bring the job
		jobId to newPhase.

		input is passed through to the transition function.
		"""
		with self.changeableJob(jobId, timeout=timeout) as wjob:
			try:
				transition = wjob.getTransitionTo(newPhase)
				return transition(newPhase, wjob, input)
			except Exception as exception:
				# transition to error if possible.  If that fails as well,
				# blindly set error and give up.
				try:
					if newPhase!=ERROR:
						return wjob.getTransitionTo(ERROR)(ERROR, wjob, exception)
				except:
					wjob.change(phase=ERROR, error=exception)
				raise

	def destroy(self, jobId):
		"""removes the job with jobId from the UWS.

		This calls the job's prepareForDestruction method while the job
		is writable; even if that fails, the row is removed from the jobs
		table.
		"""
		try:
			try:
				with self.changeableJob(jobId) as job:
					job.prepareForDestruction()
			except Exception as exc:
				base.ui.notifyWarning(
					"Ignored error while destroying UWS job %s: %s"%(jobId, exc))
		finally:
			with base.getWritableTableConn() as conn:
				# locals() happens to contain jobId, which is all
				# deleteByIdEx references
				self.runCanned("deleteByIdEx", locals(), conn)

	def _countUsingCanned(self, statementId):
		"""helps the count* methods.
		"""
		with base.getTableConn() as conn:
			return self.runCanned(statementId, {}, conn)[0]["count"]

	def countRunningJobs(self):
		"""returns the number of EXECUTING jobs in the jobsTable.
		"""
		return self._countUsingCanned('countRunning')

	def countQueuedJobs(self):
		"""returns the number of QUEUED jobs in jobsTable.
		"""
		return self._countUsingCanned('countQueued')

	def getJobIds(self):
		"""returns a list of all currently existing job ids.
		"""
		with base.getTableConn() as conn:
			return [r["jobId"] for r in self.runCanned('getAllIds', {}, conn)]

	def getIdsAndPhases(self, owner=None, phase=None, last=None, after=None,
			initFragments=None, initPars=None):
		"""returns pairs for id and phase for all jobs in the UWS.

		phase, last, after are the respective parameters from UWS 1.1.
		"""
		# locals() conveniently collects our arguments, which is what the
		# query fragments below reference
		pars = locals()
		fragments = initFragments or []
		pars.update(initPars or {})
		limits = None

		if owner is not None:
			fragments.append("owner=%(owner)s")

		if phase is not None:
			fragments.append("phase=%(phase)s")

		if last is not None:
			limits = "ORDER BY creationTime DESC LIMIT %(limit)s"
			pars['limit'] = last

		if after is not None:
			fragments.append("creationTime>%(after)s")

		td = self.jobClass.jobsTD

		with base.getTableConn() as conn:
			return conn.query(td.getSimpleQuery(["jobId", "phase"],
				fragments=base.joinOperatorExpr("AND", fragments),
				postfix=limits), pars)

	def cleanupJobsTable(self, includeFailed=False, includeCompleted=False,
			includeAll=False, includeForgotten=False):
		"""removes expired jobs from the UWS jobs table.

		The constructor arranged for this to be called now and then
		(cleanupInterval class attribute, defaulting to 12*3600 seconds).

		The functionality is also exposed through gavo admin cleanuws; this
		also lets you use the includeFailed and includeCompleted flags.  These
		should not be used on production services since you'd probably nuke
		jobs still interesting to your users.
		"""

		with base.AdhocQuerier() as q:
			if not q.getTableType(self.jobClass.jobsTD.getQName()):
				# no jobs table, nothing to clean up
				return

		phasesToKill = set()
		if includeFailed or includeAll:
			phasesToKill.add(ERROR)
			phasesToKill.add(ABORTED)
		if includeCompleted or includeAll:
			phasesToKill.add(COMPLETED)
		if includeAll:
			phasesToKill.add(PENDING)
			phasesToKill.add(QUEUED)
		if includeForgotten:
			phasesToKill.add(PENDING)

		fragments = "destructionTime<%(now)s"
		if phasesToKill:
			fragments = "destructionTime<%(now)s or phase in %(ptk)s"

		for row in self.jobClass.jobsTD.doSimpleQuery(
				['jobId'],
				fragments,
				{"now": datetime.datetime.utcnow(), "ptk": phasesToKill}):
			jobId = row["jobId"]
			try:
				self.destroy(jobId)
			except base.QueryCanceledError: # job locked by something, don't hang
				base.ui.notifyWarning("Postponing destruction of %s: Locked"%
					jobId)
			except JobNotFound:
				# Someone else has cleaned up -- that's ok
				pass

	def getURLForId(self, jobId):
		"""returns the handling URL for the job with jobId.

		You must override this in deriving classes.
		"""
		raise NotImplementedError("Incomplete UWS (getURLForId not overridden).")
class UWSWithQueueing(UWS):
	"""A UWS with support for queuing.

	Queuing is done on UWS level rather than at transitions.  With a plain
	UWS, if something is put on the queue, it must be started by the
	Transition's queueJob method.

	With UWSWithQueuing, you just mark the job queued and the rest is
	taken care of by the UWS itself.
	"""
	# _processQueueDirty is managed through scheduleProcessQueueCheck
	_processQueueDirty = False
	# How many jobs will the UWS (try to) run at the same time?
	runcountGoal = 1

	def __init__(self, jobClass, actions):
		"""see UWS.__init__; this additionally sets up the unqueuer lock.
		"""
		# processQueue shouldn't strictly need a lock.  The lock mainly
		# protects against running more unqueuers than necessary
		self._processQueueLock = threading.Lock()
		UWS.__init__(self, jobClass, actions)

	def _makeMoreStatements(self, statements, jobsTable):
		"""adds the canned statements needed for queue management.
		"""
		UWS._makeMoreStatements(self, statements, jobsTable)
		td = jobsTable.tableDef

		countField = base.makeStruct(
			svcs.OutputField, name="count", type="integer", select="count(*)",
			required=True)

		statements["countQueuedBefore"] = jobsTable.getQuery(
			[countField],
			"phase='QUEUED' and destructionTime<=%(dt)s",
			{"dt": None})

		statements["getIdsScheduledNext"] = jobsTable.getQuery(
			[jobsTable.tableDef.getColumnByName("jobId")],
			"phase='QUEUED'",
			limits=('ORDER BY destructionTime ASC', {}))

		statements["getHungCandidates"] = jobsTable.getQuery([
			td.getColumnByName("jobId"),
			td.getColumnByName("pid")],
			"phase='EXECUTING'")

	def scheduleProcessQueueCheck(self):
		"""tells TAP UWS to try and dequeue jobs next time checkProcessQueue
		is called.

		This function exists since during the TAPTransitions there's
		a writable job and processing the queue might deadlock.  So, rather
		than processing right away, we just note something may need to be
		done.
		"""
		self._processQueueDirty = True

	def checkProcessQueue(self):
		"""sees if any QUEUED process can be made EXECUTING.

		This must be called while you're not holding any changeableJob.
		"""
		if self._processQueueDirty:
			self._processQueueDirty = False
			self._processQueue()

	def _processQueue(self):
		"""tries to take jobs from the queue.

		This function is called from checkProcessQueue when we think
		a job may have left EXECUTING for somewhere else.

		Currently, the jobs with the earliest destructionTime are processed
		first.  That's, of course, completely ad-hoc.
		"""
		if not self._processQueueLock.acquire(False):
			# There's already an unqueuer running, don't need a second one.
			# Note that other processes (e.g., taprunner) might still be
			# manipulating the jobs table, so don't rely on the tables not
			# changing here.
			return
		else:
			try:
				if self.countQueuedJobs()==0:
					return

				try:
					started = 0
					with base.getTableConn() as conn:
						toStart = [row["jobId"] for row in
							self.runCanned('getIdsScheduledNext', {}, conn)]

					while toStart:
						if self.countRunningJobs()>=self.runcountGoal:
							break
						self.changeToPhase(toStart.pop(0), EXECUTING)
						started += 1

					if started==0:
						# No jobs could be started.  This may be fine when
						# long-running jobs block job submission, but for
						# catastrophic slave failures we want to make sure
						# all jobs we think are executing actually are.  If
						# they've silently died, we log that and push them
						# to error.
						# We only want to do that if we're the server -- any
						# other process couldn't see the pids anyway.
						if base.IS_DACHS_SERVER:
							self._ensureJobsAreRunning()
				except Exception:
					base.ui.notifyError("Error during queue processing, "
						" the UWS %s is probably botched now."%self.__class__.__name__)
			finally:
				self._processQueueLock.release()

	def _ensureJobsAreRunning(self):
		"""pushes all executing slave jobs that silently died to ERROR.
		"""
		with base.getTableConn() as conn:
			for row in self.runCanned("getHungCandidates", {}, conn):
				jobId, pid = row["jobId"], row["pid"]

				if pid is None:
					self.changeToPhase(jobId, "ERROR",
						UWSError("EXECUTING job %s had no pid."%jobId, jobId))
					base.ui.notifyError("Stillborn async slave %s"%jobId)
				elif pid<2:
					# these are jobs run within the server -- we can't do
					# anything about them using process manipulation (if at
					# all, they may be realised as threads).  Skip them here.
					continue
				else:
					pass
					# We should be checking if the process is still running.
					# Alas, there's serious syncing issues here that need to
					# be investigated.  Let's rely on the slaves cleaning up
					# behind themselves.
					# try:
					# 	os.waitpid(pid, os.WNOHANG)
					# except os.error, ex: # child presumably is dead
					# 	# the following doesn't hurt if the job has gone to
					# 	# COMPLETED in the meantime -- we don't transition
					# 	# *from* COMPLETED.
					# 	self.changeToPhase(jobId, "ERROR",
					# 		uws.UWSError(
					# 			"EXECUTING job %s has silently died."%jobId, jobId))
					# 	base.ui.notifyError("Zombie taprunner: %s"%jobId)

	def changeToPhase(self, jobId, newPhase, input=None, timeout=10):
		"""overridden here to hook in queue management.
		"""
		UWS.changeToPhase(self, jobId, newPhase, input, timeout)
		self.checkProcessQueue()
587
class ParameterRef(object):
	"""A UWS parameter that is (in effect) a URL.

	This always contains a URL.  In case of uploads, the tap renderer makes
	sure the upload is placed into the upload directory and generates a
	URL; in that case, local is True.

	You need this class when you want the byReference attribute in
	the UWS.parameter element to be true.
	"""
	def __init__(self, url, local=False):
		self.url = url
		self.local = local
602
class JobParameter(object):
	"""A UWS job parameter.

	Job parameters are (normally) elements of a job's parameters dictionary,
	i.e., usually elements of the job control language.  "Magic" parameters
	that allow automatic serialization or special effects on the job
	can be defined using the _parameter_name class attributes of
	UWSJobs.  They are employed when using the setSerializedPar and
	getSerializedPar interface.

	All methods of these are class or static methods since
	ProtocolParameters are never instantiated.

	The default value setter enforces a maximum length of 50000 characters
	for incoming strings.  This is a mild shield against accidental DoS
	with, e.g., bad uploads in TAP.

	The serialization thing really belongs to the user-facing interface.
	However, since it's most convenient to have these parameters
	in the UWSJob classes, the class is defined here.

	Internal clients would read the parameters directly from the dictionary.

	Methods you can override:

	- addPar(value, job) -> None -- parse value and perform some action
	  (typically, set an attribute) on job.  The default implementation
	  puts a value into the parameters dictionary _deserialized.
	- getPar(job) -> string -- infer a string representation of the
	  job parameter on job.  The default implementation gets
	  the value from the parameter from the parameters dictionary and
	  _serializes it.
	- _deserialize(str) -> val -- turns something from an XML tree
	  into a python value as usable by the worker
	- _serialize(val) -> str -- yields some XML-embeddable serialization
	  of val

	The default implementation just dumps/gets name from the job's
	parameters dict.  This is the behaviour for non-magic parameters
	since (get|set)SerializedPar falls back to the base class.

	CAUTION: Do *not* say job.parameters[bla] = "ab" -- your changes
	will get lost because serialisation of the parameters dictionary must
	be initiated manually.  Always manipulate the parameters dictionary
	by using cls.updatePar(name, value, job) or a suitable
	facade (job.setPar, job.setSerializedPar)
	"""
	_deserialize, _serialize = staticmethod(strOrEmpty), staticmethod(strOrEmpty)

	@classmethod
	def updatePar(cls, name, value, job):
		"""enters name:value into job's parameter dict.

		See the uws.JobParameter's docstring.
		"""
		# this is a bit magic because job.parameters is only re-encoded
		# on demand
		parameters = job.parameters
		if isinstance(value, basestring):
			value = cls._deserialize(value)
		parameters[name] = value
		# see our docstring
		job.change(parameters=parameters)

	@classmethod
	def addPar(cls, name, value, job):
		"""deserializes value and adds it to job's parameters under name.

		Strings longer than 50000 characters are discarded with a warning;
		this is a somewhat lame protection against file uploads gone haywire.
		"""
		if isinstance(value, basestring) and len(value)>50000:
			base.ui.notifyWarning("UWS Parameter %s discarded as too long; first"
				" bytes: %s"%(name, repr(value[:20])))
			# fix: previously the value was stored regardless, contradicting
			# both the warning just issued and the class docstring -- drop it.
			return
		cls.updatePar(name, cls._deserialize(value), job)

	@classmethod
	def getPar(cls, name, job):
		"""returns a string representation of the job parameter name on job.
		"""
		return cls._serialize(job.parameters.get(name))
680
class UploadParameter(JobParameter):
	"""A generic DALI-style upload facility.

	We add this to all UWS job classes when their underlying services have
	a file-typed input key.  It will contain some somewhat arbitrary string
	that lets people guess what they uploaded.  TAP does this a bit
	differently from useruws, which tries a somewhat rationalised approach.
	"""
	# the implementation is messy -- as for inline uploads, two parameters
	# are involved (UPLOAD and the file parameter) and the normal UWS
	# parameter interface only passes the parameter to be processed, we
	# need to steal the request from upstack.  This, admittedly, is ugly,
	# but then the UPLOAD design is botched, so I feel entitled to play it
	# dirty rather than spoil my design.
	@classmethod
	def _deserialize(cls, value):
		# the stored form is the file names joined with "/"
		return [] if value is None else value.split("/")

	@classmethod
	def _serialize(cls, value):
		return "" if value is None else "/".join(value)

	@classmethod
	def addPar(cls, name, value, job):
		# a blank value means: nothing was uploaded
		if not value.strip():
			return

		for newFName in dali.writeUploadBytesTo(
				utils.stealVar("request"), os.path.join(job.getWD())):
			job.setPar(newFName, newFName)
716
class FileParameter(JobParameter):
	"""an uploaded file.

	These are being created by posting to upload in the current design;
	hence, we fail on an attempt to addPar those.  The serialisation
	yields ParameterRefs.

	Note that TAP uploads currently employ a different scheme since TAP
	uploads don't match what DALI says.

	The stored values are always URLs into our service, regardless of where
	the upload came from.  For simplicity, we store the things in results.

	TODO: We should preserve the media type of the upload where available.
	"""
	@classmethod
	def _serialize(cls, value):
		# the serialised form is a reference to the URL the upload is
		# kept under (see updatePar)
		if value is None:
			return ""
		return ParameterRef(value)

	@classmethod
	def updatePar(cls, name, value, job):
		"""stores the results URL of the uploaded file value under name.

		value is the file name (that happens to be the name of the input key;
		in DALI terms, it's what's in front of the comma).
		"""
		JobParameter.updatePar(name, job.getURL()+"/results/"+value, job)

	@classmethod
	def addPar(cls, name, value, job):
		# fix: this is a classmethod, so its first argument is the class
		# and must be called cls (it was misleadingly named self before)
		raise base.ValidationError("File parameters cannot be set by posting to"
			" them. Use DALI-style UPDATEs for them.", name)
749
class UWSJobType(type):
	"""The metaclass for UWS jobs.

	We have the metaclass primarily because we want to delay loading
	the actual definition until it is actually needed (otherwise we
	might get interesting chicken-egg-problems with rscdesc at some point).

	A welcome side effect is that we can do custom constructors and
	similar cosmetic deviltry.
	"""
	@property
	def jobsTD(cls):
		# resolve the jobs table definition lazily and memoise it on the
		# job class
		if not hasattr(cls, "_jobsTDCache"):
			cls._jobsTDCache = base.resolveCrossId(cls._jobsTDId, rscdef.TableDef)
		return cls._jobsTDCache
768
769 770 -class BaseUWSJob(object):
771 """An abstract UWS job. 772 773 UWS jobs are always instanciated with a row from the associated 774 jobs table (i.e. a dictionary giving all the uws properties). You 775 can read the properties as attributes. UWSJobs also keep 776 a (weak) reference to the UWS that made them. 777 778 To alter uws properties, use the change method. This will fail unless 779 the job was created giving writable=True. 780 781 To make it concrete, you need to define: 782 783 - a _jobsTDid attribute giving the (cross) id of the UWS jobs 784 table for this kind of job 785 - a _transitions attribute giving a UWSTransitions instance that defines 786 what to do on transistions 787 - as needed, class methods _default_<parName> if you need to default 788 job parameters in newly created jobs 789 - as needed, methods _decode_<parName> and _encode_<parName> 790 to bring uws parameters (i.e., everything that has a DB column) 791 from and to the DB representation from *python* values. 792 793 You may want to override: 794 795 - a class method getNewId(uws, writableConn) -> str, a method 796 allocating a unique id for a new job and returning it. Beware 797 of races here; if your allocation is through the database table, 798 you'll have to lock it *and* write a preliminary record with your new 799 id. The default implementation does this, but if you want 800 something in the file system, you probably don't want to 801 use that. 802 - a method _setParamsFromDict(argDict), which takes a nevow-style 803 request.args dictionary and sets the job parameters. This is 804 only necessary if you need extra mappings between names and such. 805 806 For every piece of the job parameters, define 807 class attributes _parameters_<parname.lower()> with JobParameter 808 values saying how they are serialized and deserialized. Only 809 parameters defined in this way are accepted and integrated 810 into the parameters dict. 
811 812 If you need to clean up before the job is torn down, redefine 813 the prepareForDestruction method. 814 """ 815 # Why no properties? Well, I could do them from the metaclass, and 816 # that'd suck since I'd have to parse the resource descriptor on 817 # module import, and I don't want that. So, it'd be major trickery 818 # and I don't think it's worth that kind of effort. 819 # 820 # Why the odd hoops with serialization and deserialization? 821 # Well, what's in the database is "directly" defined by the protocol. 822 # Thus, the protocol methods (e.g., speaking HTTP) will know about the 823 # external representations, and UWSJobs just return python values for them. 824 # 825 # For job parameters, the serialization is something about the job control 826 # language. Thus, the serialization is better done by the job class 827 # rather than the code implementing the underlying protocol. Hence 828 # the JobParameters magic. See also the TAP job, where that kind of 829 # thing is actually used. 830 831 __metaclass__ = UWSJobType 832
833 - def __init__(self, props, uws, writable=False):
834 object.__setattr__(self, "_props", props) 835 self.writable = writable 836 self.uws = weakref.proxy(uws)
837
838 - def __getattr__(self, name):
839 if name in self._props: 840 return getattr(self, "_decode_"+name, utils.identity)( 841 self._props[name]) 842 raise AttributeError("%s objects have no attribute '%s'"%( 843 self.__class__.__name__, name))
844
845 - def __setattr__(self, name, value):
846 # ward against tempting bugs, disallow assigning to names in _props: 847 if name in self._props: 848 raise TypeError("Use the change method to change the %s" 849 " attribute."%name) 850 object.__setattr__(self, name, value)
	@property
	def quote(self):
		"""Always returns None.

		Override if you have a queue management.
		"""
		# without queue management we cannot estimate completion times
		return None
	@classmethod
	def getNewId(cls, uws, conn):
		"""allocates a new, unique job id and returns it.

		The id is reserved by inserting a preliminary row into the jobs
		table; the caller is expected to fill it out (see UWS.getNewJobId).
		"""
		cursor = conn.cursor()
		tableName = cls.jobsTD.getQName()
		# NOTE(review): ACCESS SHARE only conflicts with exclusive locks;
		# presumably the randomness of the ids is the main ward against
		# concurrent allocation races here -- confirm.
		cursor.execute("LOCK TABLE %s IN ACCESS SHARE MODE"%tableName)
		try:
			while True:
				# draw random ids until we find an unused one
				newId = utils.getRandomString(10)
				cursor.execute("SELECT * FROM %s WHERE jobId=%%(jobId)s"%tableName,
					{"jobId": newId})
				if not list(cursor):
					cursor.execute(
						"INSERT INTO %s (jobId) VALUES (%%(jobId)s)"%tableName,
						{"jobId": newId})
					break
			cursor.close()
			conn.commit()
		except:
			# leave no half-done reservation behind
			conn.rollback()
			raise
		return newId
881 882 @classmethod
883 - def getDefaults(cls, conn):
884 """returns a dictionary suitable for inserting into a jobsTD table. 885 """ 886 res = {} 887 for column in cls.jobsTD: 888 name = column.name 889 res[name] = getattr(cls, "_default_"+name, lambda: None)() 890 return res
	@classmethod
	def _default_phase(cls):
		# freshly created jobs start out PENDING
		return PENDING
895 896 @classmethod
898 return base.getConfig("async", "defaultExecTime")
	@classmethod
	def _default_creationTime(cls):
		# jobs are stamped with naive UTC on creation
		return datetime.datetime.utcnow()
	@classmethod
	def _default_destructionTime(cls):
		# by default, jobs are scheduled for destruction
		# [async]defaultLifetime seconds after creation
		return datetime.datetime.utcnow()+datetime.timedelta(
			seconds=base.getConfig("async", "defaultLifetime"))
908
909 - def _encode_error(self, value):
910 """returns a pickled dictionary with error information. 911 912 value can either be an exception object or a dictionary as 913 generated here. 914 """ 915 if value is None: 916 return None 917 if not isinstance(value, dict): 918 value = { 919 "type": value.__class__.__name__, 920 "msg": unicode(value), 921 "hint": getattr(value, "hint", None), 922 } 923 return pickle.dumps(value)
924
925 - def _decode_error(self, value):
926 """returns the unpickled three-item dictionary from the database string. 927 """ 928 if value is None: 929 return None 930 return pickle.loads(str(value))
    @classmethod
    def _default_parameters(cls):
        # An empty parameter set in DB serialisation; must match the
        # format produced by _encode_parameters (pickle -> zlib -> base64).
        return pickle.dumps({}, protocol=2).encode("zlib").encode("base64")
935
    def _encode_parameters(self, value):
        """(db format for parameters is a pickle)

        The pickle is zlib-compressed and base64-armored so it survives
        storage in a text column.
        """
        return pickle.dumps(value, protocol=2).encode("zlib").encode("base64")
940
    def _decode_parameters(self, value):
        """(db format for parameters is a pickle)

        Inverse of _encode_parameters: base64-decode, decompress, unpickle.
        """
        return pickle.loads(str(value).decode("base64").decode("zlib"))
945
946 - def _getParameterDef(self, parName):
947 """returns the job/uws parameter definition for parName and the name 948 the parameter will actually be stored as. 949 950 All these parameters are forced to be lower case (and thus 951 case-insensitive). The actual storage name of the parameter is 952 still returned in case saner conventions may be forthcoming. 953 """ 954 parName = parName.lower() 955 name = "_parameter_"+parName 956 if hasattr(self, name): 957 return getattr(self, name), parName 958 return JobParameter, parName
959
960 - def setSerializedPar(self, parName, parValue):
961 """enters parName:parValue into self.parameters after deserializing it. 962 963 This is when input comes from text; use setPar for values already 964 parsed. 965 """ 966 parDef, name = self._getParameterDef(parName) 967 parDef.addPar(name, parValue, self)
968
969 - def setPar(self, parName, parValue):
970 """enters parName:parValue into self.parameters. 971 """ 972 parDef, name = self._getParameterDef(parName) 973 parDef.updatePar(name, parValue, self)
974
975 - def getSerializedPar(self, parName):
976 """returns self.parameters[parName] in text form. 977 978 This is for use from a text-based interface. Workers read from 979 parameters directly. 980 """ 981 parDef, name = self._getParameterDef(parName) 982 return parDef.getPar(name, self)
983
984 - def iterSerializedPars(self):
985 """iterates over the serialized versions of the parameters. 986 """ 987 for key in self.iterParameterNames(): 988 yield key, self.getSerializedPar(key)
989
990 - def iterParameterNames(self):
991 """iterates over the names of the parameters declared for the job. 992 """ 993 for n in dir(self): 994 if n.startswith("_parameter_"): 995 yield n[11:]
996
997 - def _setParamsFromDict(self, argDict):
998 """sets our parameters from a dictionary of string lists. 999 1000 self must be writable for this to work. 1001 """ 1002 for key in self.iterParameterNames(): 1003 if key in argDict: 1004 val = argDict[key] 1005 # TODO: handling multiple arguments must be way better thought out. 1006 if isinstance(val, list): 1007 val = " ".join(val) 1008 if not val: 1009 # have some way to re-set a parameter? Anyway, I need to 1010 # ignore empty parameters or my XSLT form will break 1011 continue 1012 self.setSerializedPar(key, val)
1013
    def setParamsFromRequest(self, request):
        """sets our parameter dict from a nevow request.

        This can be called on both writable and non-writable jobs;
        getWritable acquires a changeable job if necessary.
        """
        with self.getWritable() as wjob:
            wjob._setParamsFromDict(request.args)
1021
    def getTransitionTo(self, newPhase):
        """returns the action prescribed to push self to newPhase.

        A ValidationError is raised if no such transition is defined.
        """
        return self._transitions.getTransition(self.phase, newPhase)
1028
1029 - def change(self, **kwargs):
1030 """changes the property values to what's given by the keyword arguments. 1031 1032 It is an AttributeError to try and change a property that is not defined. 1033 """ 1034 if not self.writable: 1035 raise TypeError("Read-only UWS job (You can only change UWSJobs" 1036 "obtained through changeableJob.") 1037 for propName, propValue in kwargs.iteritems(): 1038 if propName not in self._props: 1039 raise AttributeError("%ss have no attribute %s"%( 1040 self.__class__.__name__, propName)) 1041 self._props[propName] = getattr(self, "_encode_"+propName, 1042 utils.identity)(propValue)
1043
    def getProperties(self):
        """returns the properties of the job as they are stored in the
        database.

        Use attribute access to read them and change to change them.  Do
        *not* get values from the dictionary you get and do *not* change
        the dictionary.
        """
        return self._props
1053
    def update(self):
        """fetches a new copy of the job props from the DB.

        You should in general not need this, since UWSJob objects are
        intended to be short-lived ("for the duration of an async
        request").  Still, for testing and similar, it's convenient to be
        able to update a UWS job from the database.
        """
        self._props = self.uws.getJob(self.jobId)._props
1063
    def prepareForDestruction(self):
        """is called before the job's database row is torn down.

        Self is writable at this point.  This base implementation does
        nothing; subclasses override to clean up external resources
        (e.g., UWSJobWithWD removes its working directory).
        """
1069
    def getURL(self):
        """returns the UWS URL for this job (delegated to the parent UWS).
        """
        return self.uws.getURLForId(self.jobId)
1074 1075 @contextlib.contextmanager
1076 - def getWritable(self):
1077 """a context manager for a writeable version of the job. 1078 1079 Changes will be written back at the end, and the job object itself 1080 will be updated from the database. 1081 1082 If self already is writable, it is returned unchanged, and changes 1083 are only persisted when the enclosing controlling block finishes. 1084 """ 1085 if self.writable: 1086 yield self 1087 return 1088 1089 with self.uws.changeableJob(self.jobId) as wjob: 1090 yield wjob 1091 self.update()
1092
class UWSJobWithWD(BaseUWSJob):
    """A UWS job with a working directory.

    This generates ids from directory names in a directory (the
    uwsWD) shared for all UWSes on the system.

    It also adds methods

    - getWD() -> str returning the working directory
    - addResult(self, source, mimeType, name=None) to add a new
      result
    - openResult(self, mimeType, name) -> file to get an open file in the
      WD to write to in order to generate a result
    - getResult(self, resName) -> str to get the *path* of a result with
      resName
    - getResults(self) -> list-of-dicts to get dicts describing all
      results available
    - openFile(self) -> file to get a file letting you read an existing
      result.
    """
    @classmethod
    def getNewId(cls, uws, conn):
        # our id is the base name of the job's temporary directory.
        # (fixed: first arg was misnamed self on a classmethod; the
        # octal literal now uses 0o notation, same value 0775)
        uwsWD = base.getConfig("uwsWD")
        utils.ensureDir(uwsWD, mode=0o775, setGroupTo=base.getGroupId())
        jobDir = tempfile.mkdtemp("", "", dir=uwsWD)
        return os.path.basename(jobDir)

    def getWD(self):
        """returns the path of this job's working directory."""
        return os.path.join(base.getConfig("uwsWD"), self.jobId)

    def prepareForDestruction(self):
        # remove the working directory together with all results in it
        shutil.rmtree(self.getWD())

    # results management: We use a pickled list in the jobs dir to manage
    # the results.  I once had a table of those in the DB and it just
    # wasn't worth it.  One issue, though: this potentially races
    # if two different processes/threads were to update the results
    # at the same time.  This could be worked around by writing
    # the results pickle only from within changeableJobs.
    #
    # The list contains dictionaries having resultName and resultType keys.
    @property
    def _resultsDirName(self):
        # path of the pickle holding the results metadata
        return os.path.join(self.getWD(), "__RESULTS__")

    def _loadResults(self):
        """returns the results list from the pickle, or [] if there is
        no results pickle yet.
        """
        try:
            with open(self._resultsDirName) as f:
                return pickle.load(f)
        except IOError:
            return []

    def _saveResults(self, results):
        """atomically replaces the results pickle with one for results
        (write to a temp file, then rename over the old one).
        """
        handle, srcName = tempfile.mkstemp(dir=self.getWD())
        with os.fdopen(handle, "w") as f:
            pickle.dump(results, f)
        # The following operation will bomb on windows when the second
        # result is saved.  Tough luck.
        os.rename(srcName, self._resultsDirName)

    def _addResultInJobDir(self, mimeType, name):
        """records a result of type mimeType under name, replacing any
        previous record of the same name.
        """
        resTable = self._loadResults()
        newRec = {'resultName': name, 'resultType': mimeType}

        for index, res in enumerate(resTable):
            if res["resultName"]==name:
                resTable[index] = newRec
                break
        else:
            # fixed: this used to append a second, identical dict literal
            resTable.append(newRec)

        self._saveResults(resTable)

    def fixTypeForResultName(self, resultName, mediaType):
        """sets the media type for result resultName.

        It is not an error if no result with resultName exists.
        """
        resTable = self._loadResults()
        for row in resTable:
            if row["resultName"]==resultName:
                row["resultType"] = mediaType
        self._saveResults(resTable)

    def addResult(self, source, mimeType, name=None):
        """adds a result, with data taken from source.

        source may be a file-like object or a byte string.

        If no name is passed, a name is auto-generated.
        """
        if name is None:
            name = utils.intToFunnyName(id(source))
        with open(os.path.join(self.getWD(), name), "w") as destF:
            if isinstance(source, basestring):
                destF.write(source)
            else:
                utils.cat(source, destF)
        self._addResultInJobDir(mimeType, name)

    def openResult(self, mimeType, name):
        """returns a writable file that adds a result.
        """
        self._addResultInJobDir(mimeType, name)
        return open(os.path.join(self.getWD(), name), "w")

    def getResult(self, resName):
        """returns a pair of file name and mime type for a named job result.

        If the result does not exist, a NotFoundError is raised.
        """
        res = [r for r in self._loadResults() if resName==r["resultName"]]
        if not res:
            raise base.NotFoundError(resName, "job result",
                "uws job %s"%self.jobId)
        res = res[0]
        return os.path.join(self.getWD(), res["resultName"]), res["resultType"]

    def getResults(self):
        """returns a list of this service's results.

        The list contains dictionaries having at least resultName and
        resultType keys.
        """
        return self._loadResults()

    def openFile(self, name, mode="r"):
        """returns an open file object for a file within the job's work
        directory.

        No path parts are allowed on name (defends against escaping the
        working directory).
        """
        if "/" in name:
            raise ValueError("No path components allowed on job files.")
        return open(os.path.join(self.getWD(), name), mode)
1230
class UWSTransitions(object):
    """An abstract base for classes defining the behaviour of a UWS.

    This basically is the definition of a finite state machine with
    arbitrary input (which is to say: the input "alphabet" is up to
    the transitions).

    A UWSTransitions instance is in the transitions attribute of a job
    class.

    The main interface to UWSTransitions is getTransition(p1, p2) -> callable
    It returns a callable that should push the automaton from phase p1
    to phase p2, or raise a ValidationError for a forbidden transition.

    The callable has the signature f(desiredPhase, wjob, input) -> None.
    It must alter the uwsJob object as appropriate.  input is some object
    defined by the transition.  The job passed is a changeable job,
    so the handlers actually hold locks to the job row.  Thus, be brief.

    The transitions are implemented as simple methods having the signature
    of the callables returned by getTransition.

    To link transitions and methods, pass a vertices list to the constructor.
    This list consists of 3-tuples of strings (from, to, method-name).  From
    and to are phase names (use the symbols from this module to ward against
    typos).
    """
    def __init__(self, name, vertices):
        self.name = name
        self._buildTransitions(vertices)

    def _buildTransitions(self, vertices):
        # transitions maps fromPhase -> {toPhase: method-name}
        self.transitions = {}
        # set some defaults: every phase may error out, EXECUTING may
        # complete; explicit vertices below override these.
        for phase in [PENDING, QUEUED, EXECUTING, ERROR, ABORTED, COMPLETED]:
            self.transitions.setdefault(phase, {})[ERROR] = "flagError"
        self.transitions.setdefault(EXECUTING, {})[COMPLETED
            ] = "noteEndTime"

        for fromPhase, toPhase, methodName in vertices:
            self.transitions.setdefault(fromPhase, {})[toPhase] = methodName

    def getTransition(self, fromPhase, toPhase):
        """returns the bound method implementing fromPhase -> toPhase.

        Null transitions and transitions out of an end state yield a
        no-op; undefined transitions raise a ValidationError on "phase".
        """
        if (fromPhase==toPhase or
                fromPhase in END_STATES):
            # ignore null or ignorable transitions
            return lambda p, job, input: None
        try:
            methodName = self.transitions[fromPhase][toPhase]
        except KeyError:
            raise base.ui.logOldExc(
                base.ValidationError("No transition from %s to %s defined"
                    " for %s jobs"%(fromPhase, toPhase, self.name),
                    "phase", hint="This almost always points to an implementation error"))
        try:
            return getattr(self, methodName)
        except AttributeError:
            raise base.ui.logOldExc(
                base.ValidationError("%s Transitions have no %s methods"%(self.name,
                    methodName),
                    "phase", hint="This is an error in an internal protocol definition."
                    " There probably is nothing you can do but complain."))

    def noOp(self, newPhase, job, ignored):
        """a sample action just setting the new phase.

        This is a no-op baseline sometimes useful in user code.
        """
        job.change(phase=newPhase)

    def flagError(self, newPhase, wjob, exception):
        """the default action when transitioning to an error: dump exception
        and mark phase as ERROR.
        """
        wjob.change(phase=ERROR)
        # Validation errors don't get logged -- for one, they probably
        # are the user's fault, and for a second, logging them upsets
        # trial during testing, since trial examines the log.
        if not isinstance(exception, base.ValidationError):
            base.ui.notifyError("Error during UWS execution of job %s"%wjob.jobId)
        wjob.change(error=exception)
        # only set endTime if no earlier transition already did
        if wjob.endTime is None:
            wjob.change(endTime=datetime.datetime.utcnow())

    def noteEndTime(self, newPhase, wjob, ignored):
        # record job completion time (naive UTC)
        wjob.change(endTime=datetime.datetime.utcnow())
1317
class SimpleUWSTransitions(UWSTransitions):
    """A UWSTransitions with sensible transitions pre-defined.

    See the source for what we consider sensible.

    The idea here is that you simply override (and usually up-call)
    the methods queueJob, markAborted, startJob, completeJob,
    killJob, errorOutJob, and ignoreAndLog.

    You will have to define startJob and provide some way to execute
    startJob on QUEUED jobs (there's nothing wrong with immediately
    calling self.startJob(...) if you don't mind the DoS danger).

    Once you have startJob, you'll probably want to define killJob as
    well.
    """
    def __init__(self, name):
        UWSTransitions.__init__(self, name, [
            (PENDING, QUEUED, "queueJob"),
            (PENDING, ABORTED, "markAborted"),
            (QUEUED, ABORTED, "markAborted"),
            (QUEUED, EXECUTING, "startJob"),
            (EXECUTING, COMPLETED, "completeJob"),
            (EXECUTING, ABORTED, "killJob"),
            (EXECUTING, ERROR, "errorOutJob"),
            (COMPLETED, ERROR, "ignoreAndLog"),
            ])

    def queueJob(self, newState, wjob, ignored):
        """puts a job on the queue.
        """
        wjob.change(phase=QUEUED)

    def markAborted(self, newState, wjob, ignored):
        """simply marks job as aborted.

        This is what happens if you abort a job from QUEUED or
        PENDING.
        """
        wjob.change(phase=ABORTED,
            endTime=datetime.datetime.utcnow())

    def ignoreAndLog(self, newState, wjob, exc):
        """logs an attempt to transition when it's impossible but
        shouldn't result in an error.

        This is mainly so COMPLETED things don't fail just because of some
        mishap.
        """
        base.ui.logErrorOccurred("Request to push %s job to ERROR: %s"%(
            wjob.phase, str(exc)))

    def errorOutJob(self, newPhase, wjob, ignored):
        """pushes a job to an error state.

        This is called by a worker; leaving the error message itself
        is part of the worker's duty.

        (The subsequent flagError call changes the phase again, to ERROR,
        and records the exception -- the double change is intentional.)
        """
        wjob.change(phase=newPhase, endTime=datetime.datetime.utcnow())
        self.flagError(newPhase, wjob, ignored)

    def killJob(self, newPhase, wjob, ignored):
        """should abort a job.

        There's really not much we can do here, so this is a no-op.

        Do not up-call here, you'll get a (then spurious) warning
        if you do.
        """
        base.ui.notifyWarning("%s UWSes cannot kill jobs"%self.name)

    def completeJob(self, newPhase, wjob, ignored):
        """pushes a job into the completed state.
        """
        wjob.change(phase=newPhase, endTime=datetime.datetime.utcnow())
1394
def _replaceFDs(inFName, outFName):
    # This is used for clean forking and doesn't actually belong here.
    # utils.ostricks should take this.
    """closes all (findable) file descriptors and replaces stdin with inF
    and stdout/err with outF.
    """
    for fd in range(255, -1, -1):
        try:
            os.close(fd)
        except os.error:
            pass
    inF, outF = open(inFName), open(outFName, "w")
    # Fixed: this used to issue three bare os.dup calls; since the opens
    # above land on fds 0 and 1, the first dup made fd 2 (stderr) a
    # duplicate of the *input* file, contradicting the contract above.
    # dup2 pins each standard fd explicitly, regardless of where the
    # opens landed.
    os.dup2(inF.fileno(), 0)
    os.dup2(outF.fileno(), 1)
    os.dup2(outF.fileno(), 2)
1411
class _UWSBackendProtocol(protocol.ProcessProtocol):
    """The protocol used for taprunners when spawning them under a twisted
    reactor.

    It logs child stdout/stderr and makes sure the job ends up in an
    admitted end state when the child process goes away.
    """
    def __init__(self, jobId, workerSystem):
        self.jobId = jobId
        self.workerSystem = workerSystem

    def outReceived(self, data):
        # child stdout is only interesting for diagnostics
        base.ui.notifyInfo("TAP worker %s produced output: %s"%(
            self.jobId, data))

    def errReceived(self, data):
        # ditto for stderr
        base.ui.notifyInfo("TAP worker %s produced an error message: %s"%(
            self.jobId, data))

    def processEnded(self, statusObject):
        """tries to ensure the job is in an admitted end state.
        """
        try:
            job = self.workerSystem.getJob(self.jobId)
            if job.phase in (QUEUED, EXECUTING):
                # the child died without pushing its job to an end state;
                # raise-and-catch so changeToPhase gets a real traceback.
                try:
                    raise UWSError("Job hung in %s"%job.phase, job.jobId)
                except UWSError as ex:
                    self.workerSystem.changeToPhase(self.jobId, ERROR, ex)
        except JobNotFound:
            # job already deleted
            pass
1441
class ProcessBasedUWSTransitions(SimpleUWSTransitions):
    """A SimpleUWSTransitions that processes its stuff in a child process.

    Inheriting classes must implement the getCommandLine(wjob) method --
    it must return a command (suitable for reactor.spawnProcess and
    os.execlp) and a list of arguments suitable for reactor.spawnProcess.

    They must also implement some sort of queue management.  In the
    simplest case, override queueJob and start the job from there (but
    set to QUEUED in there anyway).
    """
    def getCommandLine(self, wjob):
        raise NotImplementedError("%s transitions do not define how"
            " to get a command line"%self.__class__.__name__)

    def _startJobTwisted(self, wjob):
        """starts a job by forking a new process when we're running
        within a twisted reactor.
        """
        assert wjob.phase==QUEUED
        cmd, args = self.getCommandLine(wjob)
        pt = reactor.spawnProcess(_UWSBackendProtocol(wjob.jobId, wjob.uws),
            cmd, args=args,
            env=os.environ)
        wjob.change(pid=pt.pid, phase=EXECUTING)

    def _startJobNonTwisted(self, wjob):
        """forks off a new process when (hopefully) a manual child reaper
        is in place.
        """
        cmd, args = self.getCommandLine(wjob)
        pid = os.fork()
        if pid==0:
            # child: detach from the parent's fds, then exec the worker
            _replaceFDs("/dev/zero", "/dev/null")
            os.execlp(cmd, *args)
        elif pid>0:
            wjob.change(pid=pid, phase=EXECUTING)
        else:
            raise Exception("Could not fork")

    def startJob(self, newState, wjob, ignored):
        """causes a process to be started that executes job.

        This dispatches according to whether or not we are within a twisted
        event loop, mostly for testing support.
        """
        if reactor.running:
            return self._startJobTwisted(wjob)
        else:
            return self._startJobNonTwisted(wjob)

    def killJob(self, newState, wjob, ignored):
        """tries to kill/abort job.

        Actually, there are two different scenarios here: Either the job has
        a non-NULL startTime.  In that case, the child job is in control
        and will manage the state itself.  Then kill -INT will do the right
        thing.

        However, if startTime is NULL, the child is still starting up.
        Sending a kill -INT may do many things, and most of them we don't
        want.  So, in this case we kill -TERM the child, do state management
        ourselves and hope for the best.

        Fixed: "unkillalbe" typo in the error message, and the generic
        except clause used to pass the exception as UWSError's jobId
        positional; it now passes the job id and the source exception.
        """
        try:
            pid = wjob.pid
            if pid is None:
                raise UWSError("Job is not running")
            elif pid<2:
                raise UWSError("Job has unkillable PID")

            if wjob.startTime is None:
                # the child job is not up yet, kill it brutally and manage
                # state ourselves
                os.kill(pid, signal.SIGTERM)
                self.markAborted(ABORTED, wjob, ignored)
            else:
                # child job is up, can manage state itself
                os.kill(pid, signal.SIGINT)
        except UWSError:
            raise
        except Exception as ex:
            raise UWSError(None, wjob.jobId, sourceEx=ex)
1526