Package gavo :: Package protocols :: Module useruws
[frames] | no frames]

Source Code for Module gavo.protocols.useruws

  1  """ 
  2  Support for UWSes defined in user RDs. 
  3   
  4  To understand this, start at makeUWSForService. 
  5  """ 
  6   
  7  #c Copyright 2008-2019, the GAVO project 
  8  #c 
  9  #c This program is free software, covered by the GNU GPL.  See the 
 10  #c COPYING file in the source distribution. 
 11   
 12   
 13  import cPickle as pickle 
 14  import datetime 
 15  import weakref 
 16   
 17  from gavo import base 
 18  from gavo import formats 
 19  from gavo import rsc 
 20  from gavo import rscdesc #noflake: for registration 
 21  from gavo import svcs 
 22  from gavo.protocols import uws 
 23  from gavo.protocols import uwsactions 
 24   
 25   
26 -class UserUWSTransitions(uws.ProcessBasedUWSTransitions):
27 """The transition function for user-defined UWSes. 28 """
29 - def __init__(self):
31
32 - def queueJob(self, newState, wjob, ignored):
33 """puts a job on the queue. 34 """ 35 uws.ProcessBasedUWSTransitions.queueJob(self, newState, wjob, ignored) 36 wjob.uws.scheduleProcessQueueCheck()
37
38 - def getCommandLine(self, wjob):
39 args = ["gavo", "uwsrun", "--", str(wjob.jobId)] 40 if base.DEBUG: 41 args[1:1] = ["--debug", "--traceback"] 42 return "gavo", args
43 44
45 -def makeUWSJobParameterFor(inputKey):
46 """returns a uws.JobParameter instance for inputKey. 47 """ 48 class SomeParameter(uws.JobParameter): 49 name = inputKey.name 50 _deserialize = inputKey._parse 51 _serialize = inputKey._unparse
52 return SomeParameter 53 54
55 -class UserUWSJobBase(uws.UWSJobWithWD):
56 """The base class for the service-specific user UWS jobs. 57 58 (i.e., the things that the UserUWSJobFactory spits out) 59 """ 60 _transitions = UserUWSTransitions() 61 _jobsTDId = "//uws#userjobs"
62 63
64 -def makeUserUWSJobClass(service):
65 """returns a class object for representing UWS jobs processing requests 66 for service 67 """ 68 class UserUWSJob(UserUWSJobBase): 69 pass
70 71 defaults = {} 72 for ik in service.getInputKeysFor("uws.xml"): 73 if ik.type=="file": 74 # these are handled by UPLOAD 75 setattr(UserUWSJob, "_parameter_upload", uws.UploadParameter()) 76 setattr(UserUWSJob, "_parameter_"+ik.name, uws.FileParameter()) 77 continue 78 79 setattr(UserUWSJob, "_parameter_"+ik.name, 80 makeUWSJobParameterFor(ik)) 81 defaults[ik.name] = ik.values.default 82 83 defaultStr = pickle.dumps(defaults, protocol=2 84 ).encode("zlib").encode("base64") 85 del defaults 86 def _(cls): 87 return defaultStr 88 89 UserUWSJob._default_parameters = classmethod(_) 90 UserUWSJob._default_jobClass = classmethod( 91 lambda _, v=service.getFullId(): v) 92 93 return UserUWSJob 94 95
96 -class UserUWS(uws.UWSWithQueueing):
97 """A UWS for "user jobs", i.e., generic things an a core. 98 99 These dynamically create job classes based on the processing core's 100 parameters. To make this happen, we'll need to override some of the 101 common UWS functions. 102 """ 103 joblistPreamble = ("<?xml-stylesheet href='/static" 104 "/xsl/useruws-joblist-to-html.xsl' type='text/xsl'?>") 105 jobdocPreamble = ("<?xml-stylesheet href='/static/xsl/" 106 "useruws-job-to-html.xsl' type='text/xsl'?>") 107
108 - def __init__(self, service, jobActions):
109 self.runcountGoal = base.getConfig("async", "maxUserUWSRunningDefault") 110 self.service = weakref.proxy(service) 111 uws.UWSWithQueueing.__init__(self, 112 makeUserUWSJobClass(service), jobActions)
113
114 - def getURLForId(self, jobId):
115 return self.service.getURL("uws.xml")+"/"+jobId
116
117 - def _getJob(self, jobId, conn, writable=False):
118 """returns the named job as uws.UWS._getJob. 119 120 However, in a user UWS, there can be jobs from multiple services. 121 It would be nonsense to load another UWS's job's parameters into our 122 job class. To prevent this, we redirect if we find the new job's 123 class isn't ours. On the web interface, that should do the trick. 124 Everywhere else, this may not be entirely clear but still prevent 125 major confusion. 126 127 This is repeating code from uws.UWS._getJob; some refactoring at 128 some point would be nice. 129 """ 130 statementId = 'getById' 131 if writable: 132 statementId = 'getByIdEx' 133 res = self.runCanned(statementId, {"jobId": jobId}, conn) 134 if len(res)!=1: 135 raise uws.JobNotFound(jobId) 136 137 if res[0]["jobClass"]!=self.service.getFullId(): 138 raise svcs.WebRedirect( 139 base.resolveCrossId(res[0]["jobClass"]).getUWS().getURLForId(jobId)) 140 141 return self.jobClass(res[0], self, writable)
142
143 - def getIdsAndPhases(self, *args, **kwargs):
144 # for user UWSes, we only want jobs from our service in the job 145 # list resource. We insert extra conditions to the basic queries. 146 # getById and getAllIds don't change, though, as they're used internally 147 # and could influence, e.g., queueing and such. 148 return uws.UWSWithQueueing.getIdsAndPhases(self, *args, 149 initFragments=["jobClass=%(jobClass)s"], 150 initPars={"jobClass": self.service.getFullId()}, 151 **kwargs)
152 153
154 -def makeUWSForService(service):
155 """returns a UserUWS instance tailored to service. 156 157 All these share a jobs table, but the all have different job 158 classes with the parameters custom-made for the service's core. 159 160 A drawback of this is that each UWS created in this way runs the 161 job table purger again. That shouldn't be a problem per se but 162 may become cumbersome at some point. We can always introduce a 163 class Attribute on UserUWS to keep additional UWSes from starting 164 cron jobs of their own. 165 """ 166 return UserUWS(service, uwsactions.JobActions())
167 168 169 ####################### CLI 170
171 -def parseCommandLine():
172 import argparse 173 parser = argparse.ArgumentParser(description="Run an asynchronous" 174 " generic job (used internally)") 175 parser.add_argument("jobId", type=str, help="UWS id of the job to run") 176 return parser.parse_args()
177 178
179 -def main():
180 args = parseCommandLine() 181 jobId = args.jobId 182 with base.getTableConn() as conn: 183 svcId = list( 184 conn.query("SELECT jobclass FROM uws.userjobs WHERE jobId=%(jobId)s", 185 {"jobId": jobId}))[0][0] 186 service = base.resolveCrossId(svcId) 187 188 try: 189 job = service.getUWS().getJob(jobId) 190 with job.getWritable() as wjob: 191 wjob.change(phase=uws.EXECUTING, startTime=datetime.datetime.utcnow()) 192 193 service = base.resolveCrossId(job.jobClass) 194 inputTable = svcs.CoreArgs(service.core.inputTable, 195 job.parameters, job.parameters) 196 inputTable.job = job 197 198 data = service._runWithInputTable( 199 service.core, inputTable, None).original 200 201 # Our cores either return a table, a pair of mime and data, 202 # or None (in which case they added the results themselves) 203 if isinstance(data, tuple): 204 mime, payload = data 205 with job.openResult(mime, "result") as destF: 206 destF.write(payload) 207 208 elif isinstance(data, rsc.Data): 209 destFmt = inputTable.getParam("responseformat" 210 ) or "application/x-votable+xml" 211 with job.openResult(destFmt, "result") as destF: 212 formats.formatData(destFmt, data, destF, False) 213 214 elif data is None: 215 pass 216 217 else: 218 raise NotImplementedError("Cannot handle a service %s result yet."% 219 repr(data)) 220 221 with job.getWritable() as wjob: 222 wjob.change(phase=uws.COMPLETED) 223 224 except SystemExit: 225 pass 226 except uws.JobNotFound: 227 base.ui.notifyInfo("Giving up non-existing UWS job %s."%jobId) 228 except Exception as ex: 229 base.ui.notifyError("UWS runner %s major failure"%jobId) 230 # try to push job into the error state -- this may well fail given 231 # that we're quite hosed, but it's worth the try 232 service.getUWS().changeToPhase(jobId, uws.ERROR, ex) 233 raise
234