1 """
2 Support classes for the universal worker service.
3 """
4
5
6
7
8
9
10
11 import cPickle as pickle
12 import contextlib
13 import datetime
14 import os
15 import shutil
16 import signal
17 import tempfile
18 import threading
19 import weakref
20
21 from twisted.internet import protocol
22 from twisted.internet import reactor
23
24 from gavo import base
25 from gavo import rsc
26 from gavo import rscdef
27 from gavo import svcs
28 from gavo import utils
29 from gavo.base import cron
30 from gavo.protocols import dali
31
32
33 from gavo.votable.tapquery import (
34 PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, UNKNOWN)
35
36 END_STATES = set([COMPLETED, ERROR, ABORTED])
37
38
39 EST_TIME_PER_JOB = datetime.timedelta(minutes=10)
40
41 _DEFAULT_LOCK_TIMEOUT = 0.1
42
43
44 __docformat__ = "restructuredtext en"
48 """returns a stringified version of aVal, except an empty string is returned
49 when aVal is None.
50 """
51 if aVal is None:
52 return ""
53 else:
54 return str(aVal)
55
58 """UWS-related errors, mainly to communicate with web renderers.
59
60 UWSErrors are constructed with a displayable message (may be None to
61 autogenerate one), a jobId (give one; the default None is only there
62 to avoid breaking legacy code) and optionally a source exception and a hint.
63 """
def __init__(self, msg, jobId=None, sourceEx=None, hint=None):
	# let the base class record message and hint first (self.hint read
	# below is presumably set there -- confirm against base.Error)
	base.Error.__init__(self, msg, hint=hint)
	self.msg, self.jobId, self.sourceEx = msg, jobId, sourceEx
	# keep args in sync so the exception reconstructs with full context
	self.args = [self.msg, self.jobId, self.sourceEx, self.hint]
70
72 if self.msg:
73 return self.msg
74 elif self.sourceEx:
75 return "UWS on job %s operation failed (%s, %s)"%(
76 self.jobId,
77 self.sourceEx.__class__.__name__,
78 str(self.sourceEx))
79 else:
80 return "Unspecified UWS related error (id: %s)"%self.jobId
81
89
90
91 -class UWS(object):
92 """a facade for a universal worker service (UWS).
93
94 You must construct it with the job class (see UWSJob) and a
95 uwsactions.JobActions instance
96
97 The UWS then provides methods to access the jobs table,
98 create jobs and and deserialize jobs from the jobs table.
99
100 We have a "statements cache" in here, where we use the UWS table
101 definition to create query strings we can later pass to the database.
102 Don't worry about this any more. Just write text queries when adding
103 features. It's more readable and just about as stable against
104 code evolution.
105
106 You must override the getURLForId(jobId) method in your concrete
107 implementation.
108
109 You should also override jobdocPreamble and joblistPreamble. This
110 is raw XML that is prepended to job and list documents. This is primarily
111 for PIs giving stylesheets, but given we don't use doctypes you could
112 provide internal subsets there, too. Anyway, see the TAP UWS runner
113 for examples.
114 """
115
116 cleanupInterval = 3600*12
117
118
119 joblistPreamble = ""
120
121 jobdocPreamble = ""
122
123 - def __init__(self, jobClass, jobActions):
130
132 """adds custom statements to the canned query dict in derived
133 classes.
134 """
135 pass
136
138 """returns a dictionary containing canned queries to manipulate
139 the jobs table for this UWS.
140 """
141 res = {}
142 td = self.jobClass.jobsTD
143 with base.getTableConn() as conn:
144 jobsTable = rsc.TableForDef(td, connection=conn, exclusive=True)
145 res["getByIdEx"] = jobsTable.getQuery(td, "jobId=%(jobId)s", {"jobId": 0})
146 res["feedToIdEx"] = None, jobsTable.addCommand, None
147 res["deleteByIdEx"] = None, jobsTable.getDeleteQuery(
148 "jobId=%(jobId)s")[0], None
149
150 jobsTable = rsc.TableForDef(td, connection=conn)
151 res["getById"] = jobsTable.getQuery(td, "jobId=%(jobId)s", {"jobId": 0})
152 res["getAllIds"] = jobsTable.getQuery(
153 [td.getColumnByName("jobId")], "")
154
155 countField = base.makeStruct(
156 svcs.OutputField, name="count", type="integer", select="count(*)",
157 required=True)
158 res["countRunning"] = jobsTable.getQuery([countField],
159 "phase='EXECUTING'")
160 res["countQueued"] = jobsTable.getQuery([countField],
161 "phase='QUEUED'")
162
163 self._makeMoreStatements(res, jobsTable)
164 return res
165
166 @property
168 """returns a dictionary of canned statements manipulating the jobs
169 table.
170 """
171 if self._statementsCache is None:
172 self._statementsCache = self._makeStatementsCache()
173 return self._statementsCache
174
def runCanned(self, statementId, args, conn):
	"""runs the canned statement statementId with args through the
	DB connection conn.

	This will return row dictionaries of the result if there is a result.

	A QueryCanceledError from the database is mapped to a ReportableError,
	since it almost certainly means a stale lock on the jobs table.
	"""
	resultTableDef, statement, _ = self._statements[statementId]
	cursor = conn.cursor()
	# close the cursor in all cases; the previous code leaked it when
	# execute raised (including on the rollback path below).
	try:
		try:
			cursor.execute(statement, args)
		except base.QueryCanceledError:
			conn.rollback()
			raise base.ReportableError("Could not access the jobs table."
				" This probably means there is a stale lock on it. Please"
				" notify the service operators.")

		res = None
		if resultTableDef:
			res = [resultTableDef.makeRowFromTuple(row)
				for row in cursor]
		return res
	finally:
		cursor.close()
198
200 """inserts (or overwrites) the job properties props through
201 writableConn.
202 """
203 self.runCanned("feedToIdEx", props, writableConn)
204
231
233 """returns the id of a new TAP job created from request.
234
235 Request has to be a nevow request or similar, with request arguments in
236 request.args.
237
238 This calls setParamsFromRequest(wjob, request) to do the actual
239 work; this latter method is what individual UWSes should override.
240 """
241 jobId = self.getNewJobId()
242 with self.changeableJob(jobId) as wjob:
243 wjob.setParamsFromRequest(request)
244 if request.getUser():
245 wjob.change(owner=request.getUser())
246 return jobId
247
def _getJob(self, jobId, conn, writable=False):
	"""helps getJob and getNewJob.

	This fetches the row for jobId through conn (using the exclusive
	canned query when writable is True) and wraps it in our job class.
	"""
	statementId = 'getByIdEx' if writable else 'getById'
	rows = self.runCanned(statementId, {"jobId": jobId}, conn)
	if len(rows)!=1:
		raise JobNotFound(jobId)
	return self.jobClass(rows[0], self, writable)
261
263 """returns a read-only UWSJob for jobId.
264
265 Note that by the time you do something with the information here,
266 the "true" state in the database may already be different. There
267 should be no way to write whatever information you have in here,
268 so any "racing" here shouldn't hurt.
269 """
270 with base.getTableConn() as conn:
271 return self._getJob(jobId, conn)
272
274 """creates a new job and returns a read-only instance for it.
275 """
276 newId = self.getNewJobId(**kws)
277 return self.getJob(newId)
278
279 @contextlib.contextmanager
281 """a context manager for job manipulation.
282
283 This is done such that any changes to the job's properties
284 within the controlled section get propagated to the database.
285 As long as you are in the controlled section, nobody else
286 can change the job.
287 """
288 with base.getWritableTableConn() as conn:
289 with base.connectionConfiguration(conn, timeout=timeout):
290 job = self._getJob(jobId, conn, writable=True)
291 try:
292 yield job
293 except:
294 conn.rollback()
295 raise
296 else:
297 self._serializeProps(job.getProperties(), conn)
298 conn.commit()
299
315
331
333 """helps the count* methods.
334 """
335 with base.getTableConn() as conn:
336 return self.runCanned(statementId, {}, conn)[0]["count"]
337
339 """returns the number of EXECUTING jobs in the jobsTable.
340 """
341 return self._countUsingCanned('countRunning')
342
344 """returns the number of QUEUED jobs in jobsTable.
345 """
346 return self._countUsingCanned('countQueued')
347
349 """returns a list of all currently existing job ids.
350 """
351 with base.getTableConn() as conn:
352 return [r["jobId"] for r in self.runCanned('getAllIds', {}, conn)]
353
def getIdsAndPhases(self, owner=None, phase=None, last=None, after=None,
		initFragments=None, initPars=None):
	"""returns pairs for id and phase for all jobs in the UWS.

	phase, last, after are the respective parameters from UWS 1.1.

	owner restricts the result to jobs of that owner; initFragments
	(a list of SQL condition strings) and initPars (a dict with their
	parameters) let callers add extra conditions.
	"""
	# build the query parameter dict explicitly; the previous
	# pars = locals() silently dragged self and other unrelated
	# names into the parameters passed to the database.
	pars = {"owner": owner, "phase": phase, "after": after}
	fragments = initFragments or []
	pars.update(initPars or {})
	limits = None

	if owner is not None:
		fragments.append("owner=%(owner)s")

	if phase is not None:
		fragments.append("phase=%(phase)s")

	if last is not None:
		limits = "ORDER BY creationTime DESC LIMIT %(limit)s"
		pars['limit'] = last

	if after is not None:
		fragments.append("creationTime>%(after)s")

	td = self.jobClass.jobsTD

	with base.getTableConn() as conn:
		return conn.query(td.getSimpleQuery(["jobId", "phase"],
			fragments=base.joinOperatorExpr("AND", fragments),
			postfix=limits), pars)
384
def cleanupJobsTable(self, includeFailed=False, includeCompleted=False,
		includeAll=False, includeForgotten=False):
	"""removes expired jobs from the UWS jobs table.

	The constructor arranged for this to be called now and then
	(cleanupInterval class attribute, defaulting to 12*3600).

	The functionality is also exposed through gavo admin cleanuws; this
	also lets you use the includeFailed and includeCompleted flags. These
	should not be used on production services since you'd probably nuke
	jobs still interesting to your users.
	"""
	with base.AdhocQuerier() as q:
		# no jobs table yet: nothing to clean up
		if not q.getTableType(self.jobClass.jobsTD.getQName()):
			return

	# collect the phases whose jobs get removed regardless of their
	# destruction time, depending on the include* flags
	phasesToKill = set()
	if includeFailed or includeAll:
		phasesToKill |= set([ERROR, ABORTED])
	if includeCompleted or includeAll:
		phasesToKill.add(COMPLETED)
	if includeAll:
		phasesToKill |= set([PENDING, QUEUED])
	if includeForgotten:
		phasesToKill.add(PENDING)

	fragments = "destructionTime<%(now)s"
	if phasesToKill:
		fragments = "destructionTime<%(now)s or phase in %(ptk)s"

	for row in self.jobClass.jobsTD.doSimpleQuery(
			['jobId'],
			fragments,
			{"now": datetime.datetime.utcnow(), "ptk": phasesToKill}):
		jobId = row["jobId"]
		try:
			self.destroy(jobId)
		except base.QueryCanceledError:
			base.ui.notifyWarning("Postponing destruction of %s: Locked"%
				jobId)
		except JobNotFound:
			# job removed by someone else in the meantime; that's fine
			pass
432
434 """returns the handling URL for the job with jobId.
435
436 You must override this in deriving classes.
437 """
438 raise NotImplementedError("Incomplete UWS (getURLForId not overridden).")
439
442 """A UWS with support for queuing.
443
444 Queuing is done on UWS level rather than at transitions. With a plain
445 UWS, if something is put on the queue, it must be started by the
446 Transition's queueJob method.
447
448 With UWSWithQueuing, you just mark the job queued and the rest is
449 taken care of by the UWS itself.
450 """
451
452 _processQueueDirty = False
453
454 runcountGoal = 1
455
457
458
459 self._processQueueLock = threading.Lock()
460 UWS.__init__(self, jobClass, actions)
461
463 UWS._makeMoreStatements(self, statements, jobsTable)
464 td = jobsTable.tableDef
465
466 countField = base.makeStruct(
467 svcs.OutputField, name="count", type="integer", select="count(*)",
468 required=True)
469
470 statements["countQueuedBefore"] = jobsTable.getQuery(
471 [countField],
472 "phase='QUEUED' and destructionTime<=%(dt)s",
473 {"dt": None})
474
475 statements["getIdsScheduledNext"] = jobsTable.getQuery(
476 [jobsTable.tableDef.getColumnByName("jobId")],
477 "phase='QUEUED'",
478 limits=('ORDER BY destructionTime ASC', {}))
479
480 statements["getHungCandidates"] = jobsTable.getQuery([
481 td.getColumnByName("jobId"),
482 td.getColumnByName("pid")],
483 "phase='EXECUTING'")
484
486 """tells TAP UWS to try and dequeue jobs next time checkProcessQueue
487 is called.
488
489 This function exists since during the TAPTransitions there's
490 a writable job and processing the queue might deadlock. So, rather
491 than processing right away, we just note something may need to be
492 done.
493 """
494 self._processQueueDirty = True
495
497 """sees if any QUEUED process can be made EXECUTING.
498
499 This must be called while you're not holding any changeableJob.
500 """
501 if self._processQueueDirty:
502 self._processQueueDirty = False
503 self._processQueue()
504
506 """tries to take jobs from the queue.
507
508 This function is called from checkProcessQueue when we think
509 a job may have gone from EXECUTING to somewhere else.
510
511 Currently, the jobs with the earliest destructionTime are processed
512 first. That's, of course, completely ad-hoc.
513 """
514 if not self._processQueueLock.acquire(False):
515
516
517
518 return
519 else:
520 try:
521 if self.countQueuedJobs()==0:
522 return
523
524 try:
525 started = 0
526 with base.getTableConn() as conn:
527 toStart = [row["jobId"] for row in
528 self.runCanned('getIdsScheduledNext', {}, conn)]
529
530 while toStart:
531 if self.countRunningJobs()>=self.runcountGoal:
532 break
533 self.changeToPhase(toStart.pop(0), EXECUTING)
534 started += 1
535
536 if started==0:
537
538
539
540
541
542
543
544 if base.IS_DACHS_SERVER:
545 self._ensureJobsAreRunning()
546 except Exception:
547 base.ui.notifyError("Error during queue processing, "
548 " the UWS %s is probably botched now."%self.__class__.__name__)
549 finally:
550 self._processQueueLock.release()
551
553 """pushes all executing slave jobs that silently died to ERROR.
554 """
555 with base.getTableConn() as conn:
556 for row in self.runCanned("getHungCandidates", {}, conn):
557 jobId, pid = row["jobId"], row["pid"]
558
559 if pid is None:
560 self.changeToPhase(jobId, "ERROR",
561 UWSError("EXECUTING job %s had no pid."%jobId, jobId))
562 base.ui.notifyError("Stillborn async slave %s"%jobId)
563 elif pid<2:
564
565
566
567 continue
568 else:
569 pass
570
571
572
573
574
575
576
577
578
579
580
581
582 - def changeToPhase(self, jobId, newPhase, input=None, timeout=10):
587
590 """A UWS parameter.that is (in effect) a URL.
591
592 This always contains a URL. In case of uploads, the tap renderer makes
593 sure the upload is placed into the upload directory and generates a
594 URL; in that case, local is True.
595
596 You need this class when you want the byReference attribute in
597 the UWS.parameter element to be true.
598 """
600 self.local = local
601 self.url = url
602
605 """A UWS job parameter.
606
607 Job parameters are (normally) elements of a job's parameters dictionary,
608 i.e., usually elements of the job control language. "Magic" parameters
609 that allow automatic serialization or special effects on the job
610 can be defined using the _parameter_name class attributes of
611 UWSJobs. They are employed when using the setSerializedPar and
612 getSerializedPar interface.
613
614 All methods of these are class or static methods since
615 ProtocolParameters are never instantiated.
616
617 The default value setter enforces a maximum length of 50000 characters
618 for incoming strings. This is a mild shield against accidental DoS
619 with, e.g., bad uploads in TAP.
620
621 The serialization thing really belongs to the user-facing interface.
622 However, since it's most convenient to have these parameters
623 in the UWSJob classes, the class is defined here.
624
625 Internal clients would read the parameters directly from the dictionary.
626
627 Methods you can override:
628
629 - addPar(value, job) -> None -- parse value and perform some action
630 (typically, set an attribute) on job. The default implementation
631 puts a value into the parameters dictionary _deserialized.
632 - getPar(job) -> string -- infer a string representation of the
633 job parameter on job. The default implementation gets
634 the value from the parameter from the parameters dictionary and
635 _serializes it.
636 - _deserialize(str) -> val -- turns something from an XML tree
637 into a python value as usable by the worker
638 - _serialize(val) -> str -- yields some XML-embeddable serialization
639 of val
640
641 The default implementation just dumps/gets name from the job's
642 parameters dict. This is the behaviour for non-magic parameters
643 since (get|set)SerializedPar falls back to the base class.
644
645 CAUTION: Do *not* say job.parameters[bla] = "ab" -- your changes
646 will get lost because serialisation of the parameters dictionary must
647 be initiated manually. Always manipulate the parameters dictionary
648 by using cls.updatePar(name, value, job) or a suitable
649 facade (job.setPar, job.setSerializedPar)
650 """
651 _deserialize, _serialize = staticmethod(strOrEmpty), staticmethod(strOrEmpty)
652
653 @classmethod
667
668 @classmethod
669 - def addPar(cls, name, value, job):
676
677 @classmethod
680
683 """A generic DALI-style upload facility.
684
685 We add this to all UWS job classes when their underlying services have
686 a file-typed input key. It will contain some somewhat arbitrary string
687 that lets people guess what they uploaded. TAP does this a bit
688 differently from useruws, which tries a somewhat rationalised approach.
689 """
690
691
692
693
694
695
696 @classmethod
698 if value is None:
699 return []
700 return value.split("/")
701
702 @classmethod
704 if value is None:
705 return ""
706 return "/".join(value)
707
708 @classmethod
709 - def addPar(cls, name, value, job):
716
719 """an uploaded file.
720
721 These are being created by posting to upload in the current design;
722 hence, we fail on an attempt to addPar those. The serialisation
723 yields ParameterRefs.
724
725 Note that TAP uploads currently employ a different scheme since TAP
726 uploads don't match what DALI says.
727
728 The stored values are always URLs into our service, regardless of where
729 the upload came from. For simplicity, we store the things in results.
730
731 TODO: We should preserve the media type of the upload where available.
732 """
733 @classmethod
738
739 @classmethod
744
@classmethod
def addPar(cls, name, value, job):
	# first parameter renamed self -> cls for consistency with the
	# other classmethod addPar implementations in this module.
	# Uploaded files only come in through DALI uploads, so posting to
	# the parameter itself is always an error.
	raise base.ValidationError("File parameters cannot be set by posting to"
		" them. Use DALI-style UPDATEs for them.", name)
749
752 """The metaclass for UWS jobs.
753
754 We have the metaclass primarily because we want to delay loading
755 the actual definition until it is actually needed (otherwise we
756 might get interesting chicken-egg-problems with rscdesc at some point).
757
758 A welcome side effect is that we can do custom constructors and
759 similar cosmetic deviltry.
760 """
761 @property
768
771 """An abstract UWS job.
772
773 UWS jobs are always instanciated with a row from the associated
774 jobs table (i.e. a dictionary giving all the uws properties). You
775 can read the properties as attributes. UWSJobs also keep
776 a (weak) reference to the UWS that made them.
777
778 To alter uws properties, use the change method. This will fail unless
779 the job was created giving writable=True.
780
781 To make it concrete, you need to define:
782
783 - a _jobsTDid attribute giving the (cross) id of the UWS jobs
784 table for this kind of job
785 - a _transitions attribute giving a UWSTransitions instance that defines
786 what to do on transitions
787 - as needed, class methods _default_<parName> if you need to default
788 job parameters in newly created jobs
789 - as needed, methods _decode_<parName> and _encode_<parName>
790 to bring uws parameters (i.e., everything that has a DB column)
791 from and to the DB representation from *python* values.
792
793 You may want to override:
794
795 - a class method getNewId(uws, writableConn) -> str, a method
796 allocating a unique id for a new job and returning it. Beware
797 of races here; if your allocation is through the database table,
798 you'll have to lock it *and* write a preliminary record with your new
799 id. The default implementation does this, but if you want
800 something in the file system, you probably don't want to
801 use that.
802 - a method _setParamsFromDict(argDict), which takes a nevow-style
803 request.args dictionary and sets the job parameters. This is
804 only necessary if you need extra mappings between names and such.
805
806 For every piece of the job parameters, define
807 class attributes _parameters_<parname.lower()> with JobParameter
808 values saying how they are serialized and deserialized. Only
809 parameters defined in this way are accepted and integrated
810 into the parameters dict.
811
812 If you need to clean up before the job is torn down, redefine
813 the prepareForDestruction method.
814 """
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831 __metaclass__ = UWSJobType
832
def __init__(self, props, uws, writable=False):
	"""(jobs are constructed with a row from the jobs table; use the
	parent UWS rather than instanciating UWSJobs yourself.)
	"""
	# go through object's __setattr__ when stashing the raw row --
	# presumably attribute writes are intercepted elsewhere in this
	# class; confirm before changing this.
	object.__setattr__(self, "_props", props)
	self.writable = writable
	# weak reference: a job must not keep its parent UWS alive
	self.uws = weakref.proxy(uws)
837
844
851
852 @property
854 """Always returns None.
855
856 Override if you have a queue management.
857 """
858 return None
859
860 @classmethod
881
882 @classmethod
884 """returns a dictionary suitable for inserting into a jobsTD table.
885 """
886 res = {}
887 for column in cls.jobsTD:
888 name = column.name
889 res[name] = getattr(cls, "_default_"+name, lambda: None)()
890 return res
891
892 @classmethod
895
896 @classmethod
898 return base.getConfig("async", "defaultExecTime")
899
900 @classmethod
902 return datetime.datetime.utcnow()
903
904 @classmethod
906 return datetime.datetime.utcnow()+datetime.timedelta(
907 seconds=base.getConfig("async", "defaultLifetime"))
908
910 """returns a pickled dictionary with error information.
911
912 value can either be an exception object or a dictionary as
913 generated here.
914 """
915 if value is None:
916 return None
917 if not isinstance(value, dict):
918 value = {
919 "type": value.__class__.__name__,
920 "msg": unicode(value),
921 "hint": getattr(value, "hint", None),
922 }
923 return pickle.dumps(value)
924
926 """returns the unpickled three-item dictionary from the database string.
927 """
928 if value is None:
929 return None
930 return pickle.loads(str(value))
931
932 @classmethod
935
937 """(db format for parameters is a pickle)
938 """
939 return pickle.dumps(value, protocol=2).encode("zlib").encode("base64")
940
942 """(db format for parameters is a pickle)
943 """
944 return pickle.loads(str(value).decode("base64").decode("zlib"))
945
947 """returns the job/uws parameter definition for parName and the name
948 the parameter will actually be stored as.
949
950 All these parameters are forced to be lower case (and thus
951 case-insensitive). The actual storage name of the parameter is
952 still returned in case saner conventions may be forthcoming.
953 """
954 parName = parName.lower()
955 name = "_parameter_"+parName
956 if hasattr(self, name):
957 return getattr(self, name), parName
958 return JobParameter, parName
959
961 """enters parName:parValue into self.parameters after deserializing it.
962
963 This is when input comes from text; use setPar for values already
964 parsed.
965 """
966 parDef, name = self._getParameterDef(parName)
967 parDef.addPar(name, parValue, self)
968
def setPar(self, parName, parValue):
	"""enters parName:parValue into self.parameters.

	parValue already is a parsed python value here; use setSerializedPar
	for values in text form.
	"""
	parameterDef, storedName = self._getParameterDef(parName)
	parameterDef.updatePar(storedName, parValue, self)
974
976 """returns self.parameters[parName] in text form.
977
978 This is for use from a text-based interface. Workers read from
979 parameters directly.
980 """
981 parDef, name = self._getParameterDef(parName)
982 return parDef.getPar(name, self)
983
989
991 """iterates over the names of the parameters declared for the job.
992 """
993 for n in dir(self):
994 if n.startswith("_parameter_"):
995 yield n[11:]
996
998 """sets our parameters from a dictionary of string lists.
999
1000 self must be writable for this to work.
1001 """
1002 for key in self.iterParameterNames():
1003 if key in argDict:
1004 val = argDict[key]
1005
1006 if isinstance(val, list):
1007 val = " ".join(val)
1008 if not val:
1009
1010
1011 continue
1012 self.setSerializedPar(key, val)
1013
1015 """sets our parameter dict from a nevow request.
1016
1017 This can be called on both writable and non-writable jobs.
1018 """
1019 with self.getWritable() as wjob:
1020 wjob._setParamsFromDict(request.args)
1021
1023 """returns the action prescribed to push self to newPhase.
1024
1025 A ValidationError is raised if no such transition is defined.
1026 """
1027 return self._transitions.getTransition(self.phase, newPhase)
1028
1030 """changes the property values to what's given by the keyword arguments.
1031
1032 It is an AttributeError to try and change a property that is not defined.
1033 """
1034 if not self.writable:
1035 raise TypeError("Read-only UWS job (You can only change UWSJobs"
1036 "obtained through changeableJob.")
1037 for propName, propValue in kwargs.iteritems():
1038 if propName not in self._props:
1039 raise AttributeError("%ss have no attribute %s"%(
1040 self.__class__.__name__, propName))
1041 self._props[propName] = getattr(self, "_encode_"+propName,
1042 utils.identity)(propValue)
1043
1045 """returns the properties of the job as they are stored in the
1046 database.
1047
1048 Use attribute access to read them and change to change them. Do
1049 *not* get values from the dictionary you get and do *not* change
1050 the dictionary.
1051 """
1052 return self._props
1053
1055 """fetches a new copy of the job props from the DB.
1056
1057 You should in general not need this, since UWSJob objects are intended
1058 to be short-lived ("for the duration of an async request"). Still,
1059 for testing and similar, it's convenient to be able to update
1060 a UWS job from the database.
1061 """
1062 self._props = self.uws.getJob(self.jobId)._props
1063
1065 """is called before the job's database row is torn down.
1066
1067 Self is writable at this point.
1068 """
1069
1071 """returns the UWS URL for this job.
1072 """
1073 return self.uws.getURLForId(self.jobId)
1074
1075 @contextlib.contextmanager
1077 """a context manager for a writeable version of the job.
1078
1079 Changes will be written back at the end, and the job object itself
1080 will be updated from the database.
1081
1082 If self already is writable, it is returned unchanged, and changes
1083 are only persisted when the enclosing controlling block finishes.
1084 """
1085 if self.writable:
1086 yield self
1087 return
1088
1089 with self.uws.changeableJob(self.jobId) as wjob:
1090 yield wjob
1091 self.update()
1092
1095 """A UWS job with a working directory.
1096
1097 This generates ids from directory names in a directory (the
1098 uwsWD) shared for all UWSes on the system.
1099
1100 It also adds methods
1101
1102 - getWD() -> str returning the working directory
1103 - addResult(self, source, mimeType, name=None) to add a new
1104 result
1105 - openResult(self, mimeType, name) -> file to get an open file in the
1106 WD to write to in order to generate a result
1107 - getResult(self, resName) -> str to get the *path* of a result with
1108 resName
1109 - getResults(self) -> list-of-dicts to get dicts describing all
1110 results available
1111 - openFile(self) -> file to get a file letting you read an existing
1112 result.
1113 """
1114 @classmethod
1121
1123 return os.path.join(base.getConfig("uwsWD"), self.jobId)
1124
1126 shutil.rmtree(self.getWD())
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136 @property
1138 return os.path.join(self.getWD(), "__RESULTS__")
1139
1141 try:
1142 with open(self._resultsDirName) as f:
1143 return pickle.load(f)
1144 except IOError:
1145 return []
1146
1148 handle, srcName = tempfile.mkstemp(dir=self.getWD())
1149 with os.fdopen(handle, "w") as f:
1150 pickle.dump(results, f)
1151
1152
1153 os.rename(srcName, self._resultsDirName)
1154
1156 resTable = self._loadResults()
1157 newRec = {'resultName': name, 'resultType': mimeType}
1158
1159 for index, res in enumerate(resTable):
1160 if res["resultName"]==name:
1161 resTable[index] = newRec
1162 break
1163 else:
1164 resTable.append(
1165 {'resultName': name, 'resultType': mimeType})
1166
1167 self._saveResults(resTable)
1168
1170 """sets the media type for result resultName.
1171
1172 It is not an error if no result with resultName exists.
1173 """
1174 resTable = self._loadResults()
1175 for row in resTable:
1176 if row["resultName"]==resultName:
1177 row["resultType"] = mediaType
1178 self._saveResults(resTable)
1179
def addResult(self, source, mimeType, name=None):
	"""adds a result, with data taken from source.

	source may be a file-like object or a byte string.

	If no name is passed, a name is auto-generated.
	"""
	if name is None:
		# auto-generate a name from the source's object id
		name = utils.intToFunnyName(id(source))

	destPath = os.path.join(self.getWD(), name)
	with open(destPath, "w") as destFile:
		if isinstance(source, basestring):
			destFile.write(source)
		else:
			# file-like source: stream its content over
			utils.cat(source, destFile)

	self._addResultInJobDir(mimeType, name)
1195
1197 """returns a writable file that adds a result.
1198 """
1199 self._addResultInJobDir(mimeType, name)
1200 return open(os.path.join(self.getWD(), name), "w")
1201
1203 """returns a pair of file name and mime type for a named job result.
1204
1205 If the result does not exist, a NotFoundError is raised.
1206 """
1207 res = [r for r in self._loadResults() if resName==r["resultName"]]
1208 if not res:
1209 raise base.NotFoundError(resName, "job result",
1210 "uws job %s"%self.jobId)
1211 res = res[0]
1212 return os.path.join(self.getWD(), res["resultName"]), res["resultType"]
1213
1215 """returns a list of this service's results.
1216
1217 The list contains dictionaries having at least resultName and resultType
1218 keys.
1219 """
1220 return self._loadResults()
1221
1223 """returns an open file object for a file within the job's work directory.
1224
1225 No path parts are allowed on name.
1226 """
1227 if "/" in name:
1228 raise ValueError("No path components allowed on job files.")
1229 return open(os.path.join(self.getWD(), name), mode)
1230
1233 """An abstract base for classes defining the behaviour of a UWS.
1234
1235 This basically is the definition of a finite state machine with
1236 arbitrary input (which is to say: the input "alphabet" is up to
1237 the transitions).
1238
1239 A UWSTransitions instance is in the transitions attribute of a job
1240 class.
1241
1242 The main interface to UWSTransitions is getTransition(p1, p2) -> callable
1243 It returns a callable that should push the automaton from phase p1
1244 to phase p2 or raise an ValidationError for a field phase.
1245
1246 The callable has the signature f(desiredPhase, wjob, input) -> None.
1247 It must alter the uwsJob object as appropriate. input is some object
1248 defined by the the transition. The job passed is a changeable job,
1249 so the handlers actually hold locks to the job row. Thus, be brief.
1250
1251 The transitions are implemented as simple methods having the signature
1252 of the callables returned by getTransition.
1253
1254 To link transitions and methods, pass a vertices list to the constructor.
1255 This list consists of 3-tuples of strings (from, to, method-name). From and
1256 to are phase names (use the symbols from this module to ward against typos).
1257 """
1259 self.name = name
1260 self._buildTransitions(vertices)
1261
1263 self.transitions = {}
1264
1265 for phase in [PENDING, QUEUED, EXECUTING, ERROR, ABORTED, COMPLETED]:
1266 self.transitions.setdefault(phase, {})[ERROR] = "flagError"
1267 self.transitions.setdefault(EXECUTING, {})[COMPLETED
1268 ] = "noteEndTime"
1269
1270 for fromPhase, toPhase, methodName in vertices:
1271 self.transitions.setdefault(fromPhase, {})[toPhase] = methodName
1272
1274 if (fromPhase==toPhase or
1275 fromPhase in END_STATES):
1276
1277 return lambda p, job, input: None
1278 try:
1279 methodName = self.transitions[fromPhase][toPhase]
1280 except KeyError:
1281 raise base.ui.logOldExc(
1282 base.ValidationError("No transition from %s to %s defined"
1283 " for %s jobs"%(fromPhase, toPhase, self.name),
1284 "phase", hint="This almost always points to an implementation error"))
1285 try:
1286 return getattr(self, methodName)
1287 except AttributeError:
1288 raise base.ui.logOldExc(
1289 base.ValidationError("%s Transitions have no %s methods"%(self.name,
1290 methodName),
1291 "phase", hint="This is an error in an internal protocol definition."
1292 " There probably is nothing you can do but complain."))
1293
1294 - def noOp(self, newPhase, job, ignored):
1295 """a sample action just setting the new phase.
1296
1297 This is a no-op baseline sometimes useful in user code.
1298 """
1299 job.change(phase=newPhase)
1300
1301 - def flagError(self, newPhase, wjob, exception):
1314
class SimpleUWSTransitions(UWSTransitions):
	"""A UWSTransitions with sensible transitions pre-defined.

	See the source for what we consider sensible.

	The idea here is that you simply override (and usually up-call)
	the methods queueJob, markAborted, startJob, completeJob,
	killJob, errorOutJob, and ignoreAndLog.

	You will have to define startJob and provide some way to execute
	startJob on QUEUED jobs (there's nothing wrong with immediately
	calling self.startJob(...) if you don't mind the DoS danger).

	Once you have startJob, you'll probably want to define killJob as
	well.
	"""
	# NOTE(review): the class and several method def lines were lost in
	# text extraction and have been reconstructed from the surrounding
	# code and the upstream DaCHS source -- verify against VCS.
	def __init__(self, name):
		UWSTransitions.__init__(self, name, [
			(PENDING, QUEUED, "queueJob"),
			(PENDING, ABORTED, "markAborted"),
			(QUEUED, ABORTED, "markAborted"),
			(QUEUED, EXECUTING, "startJob"),
			(EXECUTING, COMPLETED, "completeJob"),
			(EXECUTING, ABORTED, "killJob"),
			(EXECUTING, ERROR, "errorOutJob"),
			(COMPLETED, ERROR, "ignoreAndLog"),
			])

	def queueJob(self, newState, wjob, ignored):
		"""puts a job on the queue.
		"""
		# NOTE(review): this body was lost in extraction; reconstructed
		# minimally -- verify against VCS.
		wjob.change(phase=QUEUED)

	def markAborted(self, newState, wjob, ignored):
		"""simply marks job as aborted.

		This is what happens if you abort a job from QUEUED or
		PENDING.
		"""
		wjob.change(phase=ABORTED,
			endTime=datetime.datetime.utcnow())

	def ignoreAndLog(self, newState, wjob, exc):
		"""logs an attempt to transition when it's impossible but
		shouldn't result in an error.

		This is mainly so COMPLETED things don't fail just because of some
		mishap.
		"""
		base.ui.logErrorOccurred("Request to push %s job to ERROR: %s"%(
			wjob.phase, str(exc)))

	def errorOutJob(self, newPhase, wjob, ignored):
		"""pushes a job to an error state.

		This is called by a worker; leaving the error message itself
		is part of the worker's duty.
		"""
		wjob.change(phase=newPhase, endTime=datetime.datetime.utcnow())
		self.flagError(newPhase, wjob, ignored)

	def killJob(self, newPhase, wjob, ignored):
		"""should abort a job.

		There's really not much we can do here, so this is a no-op.

		Do not up-call here, you'll get a (then spurious) warning
		if you do.
		"""
		base.ui.notifyWarning("%s UWSes cannot kill jobs"%self.name)

	def completeJob(self, newPhase, wjob, ignored):
		"""pushes a job into the completed state.
		"""
		wjob.change(phase=newPhase, endTime=datetime.datetime.utcnow())
def _replaceFDs(inFName, outFName):
	"""closes all (findable) file descriptors and replaces stdin with inF
	and stdout/err with outF.

	This is used after fork to give the child a clean set of standard
	streams before exec.
	"""
	# NOTE(review): the original def line was lost in text extraction;
	# the signature is reconstructed from the body -- verify against VCS.
	#
	# Close every fd up to 255 so the opens/dups below land on the
	# lowest free descriptors.
	for fd in range(255, -1, -1):
		try:
			os.close(fd)
		except os.error:
			pass
	# With all fds closed, these opens yield fd 0 (stdin) and fd 1
	# (stdout); the dups then fill 1 and 2.  This relies on POSIX
	# lowest-free-fd allocation.
	inF, outF = open(inFName), open(outFName, "w")
	os.dup(inF.fileno())
	os.dup(outF.fileno())
	os.dup(outF.fileno())
1411
class _UWSBackendProtocol(protocol.ProcessProtocol):
	"""The protocol used for taprunners when spawning them under a twisted
	reactor.
	"""
	# NOTE(review): all method bodies of this class were lost in text
	# extraction; they are reconstructed from the upstream DaCHS source
	# -- verify against VCS before relying on details.
	def __init__(self, jobId, workerSystem):
		self.jobId = jobId
		self.workerSystem = workerSystem

	def outReceived(self, data):
		base.ui.notifyInfo("TAP worker %s produced output: %s"%(
			self.jobId, data))

	def errReceived(self, data):
		base.ui.notifyInfo("TAP worker %s produced an error message: %s"%(
			self.jobId, data))

	def processEnded(self, statusObject):
		"""tries to ensure the job is in an admitted end state.
		"""
		try:
			job = self.workerSystem.getJob(self.jobId)
			if job.phase==QUEUED or job.phase==EXECUTING:
				# The child died without pushing its job to an end state;
				# flag the job as errored out.
				self.workerSystem.changeToPhase(self.jobId, ERROR,
					UWSError("Job hung in %s"%job.phase, job.jobId))
		except JobNotFound:
			# job already gone; nothing sensible left to do
			pass
class ProcessBasedUWSTransitions(SimpleUWSTransitions):
	"""A SimpleUWSTransitions that processes its stuff in a child process.

	Inheriting classes must implement the getCommandLine(wjob) method --
	it must return a command (suitable for reactor.spawnProcess and
	os.execlp) and a list of arguments suitable for reactor.spawnProcess.

	They must also implement some sort of queue management.  In the simplest
	case, override queueJob and start the job from there (but set
	to QUEUED in there anyway).
	"""
	# NOTE(review): the class def line and the headers of the two private
	# start methods were lost in text extraction and have been
	# reconstructed -- verify against VCS.
	def getCommandLine(self, wjob):
		"""returns (command, argList) for running wjob; must be overridden.
		"""
		raise NotImplementedError("%s transitions do not define"
			" getCommandLine"%self.name)

	def _startJobTwisted(self, wjob):
		"""starts a job by forking a new process when we're running
		within a twisted reactor.
		"""
		assert wjob.phase==QUEUED
		cmd, args = self.getCommandLine(wjob)
		pt = reactor.spawnProcess(_UWSBackendProtocol(wjob.jobId, wjob.uws),
			cmd, args=args,
			env=os.environ)
		wjob.change(pid=pt.pid, phase=EXECUTING)

	def _startJobNonTwisted(self, wjob):
		"""forks off a new process when (hopefully) a manual child reaper
		is in place.
		"""
		cmd, args = self.getCommandLine(wjob)
		pid = os.fork()
		if pid==0:
			# child: detach our streams and become the worker
			_replaceFDs("/dev/zero", "/dev/null")
			os.execlp(cmd, *args)
		elif pid>0:
			wjob.change(pid=pid, phase=EXECUTING)
		else:
			# unreachable in CPython (os.fork raises OSError on failure),
			# kept as a guard
			raise Exception("Could not fork")

	def startJob(self, newState, wjob, ignored):
		"""causes a process to be started that executes job.

		This dispatches according to whether or not we are within a twisted
		event loop, mostly for testing support.
		"""
		if reactor.running:
			return self._startJobTwisted(wjob)
		else:
			return self._startJobNonTwisted(wjob)

	def killJob(self, newState, wjob, ignored):
		"""tries to kill/abort job.

		Actually, there are two different scenarios here: Either the job has
		a non-NULL startTime. In that case, the child job is in control
		and will manage the state itself. Then kill -INT will do the right
		thing.

		However, if startTime is NULL, the child is still starting up. Sending
		a kill -INT may do many things, and most of them we don't want.
		So, in this case we kill -TERM the child, do state management ourselves
		and hope for the best.
		"""
		try:
			pid = wjob.pid
			if pid is None:
				raise UWSError("Job is not running", wjob.jobId)
			elif pid<2:
				# refuse to signal init or process groups
				raise UWSError("Job has unkillable PID", wjob.jobId)

			if wjob.startTime is None:
				# child still starting up: terminate it and do the state
				# management ourselves
				os.kill(pid, signal.SIGTERM)
				self.markAborted(ABORTED, wjob, ignored)
			else:
				# child in control: let it handle its own state transition
				os.kill(pid, signal.SIGINT)
		except UWSError:
			raise
		except Exception as ex:
			# Fixed: the source exception used to be passed as the jobId
			# positional argument of UWSError; pass it as sourceEx instead.
			raise UWSError(None, wjob.jobId, ex)