1 """
2 A UWS-based interface to datalink.
3
4 TODO: There's quite a bit of parallel between this and useruws. This
5 should probably be reformulated along the lines of useruws.
6 """
7
8
9
10
11
12
13
14 from __future__ import print_function
15
16 import cPickle as pickle
17 import datetime
18
19 from gavo import base
20 from gavo import utils
21 from gavo import rscdesc
22 from gavo.protocols import products
23 from gavo.protocols import uws
24 from gavo.protocols import uwsactions
28 """The transition function for datalink jobs.
29 """
32
33 - def queueJob(self, newState, wjob, ignored):
36
38 return "gavo", ["gavo", "dlrun", "--", str(wjob.jobId)]
39
42 """A fully qualified id of the DaCHS service to execute the datalink
43 request.
44 """
45
48 """all parameters passed to the datalink job as a request.args dict.
49
50 The serialised representation is the pickled dict. Pickle is ok as
51 the string never leaves our control (the network serialisation is
52 whatever comes in via the POST).
53 """
54 @staticmethod
56 return pickle.loads(pickled)
57
58 @staticmethod
60 return pickle.dumps(args)
61
62
63 -class DLJob(uws.UWSJobWithWD):
64 """a UWS job performing some datalink data preparation.
65
66 In addition to UWS parameters, it has
67
68 * serviceid -- the fully qualified id of the service that will process
69 the request
70 * datalinkargs -- the parameters (in request.args form) of the
71 datalink request.
72 """
73 _jobsTDId = "//datalink#datalinkjobs"
74 _transitions = DLTransitions()
75
76 _parameter_serviceid = ServiceIdParameter
77 _parameter_datalinkargs = ArgsParameter
78
80 """stores datalinkargs from args.
81
82 As there's only one common UWS for all dlasync services, we have
83 to steal the service object from upstack at the moment. Let's see
84 if there's a way around that later.
85 """
86 self.setPar("datalinkargs", args)
87 self.setPar("serviceid", utils.stealVar("service").getFullId())
88
91 """the worker system for datalink jobs.
92 """
93 joblistPreamble = ("<?xml-stylesheet href='/static"
94 "/xsl/dlasync-joblist-to-html.xsl' type='text/xsl'?>")
95 jobdocPreamble = ("<?xml-stylesheet href='/static/xsl/"
96 "dlasync-job-to-html.xsl' type='text/xsl'?>")
97
98 _baseURLCache = None
99
102
103 @property
106
108 """returns a fully qualified URL for the job with jobId.
109 """
110 return "%s/%s"%(self.baseURL, jobId)
111
112
113
114 DL_WORKER = DLUWS()
115
116
117
118
119
120
121 import warnings
122 from nevow import inevow
123 from nevow import context
124 from nevow import testutil
125 from twisted.internet import defer
126 from twisted.internet import reactor
130 """essentially calls renderHTTP on result and stops the reactor.
131
132 This is a helper for our nevow simulation.
133 """
134 if isinstance(result, basestring):
135 if result:
136 request.write(result)
137 elif hasattr(result, "renderHTTP"):
138 return _doRender(result, ctx)
139 else:
140 warnings.warn("Unsupported async datalink render result: %s"%repr(result))
141 request.d.callback(request.accumulator)
142 reactor.stop()
143 return request.accumulator, request
144
147 """stops the reactor and returns a failure.
148
149 This is a helper for our nevow simulation.
150 """
151 reactor.stop()
152 return failure
153
156 """returns a deferred firing the result of page.renderHTTP(ctx).
157
158 This is a helper for our nevow simulation.
159 """
160 request = inevow.IRequest(ctx)
161 if not hasattr(page, "renderHTTP"):
162 return _requestDone(page, request, ctx)
163
164 d = defer.maybeDeferred(page.renderHTTP,
165 context.PageContext(
166 tag=page, parent=context.RequestContext(tag=request)))
167
168 d.addCallback(_requestDone, request, ctx)
169 d.addErrback(_renderCrashAndBurn, ctx)
170 return d
171
172
173 -class FakeRequest(testutil.AccumulatingFakeRequest):
174 """A nevow Request for local data accumulation.
175
176 We have a version of our own for this since nevow's has a
177 registerProducer that produces an endless loop with push
178 producers (which is what we have).
179 """
180 - def __init__(self, destFile, *args, **kwargs):
181 self.finishDeferred = defer.Deferred()
182 testutil.AccumulatingFakeRequest.__init__(self, *args, **kwargs)
183 self.destFile = destFile
184
186 self.destFile.write(stuff)
187
189 self.producer = producer
190 if not isPush:
191 testutil.AccumulatingFakeRequest.registerProducer(
192 self, producer, isPush)
193
196
198 return self.finishDeferred
199
200
201 -def _getRequestContext(destFile):
202 """returns a very simple nevow context writing to destFile.
203 """
204 req = FakeRequest(destFile)
205 ctx = context.WovenContext()
206 ctx.remember(req)
207 return ctx
208
211 """arranges for the result of rendering the nevow resource page
212 to be written to destFile.
213
214 This uses a very simple simulation of nevow rendering, so a few
215 tricks are possible. Also, it actually runs a reactor to do its magic.
216 """
217 ctx = _getRequestContext(destFile)
218
219 def _(func, ctx):
220 return defer.maybeDeferred(func, ctx
221 ).addCallback(_doRender, ctx)
222
223 reactor.callWhenRunning(_, page.renderHTTP, ctx)
224 reactor.run()
225 return inevow.IRequest(ctx)
226
232 import argparse
233 parser = argparse.ArgumentParser(description="Run an asynchronous datalink"
234 " job (used internally)")
235 parser.add_argument("jobId", type=str, help="UWS id of the job to run")
236 return parser.parse_args()
237
240 args = parseCommandLine()
241 jobId = args.jobId
242 try:
243 job = DL_WORKER.getJob(jobId)
244 with job.getWritable() as wjob:
245 wjob.change(phase=uws.EXECUTING, startTime=datetime.datetime.utcnow())
246
247 service = base.resolveCrossId(job.parameters["serviceid"])
248 args = job.parameters["datalinkargs"]
249 data = service.run("dlget", args).original
250
251
252
253
254
255
256 if isinstance(data, tuple):
257 mime, payload = data
258 with job.openResult(mime, "result") as destF:
259 destF.write(payload)
260
261 elif isinstance(data, products.ProductBase):
262
263
264
265 with job.openResult("application/octet-stream", "result") as destF:
266 for chunk in data.iterData():
267 destF.write(chunk)
268
269 elif hasattr(data, "renderHTTP"):
270
271
272 with job.openResult(type, "result") as destF:
273 req = writeResultTo(data, destF)
274 job.fixTypeForResultName("result", req.headers["content-type"])
275
276 else:
277 raise NotImplementedError("Cannot handle a service %s result yet."%
278 repr(data))
279
280 with job.getWritable() as wjob:
281 wjob.change(phase=uws.COMPLETED)
282
283 except SystemExit:
284 pass
285 except uws.JobNotFound:
286 base.ui.notifyInfo("Giving up non-existing datalink job %s."%jobId)
287 except Exception as ex:
288 base.ui.notifyError("Datalink runner %s major failure"%jobId)
289
290
291 DL_WORKER.changeToPhase(jobId, uws.ERROR, ex)
292 raise
293
294
295 if __name__=="__main__":
296
297 from nevow import rend
298 import os
299 - class _Foo(rend.Page):
302
304 if self.stuff=="booga":
305 return "abc"
306 else:
307 return defer.maybeDeferred(_Foo, "booga").addBoth(self.cleanup)
308
310 print("cleaning up")
311 return res
312
313 with open("bla", "w") as f:
314 writeResultTo(_Foo("ork"), f)
315 with open("bla") as f:
316 print(f.read())
317 os.unlink("bla")
318