| Home | Trees | Indices | Help |
|
|---|
|
|
1 """
2 TAP: schema maintenance, job/parameter definition incl. upload and UWS actions.
3 """
4
5 #c Copyright 2008-2019, the GAVO project
6 #c
7 #c This program is free software, covered by the GNU GPL. See the
8 #c COPYING file in the source distribution.
9
10
11 import datetime
12 import os
13
14 from pyparsing import ParseException
15 from twisted.internet import threads
16
17 from gavo import base
18 from gavo import rsc
19 from gavo import svcs
20 from gavo import utils
21 from gavo.protocols import uws
22 from gavo.protocols import uwsactions
23 from gavo.utils import codetricks
24 from gavo.utils import stanxml
25
26
27 RD_ID = "__system__/tap"
28
29 # used in the computation of quote
30 EST_TIME_PER_JOB = datetime.timedelta(minutes=10)
31
# A mapping of values of TAP's RESPONSEFORMAT parameter to 4-tuples of
#   (formats.format code, IANA media type, user-readable label, ivoId).
# The ivoId is an identifier URI for the output format (mostly TAPRegExt
# terms) or None where we have none.
# Used below (1st element of value tuple) and for registry purposes.
#
# Several keys may share a media type; consumers that group by media type
# keep only one ivoId per type, so all aliases of a media type must spell
# their ivoId identically (it is TAPRegExt, not TAPRegEXT).
FORMAT_CODES = {
    base.votableType:
        ("votable", base.votableType, "VOTable, binary",
            "ivo://ivoa.net/std/TAPRegExt#output-votable-binary"),
    "text/xml":
        ("votable", "text/xml", "VOTable, binary",
            "ivo://ivoa.net/std/TAPRegExt#output-votable-binary"),
    "votable":
        ("votable", base.votableType, "VOTable, binary",
            "ivo://ivoa.net/std/TAPRegExt#output-votable-binary"),
    "application/x-votable+xml;serialization=binary2":
        ("votableb2", "application/x-votable+xml;serialization=binary2",
            "VOTable, new binary",
            "ivo://ivoa.net/std/TAPRegExt#output-votable-binary2"),
    "votable/b2":
        ("votableb2", "application/x-votable+xml;serialization=binary2",
            "VOTable, new binary",
            "ivo://ivoa.net/std/TAPRegExt#output-votable-binary2"),
    "vodml":
        ("vodml", "application/x-votable+xml;version=1.4",
            "Experimental VOTable 1.4",
            "http://dc.g-vo.org/output-vodml"),
    "application/x-votable+xml;serialization=tabledata":
        ("votabletd", "application/x-votable+xml;serialization=tabledata",
            "VOTable, tabledata",
            "ivo://ivoa.net/std/TAPRegExt#output-votable-td"),
    "votable/td":
        ("votabletd", "application/x-votable+xml;serialization=tabledata",
            "VOTable, tabledata",
            "ivo://ivoa.net/std/TAPRegExt#output-votable-td"),
    "text/csv":
        ("csv", "text/csv", "CSV without column labels", None),
    "csv":
        ("csv_header", "text/csv;header=present",
            "CSV with column labels", None),
    "text/csv;header=present":
        ("csv_header", "text/csv;header=present",
            "CSV with column labels", None),
    "text/tab-separated-values":
        ("tsv", "text/tab-separated-values",
            "Tab separated values", None),
    "tsv":
        ("tsv", "text/tab-separated-values",
            "Tab separated values", None),
    "text/plain":
        ("tsv", "text/plain",
            "Tab separated values", None),
    "application/fits":
        ("fits", "application/fits", "FITS binary table", None),
    "fits":
        ("fits", "application/fits", "FITS binary table", None),
    "text/html":
        ("html", "text/html", "HTML table", None),
    "html":
        ("html", "text/html", "HTML table", None),
    "json":
        ("json", "application/json", "JSON", None),
    "application/json":
        ("json", "application/json", "JSON", None),
    "geojson":
        ("geojson", "application/geo-json", "GeoJSON", None),
}
99
# The query languages this service accepts; this is used below for
# registry purposes.  Keys are language names; each value is built by _R
# from a "versions" mapping (version label -> IVOA id) and a
# human-readable "description".
SUPPORTED_LANGUAGES = {
    "ADQL": _R(
        versions={
            "2.0": "ivo://ivoa.net/std/ADQL#v2.0",
            "2.1": "ivo://ivoa.net/std/ADQL#v2.1",
        },
        description=(
            "The Astronomical Data Query Language is the standard IVOA dialect"
            " of SQL; it contains a very general SELECT statement as well as"
            " some extensions for spherical geometry and higher mathematics.")),
}
110
111
# The supported upload methods, as a mapping from method identifier to a
# human-readable label.  This is only used in the registry interface
# right now.
UPLOAD_METHODS = {
    "upload-inline":
        "POST inline upload",
    "upload-http":
        "http URL",
    "upload-https":
        "https URL",
    "upload-ftp":
        "ftp URL",
}
127
132 """yields tuples for the supported languages.
133
134 This is tap.SUPPORTED_LANGUAGES in a format suitable for the
135 TAP capabilities element.
136
137 Each tuple returned is made up of
138 (name, description, [(version, ivo-id)...]).
139 """
140 for name, desc in SUPPORTED_LANGUAGES.iteritems():
141 yield (name, desc["description"], desc["versions"].items())
142
145 """yields tuples for the supported output formats.
146
147 This is tap.FORMAT_CODES in a format suitable for the
148 TAP capabilities element.
149
150 Each tuple is made up of (mime, aliases, description, ivoId).
151 """
152 codes, descrs, ivoIds = {}, {}, {}
153 for code, (_, outputMime, descr, ivoId) in FORMAT_CODES.iteritems():
154 codes.setdefault(outputMime, set()).add(code)
155 descrs[outputMime] = descr
156 ivoIds[outputMime] = ivoId
157 for mime in codes:
158 # mime never is an alias of itself
159 codes[mime].discard(mime)
160 yield mime, codes[mime], descrs[mime], ivoIds[mime]
161
166 """publishes info for all ADQL-enabled tables of rd to the TAP_SCHEMA.
167 """
168 # first check if we have any adql tables at all, and don't attempt
169 # anything if we don't (this is cheap optimizing and keeps TAP_SCHEMA
170 # from being created on systems that don't do ADQL).
171 for table in rd.tables:
172 if table.adql:
173 break
174 else:
175 return
176 tapRD = base.caches.getRD(RD_ID)
177 for ddId in ["importTablesFromRD", "importDMsFromRD", "importColumnsFromRD",
178 "importFkeysFromRD", "importGroupsFromRD"]:
179 dd = tapRD.getById(ddId)
180 rsc.makeData(dd, forceSource=rd, parseOptions=rsc.parseValidating,
181 connection=connection, runCommit=False)
182
185 """removes all information originating from rd from TAP_SCHEMA.
186 """
187 rd.setProperty("moribund", "True") # the embedded grammar takes this
188 # to mean "kill this"
189 publishToTAP(rd, connection)
190 rd.clearProperty("moribund")
191
194 """returns a list of qualified table names for the TAP-published tables.
195 """
196 with base.getTableConn() as conn:
197 return [r[0]
198 for r in conn.query("select table_name from tap_schema.tables"
199 " order by table_name")]
200
201
202 ########################## Maintaining TAP jobs
203
204
205 -class TAPTransitions(uws.ProcessBasedUWSTransitions):
206 """The transition function for TAP jobs.
207
208 There's a hack here: After each transition, when you've released
209 your lock on the job, call checkProcessQueue (in reality, only
210 PhaseAction does this).
211 """
214
217
219 """puts a job on the queue.
220 """
221 uws.ProcessBasedUWSTransitions.queueJob(self, newState, wjob, ignored)
222 wjob.uws.scheduleProcessQueueCheck()
223
225 uws.SimpleUWSTransitions.errorOutJob(self, newPhase, wjob, ignored)
226 wjob.uws.scheduleProcessQueueCheck()
227
229 uws.SimpleUWSTransitions.completeJob(self, newPhase, wjob, ignored)
230 wjob.uws.scheduleProcessQueueCheck()
231
233 try:
234 uws.ProcessBasedUWSTransitions.killJob(self, newPhase, wjob, ignored)
235 finally:
236 wjob.uws.scheduleProcessQueueCheck()
237
238
239 ########################## The TAP UWS job
240
241
@utils.memoized
def getUploadGrammar():
    """returns a pyparsing grammar for TAP upload strings.

    The grammar matches one or more semicolon-separated items of the
    form name,uri up to the end of the string.  Names are a letter or
    underscore followed by alphanumerics or underscores; URIs are any
    run of characters excluding blank, semicolon and comma.  Each
    matched item is turned into a (name, uri) tuple by a parse action.
    """
    import pyparsing

    def makePair(s, p, t):
        # pyparsing parse action: reduce one upload item to a 2-tuple
        return (t["name"], t["uri"])

    # Should we allow more tableNames?
    with utils.pyparsingWhitechars(" \t"):
        identifier = pyparsing.Word(
            pyparsing.alphas+"_", pyparsing.alphanums+"_")
        # What should we allow/forbid in terms of URIs?
        location = pyparsing.CharsNotIn(" ;,")
        uploadSpec = identifier("name") + "," + location("uri")
        furtherSpecs = pyparsing.ZeroOrMore(
            pyparsing.Suppress(";") + uploadSpec)
        uploads = uploadSpec + furtherSpecs + pyparsing.StringEnd()
        uploadSpec.addParseAction(makePair)
        return uploads
256
259 """iterates over pairs of tableName, uploadSource from a TAP upload string.
260 """
261 try:
262 res = utils.pyparseString(getUploadGrammar(), uploadString).asList()
263 return res
264 except ParseException as ex:
265 raise base.ValidationError(
266 "Syntax error in UPLOAD parameter (near %s)"%(ex.loc), "UPLOAD",
267 hint="Note that we only allow regular SQL identifiers as table names,"
268 " i.e., basically only alphanumerics are allowed.")
269
275
280
283 """A sentinel class representing a file within a job work directory
284 (as resulting from an upload).
285 """
289
291 # stringify to a URL for easy UPLOAD string generation.
292 # This smells of a bad idea. If you change it, change UPLOAD.getParam.
293 return self.getURL()
294
303
306 # the way this is specified, inline uploads are quite tricky.
307 # To obtain the data, we must access the request, which we don't have
308 # here. So, I just grab in from upstack (which of course is bound
309 # to fail if we're not being called from within a proper web request).
310 # It's not pretty, but then this kind of interdependency between
311 # HTTP parameters sucks whatever you do.
312 #
313 # We assume uploads come in the request's special files dictionary.
314 # This is created in taprender.TAPRenderer.gatherUploadFiles.
315
316 _deserialize, _serialize = utils.identity, utils.identity
317
318 @classmethod
320 if not value.strip():
321 return
322 newUploads = []
323 for tableName, upload in parseUploadString(value):
324 if upload.startswith("param:"):
325 newUploads.append(
326 (tableName, cls._saveUpload(job, upload[6:])))
327 else:
328 newUploads.append((tableName, upload))
329 newVal = job.parameters.get(name, [])+newUploads
330 uws.JobParameter.updatePar(name, newVal, job)
331
332 @classmethod
335
336 @classmethod
338 # returns a name hopefully suitable for the file system
339 return rawName.encode("quoted-printable").replace('/', "=2F")
340
341 @classmethod
343 try:
344 uploadData = codetricks.stealVar("request").files[uploadName]
345 except KeyError:
346 raise base.ui.logOldExc(
347 base.ValidationError("No upload '%s' found"%uploadName, "UPLOAD"))
348
349 destFName = cls._cleanName(uploadData.filename)
350 with job.openFile(destFName, "w") as f:
351 f.write(uploadData.file.read())
352 return LocalFile(job.jobId, job.getWD(), destFName)
353
356 _jobsTDId = "//tap#tapjobs"
357 _transitions = TAPTransitions()
358
359 _parameter_maxrec = MaxrecParameter
360 _parameter_lang = LangParameter
361 _parameter_upload = UploadParameter
362 _parameter_request = uws.JobParameter
363 _parameter_query = uws.JobParameter
364 _parameter_responseformat = uws.JobParameter
365
366 @property
368 """returns an estimation of the job completion.
369
370 This currently is very naive: we give each job that's going to run
371 before this one 10 minutes.
372
373 This method needs to be changed when the dequeueing algorithm
374 is changed.
375 """
376 with base.getTableConn() as conn:
377 nBefore = self.uws.runCanned('countQueuedBefore',
378 {'dt': self.destructionTime}, conn)[0]["count"]
379 return datetime.datetime.utcnow()+nBefore*EST_TIME_PER_JOB
380
381
382
383 #################### The TAP worker system
384
385 from gavo.utils.stanxml import Element, registerPrefix, schemaURL
403
404
# Register the "plan" prefix together with the TAPPlan URI and the URL
# schemaURL computes for TAPPlan.xsd (presumably namespace URI and
# schema location -- confirm against stanxml.registerPrefix).
registerPrefix(
    "plan",
    "http://docs.g-vo.org/std/TAPPlan.xsd",
    schemaURL("TAPPlan.xsd"))
410 """retrieve a query plan.
411
412 This is actually a TAP action; as we add UWSes, we'll need to think
413 about how we can customize uwsactions by UWS type.
414 """
415 name = "plan"
416
418 if data is None:
419 return
420 elif isinstance(data, tuple):
421 yield Plan.min[str(data[0])]
422 yield Plan.max[str(data[1])]
423 else:
424 yield Plan.value[str(data)]
425
427 def recurse(node):
428 (opName, attrs), children = node[:2], node[2:]
429 res = Plan.operation()[
430 Plan.description[opName],
431 Plan.rows[self._formatRange(attrs.get("rows"))],
432 Plan.cost[self._formatRange(attrs.get("cost"))]]
433 for child in children:
434 res[recurse(child)]
435 return res
436 return Plan.plan[
437 Plan.query[query],
438 recurse(planTree)]
439
441 qTable, plan = qTableAndPlan
442 return stanxml.xmlrender(
443 self._makePlanDoc(plan, qTable.query),
444 "<?xml-stylesheet "
445 "href='/static/xsl/plan-to-html.xsl' type='text/xsl'?>")
446
448 from gavo.protocols import taprunner
449 qTable = taprunner.getQTableFromJob(job.parameters,
450 job.jobId, "untrustedquery", 1)
451 request.setHeader("content-type", "text/xml")
452 return qTable, qTable.getPlan()
453
458
461 """The UWS responsible for processing async TAP requests.
462 """
463 _baseURLCache = None
464
465 joblistPreamble = ("<?xml-stylesheet href='/static"
466 "/xsl/tap-joblist-to-html.xsl' type='text/xsl'?>")
467 jobdocPreamble = ("<?xml-stylesheet href='/static/xsl/"
468 "tap-job-to-html.xsl' type='text/xsl'?>")
469
471 self.runcountGoal = base.getConfig("async", "maxTAPRunning")
472 uws.UWSWithQueueing.__init__(self, TAPJob, uwsactions.JobActions(
473 PlanAction))
474
475 @property
477 if self._baseURLCache is None:
478 self._baseURLCache = base.caches.getRD(
479 RD_ID).getById("run").getURL("tap")
480 return self._baseURLCache
481
486
487 WORKER_SYSTEM = TAPUWS()
493 """A core for the TAP renderer.
494
495 Right now, this is a no-op and not used by the renderer.
496
497 This will change as we move to regularise the TAP system.
498 """
499 name_ = "tapCore"
500
| Home | Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1 on Thu May 2 07:29:09 2019 | http://epydoc.sourceforge.net |