Package gavo :: Package protocols :: Module tap
Source Code for Module gavo.protocols.tap

  1  """ 
  2  TAP: schema maintenance, job/parameter definition incl. upload and UWS actions. 
  3  """ 
  5  #c Copyright 2008-2019, the GAVO project 
  6  #c 
  7  #c This program is free software, covered by the GNU GPL.  See the 
  8  #c COPYING file in the source distribution. 
 11  import datetime 
 12  import os 
 14  from pyparsing import ParseException 
 15  from twisted.internet import threads 
 17  from gavo import base 
 18  from gavo import rsc 
 19  from gavo import svcs 
 20  from gavo import utils 
 21  from gavo.protocols import uws 
 22  from gavo.protocols import uwsactions 
 23  from gavo.utils import codetricks 
 24  from gavo.utils import stanxml 
# The id of the resource descriptor holding the TAP service; this is
# what gets passed to base.caches.getRD throughout this module.
RD_ID = "__system__/tap"

# used in the computation of quote (see TAPJob.quote: every job queued
# before a given one is assumed to take this long).
EST_TIME_PER_JOB = datetime.timedelta(minutes=10)

# A mapping of values of TAP's RESPONSEFORMAT parameter to our
# formats.format codes, IANA media types and user-readable labels.
# Used below (1st element of value tuple) and for registry purposes.
#
# Each value is a 4-tuple
#
#   (format code, media type, label, ivoid)
#
# Several keys may share a media type; getSupportedOutputFormats reports
# such keys as aliases of that media type.  The last element is None for
# most non-VOTable formats.
#
# Note that the short key "csv" selects CSV *with* a header line, whereas
# the plain media type "text/csv" selects CSV without one.
#
# NOTE(review): the ivoid strings below are bare "ivo://" prefixes (and
# an empty string for vodml) -- presumably abbreviated; confirm against
# the intended registry identifiers.
FORMAT_CODES = {
        base.votableType:
                ("votable", base.votableType, "VOTable, binary",
                        "ivo://"),
        "text/xml":
                ("votable", "text/xml", "VOTable, binary",
                        "ivo://"),
        "votable":
                ("votable", base.votableType, "VOTable, binary",
                        "ivo://"),
        "application/x-votable+xml;serialization=binary2":
                ("votableb2", "application/x-votable+xml;serialization=binary2",
                        "VOTable, new binary",
                        "ivo://"),
        "votable/b2":
                ("votableb2", "application/x-votable+xml;serialization=binary2",
                        "VOTable, new binary",
                        "ivo://"),
        "vodml":
                ("vodml", "application/x-votable+xml;version=1.4",
                        "Experimental VOTable 1.4",
                        ""),
        "application/x-votable+xml;serialization=tabledata":
                ("votabletd", "application/x-votable+xml;serialization=tabledata",
                        "VOTable, tabledata",
                        "ivo://"),
        "votable/td":
                ("votabletd", "application/x-votable+xml;serialization=tabledata",
                        "VOTable, tabledata",
                        "ivo://"),
        "text/csv":
                ("csv", "text/csv", "CSV without column labels", None),
        "csv": ("csv_header", "text/csv;header=present",
                        "CSV with column labels", None),
        "text/csv;header=present":
                ("csv_header", "text/csv;header=present",
                        "CSV with column labels", None),
        "text/tab-separated-values":
                ("tsv", "text/tab-separated-values",
                        "Tab separated values", None),
        "tsv":
                ("tsv", "text/tab-separated-values",
                        "Tab separated values", None),
        "text/plain":
                ("tsv", "text/plain",
                        "Tab separated values", None),
        "application/fits":
                ("fits", "application/fits", "FITS binary table", None),
        "fits":
                ("fits", "application/fits", "FITS binary table", None),
        "text/html":
                ("html", "text/html", "HTML table", None),
        "html":
                ("html", "text/html", "HTML table", None),
        "json":
                ("json", "application/json", "JSON", None),
        "application/json":
                ("json", "application/json", "JSON", None),
        "geojson":
                ("geojson", "application/geo-json", "GeoJSON", None),
}
def _R(**kws):
    """returns the keyword arguments passed in as a dictionary.

    This is a shorthand for writing record-like dictionaries with
    identifier keys.
    """
    return kws
# This is used below for registry purposes.  Keys are language names;
# each value is a record (a dictionary made by _R) with a "versions"
# dictionary mapping version strings to IVOA ids and a human-readable
# "description".
SUPPORTED_LANGUAGES = {
    "ADQL": _R(versions={
            "2.0": "ivo://",
            "2.1": "ivo://"},
        description="The Astronomical Data Query Language is the standard"
            " IVOA dialect of SQL; it contains a very general SELECT statement"
            " as well as some extensions for spherical geometry and higher"
            " mathematics.")}


# A mapping of the names of the supported upload methods to
# human-readable labels.  This is only used in the registry
# interface right now.
UPLOAD_METHODS = {
    "upload-inline": "POST inline upload",
    "upload-http": "http URL",
    "upload-https": "https URL",
    "upload-ftp": "ftp URL",
}
class TAPError(uws.UWSError):
    """A uws.UWSError under its old, TAP-specific name.

    This class adds nothing to uws.UWSError; it is only here for
    backward compatibility.

    Deprecated.
    """
######################## registry interface helpers

def getSupportedLanguages():
    """yields tuples for the supported languages.

    This is tap.SUPPORTED_LANGUAGES in a format suitable for the
    TAP capabilities element.

    Each tuple yielded is made up of
    (name, description, [(version, ivo-id)...]).
    """
    # items() rather than iteritems() so this runs on both python 2
    # and python 3.
    for name, desc in SUPPORTED_LANGUAGES.items():
        # materialise the versions so callers really get the list promised
        # above (on python 3, items() only returns a view).
        yield (name, desc["description"], list(desc["versions"].items()))
def getSupportedOutputFormats():
    """yields tuples for the supported output formats.

    This is tap.FORMAT_CODES in a format suitable for the
    TAP capabilities element.

    Each tuple is made up of (mime, aliases, description, ivoId), where
    aliases is the set of all FORMAT_CODES keys other than mime itself
    that select that media type.
    """
    codes, descrs, ivoIds = {}, {}, {}
    # items() rather than iteritems() so this runs on both python 2
    # and python 3.
    for code, (_, outputMime, descr, ivoId) in FORMAT_CODES.items():
        codes.setdefault(outputMime, set()).add(code)
        # when several keys share a media type, whichever comes last
        # in iteration order determines description and ivoId.
        descrs[outputMime] = descr
        ivoIds[outputMime] = ivoId

    for mime in codes:
        # mime never is an alias of itself
        codes[mime].discard(mime)
        yield mime, codes[mime], descrs[mime], ivoIds[mime]
######################## maintaining TAP schema

def publishToTAP(rd, connection):
    """imports metadata for all ADQL-enabled tables of rd into TAP_SCHEMA.

    This runs a fixed sequence of data descriptors from the TAP RD with
    rd as their source, using connection and without committing.
    """
    # Cheap optimisation: if rd has no ADQL-enabled table at all, do
    # nothing.  This also keeps TAP_SCHEMA from being created on systems
    # that don't do ADQL.
    if not any(table.adql for table in rd.tables):
        return

    tapRD = base.caches.getRD(RD_ID)
    importerIds = [
        "importTablesFromRD",
        "importDMsFromRD",
        "importColumnsFromRD",
        "importFkeysFromRD",
        "importGroupsFromRD"]
    for importerId in importerIds:
        importer = tapRD.getById(importerId)
        rsc.makeData(importer, forceSource=rd,
            parseOptions=rsc.parseValidating,
            connection=connection, runCommit=False)
def unpublishFromTAP(rd, connection):
    """removes all information originating from rd from TAP_SCHEMA.

    This works by marking rd with the "moribund" property and then running
    publishToTAP on it.  The mark is removed again even if publishToTAP
    raises; the exception is propagated to the caller.
    """
    rd.setProperty("moribund", "True") # the embedded grammar takes this
                                       # to mean "kill this"
    try:
        publishToTAP(rd, connection)
    finally:
        # never leave the mark on the rd: a later publishToTAP on the same
        # rd object would otherwise keep removing its tables.
        rd.clearProperty("moribund")
def getAccessibleTables():
    """returns the table names listed in tap_schema.tables, sorted by name.

    The names are whatever is in the table_name column there; a table
    connection is obtained from base.getTableConn for the query.
    """
    query = ("select table_name from tap_schema.tables"
        " order by table_name")
    with base.getTableConn() as conn:
        # build the list while the connection is still open
        return [row[0] for row in conn.query(query)]
201 202 ########################## Maintaining TAP jobs 203 204 205 -class TAPTransitions(uws.ProcessBasedUWSTransitions):
206 """The transition function for TAP jobs. 207 208 There's a hack here: After each transition, when you've released 209 your lock on the job, call checkProcessQueue (in reality, only 210 PhaseAction does this). 211 """
212 - def __init__(self):
215 - def getCommandLine(self, wjob):
216 return "gavo", ["gavo", "--ui", "stingy", "tap", "--", str(wjob.jobId)]
218 - def queueJob(self, newState, wjob, ignored):
219 """puts a job on the queue. 220 """ 221 uws.ProcessBasedUWSTransitions.queueJob(self, newState, wjob, ignored) 222 wjob.uws.scheduleProcessQueueCheck()
224 - def errorOutJob(self, newPhase, wjob, ignored):
225 uws.SimpleUWSTransitions.errorOutJob(self, newPhase, wjob, ignored) 226 wjob.uws.scheduleProcessQueueCheck()
228 - def completeJob(self, newPhase, wjob, ignored):
229 uws.SimpleUWSTransitions.completeJob(self, newPhase, wjob, ignored) 230 wjob.uws.scheduleProcessQueueCheck()
232 - def killJob(self, newPhase, wjob, ignored):
233 try: 234 uws.ProcessBasedUWSTransitions.killJob(self, newPhase, wjob, ignored) 235 finally: 236 wjob.uws.scheduleProcessQueueCheck()
########################## The TAP UWS job


@utils.memoized
def getUploadGrammar():
    """returns a pyparsing grammar for the values of TAP UPLOAD parameters.

    An upload string is a sequence of one or more name,uri pairs separated
    by semicolons.  Each pair is turned into a (name, uri) tuple by a
    parse action.

    The function is memoized, so the grammar is only built once.
    """
    from pyparsing import (Word, ZeroOrMore, Suppress, StringEnd,
        alphas, alphanums, CharsNotIn)
    # Should we allow more tableNames?
    # (only blanks and tabs count as whitespace while building this grammar)
    with utils.pyparsingWhitechars(" \t"):
        # table names: a letter or underscore, then letters, digits,
        # or underscores
        tableName = Word( alphas+"_", alphanums+"_" )
        # What should we allow/forbid in terms of URIs?
        # (right now: anything without blanks, semicolons, or commas)
        uri = CharsNotIn(" ;,")
        uploadSpec = tableName("name") + "," + uri("uri")
        uploads = uploadSpec + ZeroOrMore(
            Suppress(";") + uploadSpec) + StringEnd()
        # the parse action is attached to the same uploadSpec object
        # that uploads refers to.
        uploadSpec.addParseAction(lambda s,p,t: (t["name"], t["uri"]))
        return uploads
def parseUploadString(uploadString):
    """returns a list of (tableName, uploadSource) pairs for a TAP upload
    string.

    A base.ValidationError for UPLOAD is raised if uploadString does not
    match the upload grammar.
    """
    try:
        parsed = utils.pyparseString(getUploadGrammar(), uploadString)
        return parsed.asList()
    except ParseException as ex:
        message = "Syntax error in UPLOAD parameter (near %s)"%(ex.loc)
        raise base.ValidationError(
            message, "UPLOAD",
            hint="Note that we only allow regular SQL identifiers as table names,"
                " i.e., basically only alphanumerics are allowed.")
270 271 -class LangParameter(uws.JobParameter):
272 @classmethod
273 - def addPar(cls, name, value, job):
class MaxrecParameter(uws.JobParameter):
    # The MAXREC job parameter: serialised with uws.strOrEmpty,
    # deserialised with int.
    name = "MAXREC"
    _serialize = staticmethod(uws.strOrEmpty)
    _deserialize = int
class LocalFile(object):
    """A sentinel class representing a file within a job work directory
    (as resulting from an upload).

    Instances have the attributes jobId, fileName, and fullPath (fileName
    joined to the work directory passed in).
    """

    def __init__(self, jobId, wd, fileName):
        self.jobId = jobId
        self.fileName = fileName
        self.fullPath = os.path.join(wd, fileName)

    def __str__(self):
        # stringify to a URL for easy UPLOAD string generation.
        # This smells of a bad idea.  If you change it, change UPLOAD.getParam.
        return self.getURL()

    def getURL(self):
        """returns the URL the file is retrievable under for the life time of
        the job.
        """
        tapService = base.caches.getRD(RD_ID).getById("run")
        serviceURL = tapService.getURL("tap", absolute=True)
        return serviceURL+"/async/%s/results/%s"%(
            self.jobId,
            self.fileName)
304 305 -class UploadParameter(uws.JobParameter):
306 # the way this is specified, inline uploads are quite tricky. 307 # To obtain the data, we must access the request, which we don't have 308 # here. So, I just grab in from upstack (which of course is bound 309 # to fail if we're not being called from within a proper web request). 310 # It's not pretty, but then this kind of interdependency between 311 # HTTP parameters sucks whatever you do. 312 # 313 # We assume uploads come in the request's special files dictionary. 314 # This is created in taprender.TAPRenderer.gatherUploadFiles. 315 316 _deserialize, _serialize = utils.identity, utils.identity 317 318 @classmethod
319 - def addPar(cls, name, value, job):
320 if not value.strip(): 321 return 322 newUploads = [] 323 for tableName, upload in parseUploadString(value): 324 if upload.startswith("param:"): 325 newUploads.append( 326 (tableName, cls._saveUpload(job, upload[6:]))) 327 else: 328 newUploads.append((tableName, upload)) 329 newVal = job.parameters.get(name, [])+newUploads 330 uws.JobParameter.updatePar(name, newVal, job)
331 332 @classmethod
333 - def getPar(cls, name, job):
334 return ";".join("%s,%s"%p for p in job.parameters.get("upload", ""))
335 336 @classmethod
337 - def _cleanName(cls, rawName):
338 # returns a name hopefully suitable for the file system 339 return rawName.encode("quoted-printable").replace('/', "=2F")
340 341 @classmethod
342 - def _saveUpload(cls, job, uploadName):
343 try: 344 uploadData = codetricks.stealVar("request").files[uploadName] 345 except KeyError: 346 raise base.ui.logOldExc( 347 base.ValidationError("No upload '%s' found"%uploadName, "UPLOAD")) 348 349 destFName = cls._cleanName(uploadData.filename) 350 with job.openFile(destFName, "w") as f: 351 f.write( 352 return LocalFile(job.jobId, job.getWD(), destFName)
class TAPJob(uws.UWSJobWithWD):
    # The UWS job class for TAP: this binds the jobs table, the transition
    # function, and the classes handling the TAP job parameters.
    _jobsTDId = "//tap#tapjobs"
    _transitions = TAPTransitions()

    _parameter_maxrec = MaxrecParameter
    _parameter_lang = LangParameter
    _parameter_upload = UploadParameter
    _parameter_request = uws.JobParameter
    _parameter_query = uws.JobParameter
    _parameter_responseformat = uws.JobParameter

    @property
    def quote(self):
        """returns an estimation of the job completion.

        This currently is very naive: we give each job that's going to run
        before this one 10 minutes (EST_TIME_PER_JOB).

        This method needs to be changed when the dequeueing algorithm
        is changed.
        """
        with base.getTableConn() as conn:
            queuedBefore = self.uws.runCanned('countQueuedBefore',
                {'dt': self.destructionTime}, conn)
            nBefore = queuedBefore[0]["count"]
            estimatedWait = nBefore*EST_TIME_PER_JOB
            return datetime.datetime.utcnow()+estimatedWait
380 381 382 383 #################### The TAP worker system 384 385 from gavo.utils.stanxml import Element, registerPrefix, schemaURL
class Plan(object):
    """A container for the XML namespace for query plans.

    The element classes in here are what PlanAction builds its plan
    documents from.
    """
    # common base of all elements in here: they all use the plan prefix
    # and may be empty.
    class PlanElement(Element):
        _prefix = "plan"
        _mayBeEmpty = True

    # the root element (see PlanAction._makePlanDoc)
    class plan(PlanElement): pass
    # a node of the plan tree; operations nest
    class operation(PlanElement): pass
    # the query the plan was made for
    class query(PlanElement): pass
    # min/max are used for values given as a tuple, value for everything
    # else (see PlanAction._formatRange)
    class min(PlanElement): pass
    class max(PlanElement): pass
    class value(PlanElement): pass
    # the name of an operation
    class description(PlanElement): pass
    # containers for the "rows" and "cost" attributes of an operation
    class rows(PlanElement): pass
    class cost(PlanElement): pass
# make the prefix used by the Plan elements known to stanxml, together
# with the location of its schema file.
# NOTE(review): the second argument (presumably the namespace URI) is an
# empty string here -- confirm that this is intended.
registerPrefix("plan", "",
    schemaURL("TAPPlan.xsd"))
class PlanAction(uwsactions.JobAction):
    """retrieve a query plan.

    This is actually a TAP action; as we add UWSes, we'll need to think
    about how we can customize uwsactions by UWS type.
    """
    name = "plan"

    def _formatRange(self, data):
        """yields Plan elements for data.

        Nothing is yielded for None; tuples yield a min and a max element
        for their first two items, anything else a single value element.
        """
        if data is not None:
            if isinstance(data, tuple):
                yield Plan.min[str(data[0])]
                yield Plan.max[str(data[1])]
            else:
                yield Plan.value[str(data)]

    def _makePlanDoc(self, planTree, query):
        """returns a Plan.plan element for planTree and query.

        Nodes of planTree are sequences of an operation name, a dictionary
        of attributes (rows and cost are used), and then any number of
        child nodes.
        """
        def recurse(node):
            opName, attrs = node[:2]
            operation = Plan.operation()[
                Plan.description[opName],
                Plan.rows[self._formatRange(attrs.get("rows"))],
                Plan.cost[self._formatRange(attrs.get("cost"))]]
            for child in node[2:]:
                operation[recurse(child)]
            return operation

        return Plan.plan[
            Plan.query[query],
            recurse(planTree)]

    def formatPlan(self, qTableAndPlan):
        """returns serialised XML for a pair of query table and plan.

        The document is rendered with a stylesheet processing instruction
        as its preamble.
        """
        qTable, plan = qTableAndPlan
        planDoc = self._makePlanDoc(plan, qTable.query)
        return stanxml.xmlrender(
            planDoc,
            "<?xml-stylesheet "
            "href='/static/xsl/plan-to-html.xsl' type='text/xsl'?>")

    def getPlan(self, job, request):
        """returns a pair of query table and query plan for job.

        As a side effect, this sets request's content type to text/xml.
        """
        from gavo.protocols import taprunner
        qTable = taprunner.getQTableFromJob(job.parameters,
            job.jobId, "untrustedquery", 1)
        request.setHeader("content-type", "text/xml")
        plan = qTable.getPlan()
        return qTable, plan

    def doGET(self, job, request):
        """returns a deferred firing with the formatted plan for job.

        The plan is obtained in a thread.
        """
        deferred = threads.deferToThread(self.getPlan, job, request)
        return deferred.addCallback(self.formatPlan)
class TAPUWS(uws.UWSWithQueueing):
    """The UWS responsible for processing async TAP requests.

    Its jobs are TAPJobs, and it supports PlanAction as a job action.
    """
    # filled on first access of baseURL
    _baseURLCache = None

    joblistPreamble = ("<?xml-stylesheet href='/static"
        "/xsl/tap-joblist-to-html.xsl' type='text/xsl'?>")
    jobdocPreamble = ("<?xml-stylesheet href='/static/xsl/"
        "tap-job-to-html.xsl' type='text/xsl'?>")

    def __init__(self):
        # this is set before the parent's constructor runs
        self.runcountGoal = base.getConfig("async", "maxTAPRunning")
        jobActions = uwsactions.JobActions(PlanAction)
        uws.UWSWithQueueing.__init__(self, TAPJob, jobActions)

    @property
    def baseURL(self):
        """the URL of the tap renderer on the TAP service (computed on
        first access and cached after that).
        """
        if self._baseURLCache is not None:
            return self._baseURLCache

        tapService = base.caches.getRD(RD_ID).getById("run")
        self._baseURLCache = tapService.getURL("tap")
        return self._baseURLCache

    def getURLForId(self, jobId):
        """returns a fully qualified URL for the job with jobId.
        """
        return "%s/%s/%s"%(self.baseURL, "async", jobId)
488 489 490 ######################### The TAP core 491 492 -class TAPCore(svcs.Core):
493 """A core for the TAP renderer. 494 495 Right now, this is a no-op and not used by the renderer. 496 497 This will change as we move to regularise the TAP system. 498 """ 499 name_ = "tapCore"