1 """
2 Support for UWSes defined in user RDs.
3
4 To understand this, start at makeUWSForService.
5 """
6
7
8
9
10
11
12
13 import cPickle as pickle
14 import datetime
15 import weakref
16
17 from gavo import base
18 from gavo import formats
19 from gavo import rsc
20 from gavo import rscdesc
21 from gavo import svcs
22 from gavo.protocols import uws
23 from gavo.protocols import uwsactions
24
25
# NOTE(review): this listing has lost its indentation and several lines.
# The `class` statement owning the docstring below is not visible here;
# further down in this file the class is instantiated as
# UserUWSTransitions().
27 """The transition function for user-defined UWSes.
28 """
31
# NOTE(review): only the signature of queueJob is visible; its body is not
# part of this listing.
32 - def queueJob(self, newState, wjob, ignored):
37
# NOTE(review): the `def` line for the statements below is not visible; they
# read `wjob`, so they presumably form a separate method taking the job.
# They build the argument vector ["gavo", "uwsrun", "--", <job id>]; when
# base.DEBUG is set, "--debug" and "--traceback" are inserted right after
# "gavo".  The result is the pair (program name, argument vector).
39 args = ["gavo", "uwsrun", "--", str(wjob.jobId)]
40 if base.DEBUG:
41 args[1:1] = ["--debug", "--traceback"]
42 return "gavo", args
43
44
def makeUWSJobParameterFor(inputKey):
    """returns a uws.JobParameter subclass for inputKey.

    The returned class takes its name from inputKey.name and uses
    inputKey._parse and inputKey._unparse as its _deserialize and
    _serialize hooks, respectively.

    Note that this returns a class, not an instance.
    """
    class SomeParameter(uws.JobParameter):
        name = inputKey.name
        _deserialize = inputKey._parse
        _serialize = inputKey._unparse

    return SomeParameter
53
54
# NOTE(review): the `class` statement owning this docstring is not visible
# in this listing.  Presumably this is UserUWSJobBase, the class the
# per-service job classes further down derive from -- confirm.
56 """The base class for the service-specific user UWS jobs.
57
58 (i.e., the things that the UserUWSJobFactory spits out)
59 """
# A single UserUWSTransitions object is created when this class body is
# executed.
60 _transitions = UserUWSTransitions()
# Presumably the id of the table definition for the jobs table (the job
# runner in this file queries uws.userjobs) -- confirm.
61 _jobsTDId = "//uws#userjobs"
62
63
# NOTE(review): the `def` line for this function is not visible in this
# listing (so its name is a guess); the body reads a single name from
# outside, `service`.
65 """returns a class object for representing UWS jobs processing requests
66 for service
67 """
# A fresh subclass per call; parameter attributes are attached to it below.
68 class UserUWSJob(UserUWSJobBase):
69 pass
70
# Maps input key names to their declared default values.
71 defaults = {}
# Walk the input keys the service declares for "uws.xml".
72 for ik in service.getInputKeysFor("uws.xml"):
# File-typed keys get an UploadParameter under the fixed name
# _parameter_upload plus a FileParameter under their own name.
# NOTE(review): with the indentation lost, it looks like the `continue`
# belongs to this branch, so no default is recorded for file keys -- confirm.
73 if ik.type=="file":
74
75 setattr(UserUWSJob, "_parameter_upload", uws.UploadParameter())
76 setattr(UserUWSJob, "_parameter_"+ik.name, uws.FileParameter())
77 continue
78
# All other keys get a parameter class built from the input key, and their
# default is remembered.
79 setattr(UserUWSJob, "_parameter_"+ik.name,
80 makeUWSJobParameterFor(ik))
81 defaults[ik.name] = ik.values.default
82
# Serialise the defaults once: pickle (protocol 2), then zlib, then base64.
# The str.encode("zlib")/("base64") codecs only exist in Python 2.
83 defaultStr = pickle.dumps(defaults, protocol=2
84 ).encode("zlib").encode("base64")
85 del defaults
# The classmethod just hands out the precomputed string.
86 def _(cls):
87 return defaultStr
88
89 UserUWSJob._default_parameters = classmethod(_)
# The service's full id is bound as a default argument, i.e., it is computed
# once here rather than on each call.
90 UserUWSJob._default_jobClass = classmethod(
91 lambda _, v=service.getFullId(): v)
92
93 return UserUWSJob
94
95
# NOTE(review): the `class` statement owning this docstring is not visible
# in this listing; further down the class is instantiated as
# UserUWS(service, jobActions).
97 """A UWS for "user jobs", i.e., generic things an a core.
98
99 These dynamically create job classes based on the processing core's
100 parameters. To make this happen, we'll need to override some of the
101 common UWS functions.
102 """
# XML processing instructions naming the XSLT stylesheets under /static/xsl
# for the job list and single job documents, respectively.
103 joblistPreamble = ("<?xml-stylesheet href='/static"
104 "/xsl/useruws-joblist-to-html.xsl' type='text/xsl'?>")
105 jobdocPreamble = ("<?xml-stylesheet href='/static/xsl/"
106 "useruws-job-to-html.xsl' type='text/xsl'?>")
107
# NOTE(review): only the signature of __init__ is visible; its body is not
# part of this listing.
108 - def __init__(self, service, jobActions):
113
116
117 - def _getJob(self, jobId, conn, writable=False):
118 """returns the named job as uws.UWS._getJob.
119
120 However, in a user UWS, there can be jobs from multiple services.
121 It would be nonsense to load another UWS's job's parameters into our
122 job class. To prevent this, we redirect if we find the new job's
123 class isn't ours. On the web interface, that should do the trick.
124 Everywhere else, this may not be entirely clear but still prevent
125 major confusion.
126
127 This is repeating code from uws.UWS._getJob; some refactoring at
128 some point would be nice.
129 """
130 statementId = 'getById'
131 if writable:
132 statementId = 'getByIdEx'
133 res = self.runCanned(statementId, {"jobId": jobId}, conn)
134 if len(res)!=1:
135 raise uws.JobNotFound(jobId)
136
137 if res[0]["jobClass"]!=self.service.getFullId():
138 raise svcs.WebRedirect(
139 base.resolveCrossId(res[0]["jobClass"]).getUWS().getURLForId(jobId))
140
141 return self.jobClass(res[0], self, writable)
142
152
153
def makeUWSForService(service):
    """returns a UserUWS instance tailored to service.

    All these share a jobs table, but they all have different job
    classes with the parameters custom-made for the service's core.

    A drawback of this is that each UWS created in this way runs the
    job table purger again.  That shouldn't be a problem per se but
    may become cumbersome at some point.  We can always introduce a
    class attribute on UserUWS to keep additional UWSes from starting
    cron jobs of their own.
    """
    return UserUWS(service, uwsactions.JobActions())
167
168
169
170
def parseCommandLine():
    """returns the parsed command line of the job runner.

    The only argument is the positional jobId (a string), the UWS id of
    the job to run.  Arguments are taken from sys.argv; as usual with
    argparse, malformed command lines make this exit the program.
    """
    # Imported locally, as in the original code, so that importing this
    # module does not pull in argparse.
    import argparse
    parser = argparse.ArgumentParser(description="Run an asynchronous"
        " generic job (used internally)")
    parser.add_argument("jobId", type=str, help="UWS id of the job to run")
    return parser.parse_args()
177
178
# NOTE(review): the `def` line for this function is not visible in this
# listing.  The body is the job runner: it takes a job id from the command
# line, runs the owning service's core on the job's parameters, stores the
# result with the job, and records the outcome in the job's phase.
180 args = parseCommandLine()
181 jobId = args.jobId
# Find out which service the job belongs to; the jobclass column holds an
# id that base.resolveCrossId turns into the service.
# NOTE(review): the [0][0] raises an IndexError when no row matches, and
# this happens before the try block, so the uws.JobNotFound handler below
# does not cover it -- confirm this is intended.
182 with base.getTableConn() as conn:
183 svcId = list(
184 conn.query("SELECT jobclass FROM uws.userjobs WHERE jobId=%(jobId)s",
185 {"jobId": jobId}))[0][0]
186 service = base.resolveCrossId(svcId)
187
188 try:
189 job = service.getUWS().getJob(jobId)
# Mark the job as running and note the start time (UTC, naive datetime).
190 with job.getWritable() as wjob:
191 wjob.change(phase=uws.EXECUTING, startTime=datetime.datetime.utcnow())
192
# Resolve the service again, this time from the job's own jobClass.
193 service = base.resolveCrossId(job.jobClass)
# The job's parameters are passed twice to CoreArgs; the job itself is
# attached to the input table, presumably so the core can reach it --
# confirm.
194 inputTable = svcs.CoreArgs(service.core.inputTable,
195 job.parameters, job.parameters)
196 inputTable.job = job
197
# Run the core; the `original` attribute of what comes back is the payload
# dispatched on below.
198 data = service._runWithInputTable(
199 service.core, inputTable, None).original
200
201
202
# A tuple is taken as (media type, payload) and written as the "result"
# as-is.
203 if isinstance(data, tuple):
204 mime, payload = data
205 with job.openResult(mime, "result") as destF:
206 destF.write(payload)
207
# An rsc.Data is serialised in the requested responseformat, falling back
# to VOTable when none was given.
208 elif isinstance(data, rsc.Data):
209 destFmt = inputTable.getParam("responseformat"
210 ) or "application/x-votable+xml"
211 with job.openResult(destFmt, "result") as destF:
212 formats.formatData(destFmt, data, destF, False)
213
# None means there is nothing to write here.
214 elif data is None:
215 pass
216
# Any other kind of result is not supported.
217 else:
218 raise NotImplementedError("Cannot handle a service %s result yet."%
219 repr(data))
220
221 with job.getWritable() as wjob:
222 wjob.change(phase=uws.COMPLETED)
223
# A SystemExit raised while running is swallowed without touching the job.
224 except SystemExit:
225 pass
# A vanished job is only reported.
226 except uws.JobNotFound:
227 base.ui.notifyInfo("Giving up non-existing UWS job %s."%jobId)
# Everything else is reported, recorded on the job by moving it to ERROR
# together with the exception, and then re-raised.
228 except Exception as ex:
229 base.ui.notifyError("UWS runner %s major failure"%jobId)
230
231
232 service.getUWS().changeToPhase(jobId, uws.ERROR, ex)
233 raise
234