Package gavo :: Package protocols :: Module dlasync
[frames] | no frames]

Source Code for Module gavo.protocols.dlasync

  1  """ 
  2  A UWS-based interface to datalink. 
  3   
  4  TODO: There's quite a bit of parallelism between this and useruws.  This 
  5  should probably be reformulated along the lines of useruws. 
  6  """ 
  7   
  8  #c Copyright 2008-2019, the GAVO project 
  9  #c 
 10  #c This program is free software, covered by the GNU GPL.  See the 
 11  #c COPYING file in the source distribution. 
 12   
 13   
 14  from __future__ import print_function 
 15   
 16  import cPickle as pickle 
 17  import datetime 
 18   
 19  from gavo import base 
 20  from gavo import utils 
 21  from gavo import rscdesc #noflake: cache registration 
 22  from gavo.protocols import products 
 23  from gavo.protocols import uws 
 24  from gavo.protocols import uwsactions 
25 26 27 -class DLTransitions(uws.ProcessBasedUWSTransitions):
28 """The transition function for datalink jobs. 29 """
30 - def __init__(self):
32
def queueJob(self, newState, wjob, ignored):
    """queues wjob and then starts it right away.

    The parent class' queueJob runs first with the arguments as passed
    in; after that, the job is handed to startJob with uws.EXECUTING as
    the target state.  Whatever startJob returns is returned from here.
    """
    parentTransitions = uws.ProcessBasedUWSTransitions
    parentTransitions.queueJob(self, newState, wjob, ignored)
    startResult = self.startJob(uws.EXECUTING, wjob, ignored)
    return startResult
36
def getCommandLine(self, wjob):
    """returns a pair of program name and argument vector for running wjob.

    The argument vector starts with the program name itself, followed by
    the dlrun subcommand, a "--" separator, and the stringified job id.
    """
    program = "gavo"
    argv = [program, "dlrun", "--", str(wjob.jobId)]
    return program, argv
39
class ServiceIdParameter(uws.JobParameter):
    """The job parameter holding the fully qualified id of the DaCHS
    service that is to execute the datalink request.

    Nothing is overridden here; all behaviour is inherited from
    uws.JobParameter.
    """
45
class ArgsParameter(uws.JobParameter):
    """the complete set of parameters of a datalink job, kept as a
    request.args dict.

    For storage, the dict is pickled.  According to the original author,
    pickle is acceptable here because the pickled string never leaves our
    control; what arrives over the network is whatever comes in via the
    POST, not the pickle.
    """

    @staticmethod
    def _serialize(args):
        """returns the pickled form of args.
        """
        pickledArgs = pickle.dumps(args)
        return pickledArgs

    @staticmethod
    def _deserialize(pickled):
        """returns the object restored from the string pickled.
        """
        restoredArgs = pickle.loads(pickled)
        return restoredArgs
61
class DLJob(uws.UWSJobWithWD):
    """a UWS job that prepares data on behalf of a datalink service.

    On top of the UWS parameters, it has

    * serviceid -- the fully qualified id of the service that will process
      the request
    * datalinkargs -- the parameters (in request.args form) of the
      datalink request.
    """
    _jobsTDId = "//datalink#datalinkjobs"
    _transitions = DLTransitions()

    _parameter_serviceid = ServiceIdParameter
    _parameter_datalinkargs = ArgsParameter

    def _setParamsFromDict(self, args):
        """fills in datalinkargs from args and serviceid from the calling
        service.

        All dlasync services share a single UWS, so the service object is
        not passed in; instead, it is picked up from a variable called
        "service" further up the stack (via utils.stealVar), and its full
        id is stored.
        """
        self.setPar("datalinkargs", args)
        stolen = utils.stealVar("service")
        self.setPar("serviceid", stolen.getFullId())
88
89 90 -class DLUWS(uws.UWS):
91 """the worker system for datalink jobs. 92 """ 93 joblistPreamble = ("<?xml-stylesheet href='/static" 94 "/xsl/dlasync-joblist-to-html.xsl' type='text/xsl'?>") 95 jobdocPreamble = ("<?xml-stylesheet href='/static/xsl/" 96 "dlasync-job-to-html.xsl' type='text/xsl'?>") 97 98 _baseURLCache = None 99
100 - def __init__(self):
@property
def baseURL(self):
    """the absolute URL base.makeAbsoluteURL yields for "datalinkuws".

    It is computed anew on each access.
    """
    absoluteRoot = base.makeAbsoluteURL("datalinkuws")
    return absoluteRoot
106
def getURLForId(self, jobId):
    """returns the URL of the job with jobId, i.e., self.baseURL and jobId
    joined by a slash.
    """
    root = self.baseURL
    return "%s/%s"%(root, jobId)
111 112 113 114 DL_WORKER = DLUWS() 115 116 ####################### nevow simulation 117 # This is so we can handle nevow resources coming back from datalink. 118 # Factor this out? This is essentially stolen from trialhelpers, 119 # and we might just put that somewhere where it's useful. 120 121 import warnings 122 from nevow import inevow 123 from nevow import context 124 from nevow import testutil 125 from twisted.internet import defer 126 from twisted.internet import reactor
def _requestDone(result, request, ctx):
    """finishes a simulated request for result and stops the reactor.

    This is a helper for our nevow simulation.

    * If result has a renderHTTP attribute (and is not a string), rendering
      is delegated to _doRender, and its return value is returned; the
      reactor is not stopped here in that case.
    * If result is a non-empty string, it is written to request.
    * Anything else that is not a string only triggers a warning.

    Except in the delegating case, request.d is fired with
    request.accumulator, the reactor is stopped, and the pair
    (request.accumulator, request) is returned.
    """
    if not isinstance(result, basestring):
        if hasattr(result, "renderHTTP"):
            return _doRender(result, ctx)
        warnings.warn("Unsupported async datalink render result: %s"%repr(result))
    elif result:
        request.write(result)

    request.d.callback(request.accumulator)
    reactor.stop()
    return request.accumulator, request
144
def _renderCrashAndBurn(failure, ctx):
    """stops the reactor and passes failure on unchanged.

    This is a helper for our nevow simulation; it is used as an errback,
    so returning failure keeps the error propagating.  ctx is accepted
    but not used.
    """
    reactor.stop()
    return failure
153
def _doRender(page, ctx):
    """renders page within ctx and arranges for the request to be finished.

    This is a helper for our nevow simulation.

    When page has no renderHTTP attribute, it is treated as a final result
    and passed straight to _requestDone, the return value of which is
    returned.  Otherwise, a deferred for page.renderHTTP (called with a
    fresh page context on top of a request context) is returned; it has
    _requestDone as callback and _renderCrashAndBurn as errback.
    """
    request = inevow.IRequest(ctx)
    if not hasattr(page, "renderHTTP"):
        return _requestDone(page, request, ctx)

    renderer = page.renderHTTP
    requestContext = context.RequestContext(tag=request)
    pageContext = context.PageContext(tag=page, parent=requestContext)

    rendering = defer.maybeDeferred(renderer, pageContext)
    rendering.addCallback(_requestDone, request, ctx)
    rendering.addErrback(_renderCrashAndBurn, ctx)
    return rendering
171
class FakeRequest(testutil.AccumulatingFakeRequest):
    """A nevow Request that writes what it receives to a local file.

    We have a version of our own for this since nevow's has a
    registerProducer that produces an endless loop with push
    producers (which is what we have).
    """

    def __init__(self, destFile, *args, **kwargs):
        """sets up the request to write to destFile.

        All further arguments are handed through to the parent class'
        constructor.
        """
        self.finishDeferred = defer.Deferred()
        testutil.AccumulatingFakeRequest.__init__(self, *args, **kwargs)
        self.destFile = destFile

    def write(self, stuff):
        """writes stuff to the destination file.
        """
        sink = self.destFile
        sink.write(stuff)

    def registerProducer(self, producer, isPush):
        """remembers producer; the parent class' registerProducer is only
        called for producers that are not push producers.
        """
        self.producer = producer
        if isPush:
            return
        testutil.AccumulatingFakeRequest.registerProducer(
            self, producer, isPush)

    def unregisterProducer(self):
        """forgets the producer remembered by registerProducer.
        """
        delattr(self, "producer")

    def notifyFinish(self):
        """returns the deferred created in the constructor.
        """
        return self.finishDeferred
199
def _getRequestContext(destFile):
    """returns a minimal nevow context that remembers a FakeRequest
    writing to destFile.
    """
    ctx = context.WovenContext()
    ctx.remember(FakeRequest(destFile))
    return ctx
208
def writeResultTo(page, destFile):
    """renders the nevow resource page into destFile and returns the
    (fake) request used for that.

    This uses a very simple simulation of nevow rendering, so a few
    tricks are possible.  Also, it actually runs a reactor to do its magic:
    page.renderHTTP is scheduled for when the reactor runs, its result is
    passed on to _doRender, and this function blocks in reactor.run()
    until something stops the reactor.
    """
    ctx = _getRequestContext(destFile)

    def _renderAndFollowUp(func, ctx):
        # NOTE(review): no errback is attached at this level; if func
        # itself fails, it looks like nothing stops the reactor -- confirm.
        firstPass = defer.maybeDeferred(func, ctx)
        return firstPass.addCallback(_doRender, ctx)

    reactor.callWhenRunning(_renderAndFollowUp, page.renderHTTP, ctx)
    reactor.run()
    return inevow.IRequest(ctx)
####################### CLI

def parseCommandLine(argv=None):
    """returns the parsed command line of the datalink job runner.

    argv is the list of command line arguments (without the program name).
    When it is None (the default), argparse falls back to sys.argv[1:], so
    existing callers are unaffected; passing a list lets the parser be used
    from other code and from tests.

    The result is an argparse namespace with a single attribute, jobId,
    the UWS id of the job to run, as a string.  As usual with argparse,
    a malformed command line terminates the program through SystemExit.
    """
    import argparse
    parser = argparse.ArgumentParser(description="Run an asynchronous datalink"
        " job (used internally)")
    parser.add_argument("jobId", type=str, help="UWS id of the job to run")
    return parser.parse_args(argv)
237
def main():
    """runs the datalink job the id of which is given on the command line.

    The job is moved to EXECUTING, the service named in its serviceid
    parameter is run through its dlget renderer with the job's
    datalinkargs, and what comes back is stored as the job result called
    "result".  On success, the job is moved to COMPLETED.

    A SystemExit is swallowed, a missing job is only reported through
    base.ui.notifyInfo.  On any other exception, the error is reported,
    an attempt is made to push the job into the ERROR phase, and the
    exception is re-raised.
    """
    jobId = parseCommandLine().jobId
    try:
        job = DL_WORKER.getJob(jobId)
        with job.getWritable() as wjob:
            wjob.change(phase=uws.EXECUTING, startTime=datetime.datetime.utcnow())

        service = base.resolveCrossId(job.parameters["serviceid"])
        # not called args so the datalink arguments are not confused with
        # the parsed command line
        datalinkArgs = job.parameters["datalinkargs"]
        data = service.run("dlget", datalinkArgs).original

        # Unfortunately, datalink cores can in principle return all kinds
        # of messy things that may not even be representable in plain files
        # (e.g., nevow resources returning redirects).  We hence only
        # handle (mime, payload) and (certain) Product instances here
        # and error out otherwise.
        if isinstance(data, tuple):
            mime, payload = data
            with job.openResult(mime, "result") as destF:
                destF.write(payload)

        elif isinstance(data, products.ProductBase):
            # We could run renderHTTP and grab the content-type from there
            # (which probably would be better all around).  For now, don't
            # care:
            with job.openResult("application/octet-stream", "result") as destF:
                for chunk in data.iterData():
                    destF.write(chunk)

        elif hasattr(data, "renderHTTP"):
            # these are nevow resources.  Let's run a reactor so these properly
            # work.  The actual media type is only known after rendering, so
            # the result is opened with a generic media type (rather than
            # the builtin type object, which is no media type at all) and
            # corrected from the response headers afterwards.
            with job.openResult("application/octet-stream", "result") as destF:
                req = writeResultTo(data, destF)
            job.fixTypeForResultName("result", req.headers["content-type"])

        else:
            raise NotImplementedError("Cannot handle a service %s result yet."%
                repr(data))

        with job.getWritable() as wjob:
            wjob.change(phase=uws.COMPLETED)

    except SystemExit:
        pass
    except uws.JobNotFound:
        base.ui.notifyInfo("Giving up non-existing datalink job %s."%jobId)
    except Exception as ex:
        base.ui.notifyError("Datalink runner %s major failure"%jobId)
        # try to push job into the error state -- this may well fail given
        # that we're quite hosed, but it's worth the try
        DL_WORKER.changeToPhase(jobId, uws.ERROR, ex)
        raise
if __name__=="__main__":
    # silly test code, not normally reached
    from nevow import rend
    import os

    class _Foo(rend.Page):
        """a toy page: for "booga" it renders to a string, for anything
        else it defers to a second _Foo that does.
        """
        def __init__(self, stuff):
            self.stuff = stuff

        def renderHTTP(self, ctx):
            if self.stuff!="booga":
                # hand over to another page, and have cleanup run on both
                # success and failure
                nextPage = defer.maybeDeferred(_Foo, "booga")
                return nextPage.addBoth(self.cleanup)
            return "abc"

        def cleanup(self, res):
            print("cleaning up")
            return res

    # render into a scratch file, show what arrived, and remove the file
    with open("bla", "w") as outFile:
        writeResultTo(_Foo("ork"), outFile)
    with open("bla") as inFile:
        print(inFile.read())
    os.unlink("bla")