Package gavo :: Package protocols :: Module taprunner
[frames] | no frames]

Source Code for Module gavo.protocols.taprunner

  1  """ 
  2  Execution of UWS (right now, TAP only) requests. 
  3   
  4  This is mainly intended to be exec'd (through some wrapper) by the queue 
  5  runner in the main server thread.  The jobs executed have to be in 
  6  the database and have to have a job directory. 
  7   
  8  Primarily for testing, an alternative interface, rabRun, exists that 
  9  takes a jobid and parameters. 
 10   
 11  The tap runner takes the job to EXECUTING shortly before sending the 
 12  query to the DB server.  When done, the job's state is one of COMPLETED,  
 13  ABORTED or ERROR. 
 14  """ 
 15   
 16  #c Copyright 2008-2019, the GAVO project 
 17  #c 
 18  #c This program is free software, covered by the GNU GPL.  See the 
 19  #c COPYING file in the source distribution. 
 20   
 21   
 22  import datetime 
 23  import sys 
 24  import time 
 25   
 26  from gavo import base 
 27  from gavo import formats 
 28  from gavo import rsc 
 29  from gavo import rscdesc #noflake: cache registration 
 30  from gavo import svcs 
 31  from gavo import utils 
 32  from gavo import votable 
 33  from gavo.base import valuemappers 
 34  from gavo.formats import texttable #noflake: format registration 
 35  from gavo.formats import csvtable #noflake: format registration 
 36  from gavo.formats import jsontable #noflake: format registration 
 37  from gavo.formats import geojson #noflake: format registration 
 38  from gavo.formats import votableread 
 39  from gavo.formats import votablewrite 
 40  from gavo.protocols import adqlglue 
 41  from gavo.protocols import simbadinterface #noflake: UDF registration 
 42  from gavo.protocols import tap 
 43  from gavo.protocols import uws 
 44   
 45   
 46  # set to true by the signal handler 
 47  EXIT_PLEASE = False 
 48   
 49   
 50  # The pid of the worker db backend.  This is used in the signal handler 
 51  # when it tries to kill the running query. 
 52  _WORKER_PID = None 
 53   
 54   
def normalizeTAPFormat(rawFmt):
    """returns the canonical format code for the RESPONSEFORMAT value rawFmt.

    The lookup in tap.FORMAT_CODES is done on the lower-cased rawFmt; the
    canonical code is the first item of the matching entry.

    Raises base.ValidationError (on RESPONSEFORMAT) for unknown formats.
    """
    fmtKey = rawFmt.lower()
    if fmtKey not in tap.FORMAT_CODES:
        raise base.ValidationError(
            "Unsupported format '%s'."%fmtKey, colName="RESPONSEFORMAT",
            hint="Legal format codes include %s"%(
                ", ".join(tap.FORMAT_CODES)))
    return tap.FORMAT_CODES[fmtKey][0]
63 64
def _assertSupportedLanguage(jobId, langSpec):
    """raises a UWSError unless this service can process langSpec.

    langSpec is a LANG value, either a bare language name ("ADQL") or
    a name and a version separated by the first dash ("ADQL-3.1").  A bare
    name is accepted if the language is supported at all; with a version,
    that version must be listed in the language's "versions".
    """
    name, dash, version = langSpec.partition("-")
    if not dash:
        # no dash at all means "any version"
        version = None

    if name not in tap.SUPPORTED_LANGUAGES:
        raise uws.UWSError("This service does not support"
            " the query language %s"%name, jobId)

    if version is None:
        return

    if version not in tap.SUPPORTED_LANGUAGES[name]["versions"]:
        raise uws.UWSError("This service does not support"
            " version %s of the query language %s (but some other version;"
            " see capabilities)."%(version, name), jobId)
83 84
def _parseTAPParameters(jobId, parameters):
    """gets and checks TAP parameters like version, request, and such.

    The function returns a tuple of query and maxrec.

    REQUEST, if given, must be doQuery; LANG must name a supported language
    (see _assertSupportedLanguage); QUERY is decoded from UTF-8.  MAXREC
    defaults to [async]defaultMAXREC and is capped at [async]hardMAXREC.

    Raises base.ValidationError if LANG or QUERY is missing, and
    uws.UWSError for an unsupported REQUEST or language or a MAXREC that
    is not a non-negative integer.
    """
    try:
        if "request" in parameters and parameters["request"]!="doQuery":
            raise uws.UWSError("This service only supports REQUEST=doQuery",
                jobId)
        _assertSupportedLanguage(jobId, parameters["lang"])
        # NOTE(review): assumes QUERY arrives as a UTF-8 byte string.
        query = parameters["query"].decode("utf-8")
    except KeyError as ex:
        # Report the bare parameter name (and use it as the column name)
        # rather than the KeyError object itself.
        missing = ex.args[0]
        raise base.ui.logOldExc(base.ValidationError(
            "Required parameter '%s' missing."%missing, missing))

    if "maxrec" not in parameters:
        return query, base.getConfig("async", "defaultMAXREC")

    literal = parameters["maxrec"]
    try:
        maxrec = int(literal)
    except (ValueError, TypeError):
        raise base.ui.logOldExc(
            uws.UWSError("Invalid MAXREC literal '%s'."%literal, jobId))
    if maxrec<0:
        # MAXREC is a row count; 0 is legal, negative values are not.
        raise uws.UWSError("Invalid MAXREC literal '%s'."%literal, jobId)

    return query, min(base.getConfig("async", "hardMAXREC"), maxrec)
108 109
def _makeDataFor(resultTable):
    """wraps resultTable into an rsc.Data instance marked up as a query result.

    The wrapper gets an OK QUERY_STATUS info, a _type meta of "results",
    and takes over the overflowLimit of resultTable's table definition.
    """
    wrapped = rsc.wrapTable(resultTable)
    wrapped.addMeta("info", "Query successful",
        infoName="QUERY_STATUS", infoValue="OK")
    wrapped.addMeta("_type", "results")
    wrapped.overflowLimit = resultTable.tableDef.overflowLimit
    return wrapped
120 121
def writeResultTo(format, res, outF):
    """serializes the rsc.Data res to the file outF using the format code format.

    Non-VOTable formats go through formats.formatData.  VOTable formats
    are special-cased so that an OverflowElement (an OVERFLOW QUERY_STATUS
    INFO keyed to the primary table's overflowLimit) is passed to the
    VOTable writer.  Unknown votable variants are written as TABLEDATA.
    """
    if not format.startswith("votable"):
        formats.formatData(format, res, outF, acquireSamples=False)
        return

    # This duplicates a mapping from votablewrite; that's bad, and
    # formats.format should learn whatever is needed to let it do what
    # we're doing here.  Meanwhile:
    tableCodings = {
        "votable": "binary",
        "votableb2": "binary2",
        "votabletd": "td",
    }
    tableCoding = tableCodings.get(format, "td")

    overflowMarker = votable.OverflowElement(
        res.getPrimaryTable().tableDef.overflowLimit,
        votable.V.INFO(name="QUERY_STATUS", value="OVERFLOW"))
    writeContext = votablewrite.VOTableContext(
        tablecoding=tableCoding,
        acquireSamples=False,
        overflowElement=overflowMarker)
    votablewrite.writeAsVOTable(res, outF, writeContext)
144 145
def runTAPQuery(query, timeout, connection, tdsForUploads, maxrec,
        autoClose=True):
    """executes a TAP query and returns the result as an rsc.QueryTable.

    query is the ADQL source.  It is translated with adqlglue.morphADQL
    (which receives tdsForUploads and maxrec), and the resulting SQL is run
    on connection.  autoClose is passed on to the QueryTable.  timeout is
    passed to the table's setTimeout.

    Errors (both from translation and from the database) are passed
    through adqlglue.mapADQLErrors.
    """
    # Some of this replicates functionality from adqlglue.  We should probably
    # move the implementation there to what's done here.
    try:
        pgQuery, tableTrunk = adqlglue.morphADQL(query,
            tdsForUploads=tdsForUploads, maxrec=maxrec)
        base.ui.notifyInfo("Sending to postgres: %s"%repr(pgQuery))

        result = rsc.QueryTable(tableTrunk.tableDef, pgQuery, connection,
            autoClose=autoClose)
        result.meta_ = tableTrunk.meta_
        # XXX Hack: this is a lousy fix for postgres' seqscan love with
        # limit and (in particular) q3c.  See if we still want this
        # with newer postgres and q3c.
        result.configureConnection([
            ("enable_seqscan", False),
            ("cursor_tuple_fraction", 1)])
        result.setTimeout(timeout)
    except Exception:
        # Only map real errors; SystemExit/KeyboardInterrupt pass unchanged.
        adqlglue.mapADQLErrors(*sys.exc_info())
        # mapADQLErrors is expected to raise; should it ever return,
        # re-raise the original error rather than fall through to an
        # unbound result below.
        raise

    return result
171 172
def _ingestUploads(uploads, connection):
    """ingests TAP uploads through connection and returns their table
    definitions.

    uploads is a sequence of (destName, src) pairs.  src is either a
    tap.LocalFile (read from its fullPath) or something that
    utils.urlopenRemote can open.  Each source is ingested with
    votableread.uploadVOTable.  Only uploads for which that returns a
    table contribute a tableDef.

    Raises base.ValidationError for destination names that would need
    quoting and for remote sources that cannot be retrieved.
    """
    tds = []
    for destName, src in uploads:
        # Validate the name first so we don't fetch data we would reject.
        if valuemappers.needsQuoting(destName):
            raise base.ValidationError("'%s' is not a valid table name on"
                " this site"%destName, "UPLOAD", hint="It either contains"
                " non-alphanumeric characters or conflicts with an ADQL"
                " reserved word. Quoted table names are not supported"
                " at this site.")

        if isinstance(src, tap.LocalFile):
            srcF = open(src.fullPath)
        else:
            try:
                srcF = utils.urlopenRemote(src)
            except IOError as ex:
                raise base.ui.logOldExc(
                    base.ValidationError("Upload '%s' cannot be retrieved"%(
                    src), "UPLOAD", hint="The I/O operation failed with the"
                    " message: "+str(ex)))

        # Close the source even when ingestion fails.
        try:
            uploadedTable = votableread.uploadVOTable(destName, srcF,
                connection, nameMaker=votableread.AutoQuotedNameMaker())
            if uploadedTable is not None:
                tds.append(uploadedTable.tableDef)
        finally:
            srcF.close()
    return tds
198 199
def _noteWorkerPID(conn):
    """stores conn's worker PID in _WORKER_PID.

    The PID is obtained by running SELECT pg_backend_pid() on conn.  It is
    kept in the module global so a signal handler can later cancel the
    running query.  Errors from the query propagate to the caller.
    """
    global _WORKER_PID
    curs = conn.cursor()
    try:
        curs.execute("SELECT pg_backend_pid()")
        _WORKER_PID = curs.fetchall()[0][0]
    finally:
        # don't leave the cursor open when the query fails
        curs.close()
208 209
def _hangIfMagic(jobId, parameters, timeout):
    """test instrumentation: makes the magic query "JUST HANG around" block.

    For that exact query, this sleeps for timeout seconds, then sets job
    jobId to COMPLETED with the current UTC end time, then calls sys.exit().
    For any other query it returns immediately.
    """
    # There are more effective ways to DoS me.
    if parameters.get("query")!="JUST HANG around":
        return

    time.sleep(timeout)
    with tap.WORKER_SYSTEM.changeableJob(jobId) as hangingJob:
        hangingJob.change(phase=uws.COMPLETED,
            endTime=datetime.datetime.utcnow())
    sys.exit()
218 219
def getQTableFromJob(parameters, jobId, queryProfile, timeout):
    """returns a QueryTable for a TAP job.

    parameters are the job's TAP parameters (parsed by _parseTAPParameters).
    A fresh connection for queryProfile is opened; its worker PID is noted
    (best effort), uploads are ingested through it, and the query is set up
    with runTAPQuery on it.  On success the returned QueryTable owns the
    connection; if anything fails before that, the connection is closed
    here.
    """
    query, maxrec = _parseTAPParameters(jobId, parameters)
    connectionForQuery = base.getDBConnection(queryProfile)

    handedOver = False
    try:
        try:
            _noteWorkerPID(connectionForQuery)
        except Exception:  # Don't fail just because we can't kill workers
            base.ui.notifyError(
                "Could not obtain PID for the worker, job %s"%jobId)
        tdsForUploads = _ingestUploads(parameters.get("upload", ""),
            connectionForQuery)

        base.ui.notifyInfo("taprunner executing %s"%query)
        result = runTAPQuery(query, timeout, connectionForQuery,
            tdsForUploads, maxrec)
        handedOver = True
        return result
    finally:
        if not handedOver:
            # Avoid leaking the connection when setup fails.
            # NOTE(review): assumes close() on an already-closed connection
            # is harmless, in case runTAPQuery's QueryTable closed it.
            connectionForQuery.close()
237 238
def runTAPJobNoState(parameters, jobId, queryProfile, timeout):
    """executes a TAP job defined by parameters and writes the
    result to the job's working directory.

    This does not do state management.  Use runTAPJob if you need it.

    The result is written as the job's "result" in the format requested
    by parameters' responseformat.  The default is binary VOTable, or
    TABLEDATA when [ivoa]votDefaultEncoding is td.  Errors while writing
    are passed through svcs.mapDBErrors.
    """
    _hangIfMagic(jobId, parameters, timeout)
    # The following makes us bail out if a bad format was passed -- no
    # sense spending the CPU on executing the query then, so we get the
    # format here.
    defaultFormat = "votable"
    if base.getConfig("ivoa", "votDefaultEncoding")=="td":
        defaultFormat = "votable/td"
    fmt = normalizeTAPFormat(parameters.get("responseformat", defaultFormat))

    # the query connection is closed by the QueryTable
    res = _makeDataFor(getQTableFromJob(
        parameters, jobId, queryProfile, timeout))

    try:
        job = tap.WORKER_SYSTEM.getJob(jobId)
        # Use the same parameters for the MIME type as for the format.
        destF = job.openResult(
            formats.getMIMEFor(fmt, parameters.get("responseformat")),
            "result")
        try:
            writeResultTo(fmt, res, destF)
        finally:
            destF.close()
    except Exception:
        # DB errors can occur here since we're streaming directly from
        # the database.
        svcs.mapDBErrors(*sys.exc_info())
        # mapDBErrors is expected to raise; never swallow the error.
        raise
def runTAPJob(jobId, queryProfile="untrustedquery"):
    """executes the TAP job jobId and moves it to COMPLETED or ERROR.

    This assumes the job has already been put into EXECUTING and the
    appropriate pid has been entered.  The start time is recorded here to
    indicate that actual processing has started and the job is killable.
    Exceptions derived from Exception are logged and put the job into
    ERROR; otherwise the job ends up COMPLETED.
    """
    with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
        job.change(startTime=datetime.datetime.utcnow())
        timeout, parameters = job.executionDuration, job.parameters

    try:
        runTAPJobNoState(parameters, jobId, queryProfile, timeout)
    except Exception as ex:
        base.ui.notifyError("While executing TAP job %s: %s"%(jobId, ex))
        tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, ex)
        return

    tap.WORKER_SYSTEM.changeToPhase(jobId, uws.COMPLETED, None)
292 293
def runSyncTAPJob(jobId):
    """executes a TAP job "synchronously" in the current process.

    When this is done, the job will be in an end state, i.e., ERROR,
    COMPLETED or perhaps ABORTED.

    You must call tap.WORKER_SYSTEM.destroy(jobId) when done yourself.

    The job is put into EXECUTING with [async]defaultExecTimeSync as its
    execution duration, and its pid is declared as -1.  The UWS machinery
    will happily kill a job with a pid when asked to abort such a job, and
    since sync jobs run with the server's pid, that's really not what we
    want (conversely: sync jobs can't really be aborted).  Anyway: Do *not*
    put anything getpid returns into a sync job's pid field.
    """
    with tap.WORKER_SYSTEM.changeableJob(jobId) as syncJob:
        syncJob.change(
            phase=uws.EXECUTING,
            pid=-1,
            executionDuration=base.getConfig("async", "defaultExecTimeSync"))
    runTAPJob(jobId)
315 316 317 ############### CLI 318 319
def setINTHandler(jobId):
    """installs a SIGINT handler that requests the runner to stop.

    The handler only sets the module-level EXIT_PLEASE flag.  Aborting the
    job and cancelling its worker is done by whoever polls that flag
    (joinInterruptibly in this module).  jobId is currently not used.
    """
    import signal

    def requestExit(signo, frame):
        global EXIT_PLEASE
        EXIT_PLEASE = True

    signal.signal(signal.SIGINT, requestExit)
def _killWorker(jobId):
    """puts job jobId into ABORTED and tries to cancel its postgres worker.

    The worker is identified by the module-level _WORKER_PID (as set by
    _noteWorkerPID).  When that is unset, only the phase change happens.
    Cancellation runs pg_cancel_backend on a connection obtained from
    base.getUntrustedConn.
    """
    with tap.WORKER_SYSTEM.changeableJob(jobId) as abortedJob:
        abortedJob.change(phase=uws.ABORTED)

    if not _WORKER_PID:
        return

    base.ui.notifyInfo("Trying to abort %s, wpid %s"%(jobId, _WORKER_PID))
    with base.getUntrustedConn() as cancelConn:
        cursor = cancelConn.cursor()
        cursor.execute("SELECT pg_cancel_backend(%d)"%_WORKER_PID)
        cursor.close()
345 346
def joinInterruptibly(t, jobId):
    """waits for thread t to finish while watching for a SIGINT request.

    t is joined in half-second slices.  When it is still alive and the
    module-level EXIT_PLEASE flag is set, job jobId is aborted through
    _killWorker and the process exits with status 2.  Returns None once t
    has finished.
    """
    while True:
        t.join(timeout=0.5)
        # is_alive exists since Python 2.6; isAlive was removed in 3.9.
        if not t.is_alive():
            return
        if EXIT_PLEASE:
            _killWorker(jobId)
            sys.exit(2)
355 356 357
def _runInThread(target, jobId):
    """runs target in a daemon thread and waits for it interruptibly.

    The standalone tap runner must run the query in a thread since it
    must be able to react to a SIGINT (see joinInterruptibly).  If waiting
    ends with an exception (including SystemExit), the thread gets up to
    one more second to finish before the exception is re-raised.
    """
    import threading
    worker = threading.Thread(target=target)
    # The daemon attribute replaces setDaemon(), deprecated since Python 3.10.
    worker.daemon = True
    worker.start()
    try:
        joinInterruptibly(worker, jobId)
    except (SystemExit, Exception):
        # give the thread a chance to quit cleanly
        worker.join(1)
        raise
371 372
def parseCommandLine():
    """returns a pair of the optparse options and the single job id argument.

    With anything but exactly one positional argument, the help is printed
    to stderr and the process exits with status 1.
    """
    from optparse import OptionParser
    parser = OptionParser(usage="%prog <jobid>",
        description="runs the TAP job with <jobid> from the UWS table.")
    opts, positional = parser.parse_args()
    if len(positional)==1:
        return opts, positional[0]

    parser.print_help(file=sys.stderr)
    sys.exit(1)
382 383
def main():
    """executes the TAP job whose id is the sole command line argument
    (sys.argv[1]).
    """
    # there's a problem in CLI behaviour in that if anything goes wrong in
    # main, a job that may have been created will remain QUEUED forever.
    # There's little we can do about that, though, since we cannot put
    # a job into ERROR when we don't know its id or cannot get it from the DB.
    base.DEBUG = False
    _, jobId = parseCommandLine()
    setINTHandler(jobId)
    try:
        _runInThread(lambda: runTAPJob(jobId), jobId)
        base.ui.notifyInfo("taprunner for %s finished"%jobId)
    except SystemExit:
        # NOTE(review): this also swallows the sys.exit(2) issued by
        # joinInterruptibly after an abort -- confirm that's intended.
        pass
    except uws.JobNotFound:  # someone destroyed the job before I was done
        errmsg = "Giving up non-existing TAP job %s."%jobId
        sys.stderr.write(errmsg+"\n")
        base.ui.notifyInfo(errmsg)
    except Exception as ex:
        base.ui.notifyError("taprunner %s major failure"%jobId)
        # try to push job into the error state -- this may well fail given
        # that we're quite hosed, but it's worth the try
        tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, ex)
        raise
412