1 """
2 Execution of UWS (right now, TAP only) requests.
3
4 This is mainly intended to be exec'd (through some wrapper) by the queue
5 runner in the main server thread. The jobs executed have to be in
6 the database and have to have a job directory.
7
8 Primarily for testing, an alternative interface rabRun exists that
9 takes jobid and parameters.
10
11 The tap runner takes the job to EXECUTING shortly before sending the
12 query to the DB server. When done, the job's state is one of COMPLETED,
13 ABORTED or ERROR.
14 """
15
16
17
18
19
20
21
22 import datetime
23 import sys
24 import time
25
26 from gavo import base
27 from gavo import formats
28 from gavo import rsc
29 from gavo import rscdesc
30 from gavo import svcs
31 from gavo import utils
32 from gavo import votable
33 from gavo.base import valuemappers
34 from gavo.formats import texttable
35 from gavo.formats import csvtable
36 from gavo.formats import jsontable
37 from gavo.formats import geojson
38 from gavo.formats import votableread
39 from gavo.formats import votablewrite
40 from gavo.protocols import adqlglue
41 from gavo.protocols import simbadinterface
42 from gavo.protocols import tap
43 from gavo.protocols import uws
44
45
46
47 EXIT_PLEASE = False
48
49
50
51
52 _WORKER_PID = None
53
54
63
64
83
84
86 """gets and checks TAP parameters like version, request, and such.
87
88 The function returns a tuple of query and maxrec.
89 """
90 try:
91 if "request" in parameters and parameters["request"]!="doQuery":
92 raise uws.UWSError("This service only supports REQUEST=doQuery", jobId)
93 _assertSupportedLanguage(jobId, parameters["lang"])
94 query = parameters["query"].decode("utf-8")
95 except KeyError as key:
96 raise base.ui.logOldExc(base.ValidationError(
97 "Required parameter %s missing."%key, key))
98
99 try:
100 maxrec = min(base.getConfig("async", "hardMAXREC"),
101 int(parameters["maxrec"]))
102 except ValueError:
103 raise base.ui.logOldError(
104 uws.UWSError("Invalid MAXREC literal '%s'."%parameters["maxrec"]))
105 except KeyError:
106 maxrec = base.getConfig("async", "defaultMAXREC")
107 return query, maxrec
108
109
111 """returns an rsc.Data instance containing resultTable and some
112 additional metadata.
113 """
114 resData = rsc.wrapTable(resultTable)
115 resData.addMeta("info", "Query successful",
116 infoName="QUERY_STATUS", infoValue="OK")
117 resData.addMeta("_type", "results")
118 resData.overflowLimit = resultTable.tableDef.overflowLimit
119 return resData
120
121
123
124 if format.startswith("votable"):
125
126
127
128 enc = {
129 "votable": "binary",
130 "votableb2": "binary2",
131 "votabletd": "td",
132 }.get(format, "td")
133
134 oe = votable.OverflowElement(
135 res.getPrimaryTable().tableDef.overflowLimit,
136 votable.V.INFO(name="QUERY_STATUS", value="OVERFLOW"))
137 ctx = votablewrite.VOTableContext(
138 tablecoding=enc,
139 acquireSamples=False,
140 overflowElement=oe)
141 votablewrite.writeAsVOTable(res, outF, ctx)
142 else:
143 formats.formatData(format, res, outF, acquireSamples=False)
144
145
146 -def runTAPQuery(query, timeout, connection, tdsForUploads, maxrec,
147 autoClose=True):
171
172
198
199
201 """stores conn's worker PID in _WORKER_PID.
202 """
203 global _WORKER_PID
204 curs = conn.cursor()
205 curs.execute("SELECT pg_backend_pid()")
206 _WORKER_PID = curs.fetchall()[0][0]
207 curs.close()
208
209
218
219
237
238
240 """executes a TAP job defined by parameters and writes the
241 result to the job's working directory.
242
243 This does not do state management. Use runTAPJob if you need it.
244 """
245 _hangIfMagic(jobId, parameters, timeout)
246
247
248
249 defaultFormat = "votable"
250 if base.getConfig("ivoa", "votDefaultEncoding")=="td":
251 defaultFormat = "votable/td"
252
253 format = normalizeTAPFormat(parameters.get("responseformat", defaultFormat))
254
255 res = _makeDataFor(getQTableFromJob(
256 parameters, jobId, queryProfile, timeout))
257
258 try:
259 job = tap.WORKER_SYSTEM.getJob(jobId)
260 destF = job.openResult(
261 formats.getMIMEFor(
262 format,
263 job.parameters.get("responseformat")),
264 "result")
265 writeResultTo(format, res, destF)
266 destF.close()
267 except Exception:
268
269
270 svcs.mapDBErrors(*sys.exc_info())
271
272
273
def runTAPJob(jobId, queryProfile="untrustedquery"):
    """runs the TAP job jobId and moves it into its final phase.

    The job is expected to be in EXECUTING already, with the appropriate
    pid entered.  What is recorded here is the start time, which signals
    that actual processing has begun and the job is killable.

    jobId identifies the job within tap.WORKER_SYSTEM; queryProfile is
    handed through unchanged to runTAPJobNoState.

    Exceptions raised while running the job are not propagated; they are
    reported through base.ui.notifyError and the job is moved to
    uws.ERROR.  Otherwise, the job is moved to uws.COMPLETED.
    """
    # Only hold the changeable job long enough to stamp the start time
    # and to pull out what the actual run needs.
    with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
        job.change(startTime=datetime.datetime.utcnow())
        execLimit = job.executionDuration
        jobParameters = job.parameters

    try:
        runTAPJobNoState(jobParameters, jobId, queryProfile, execLimit)
    except Exception as failure:
        base.ui.notifyError(
            "While executing TAP job %s: %s"%(jobId, failure))
        tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, failure)
        return

    # Deliberately outside of the try block: a failure of this phase
    # change must not be turned into an ERROR transition.
    tap.WORKER_SYSTEM.changeToPhase(jobId, uws.COMPLETED, None)
292
293
295 """executes a TAP job "synchronously".
296
297 When this is done, the job will be in an end state, i.e., ERROR,
298 COMPLETED or perhaps ABORTED.
299
300 You must call tap.WORKER_SYSTEM.destroy(jobId) when done yourself.
301
302 Essentially, this puts the job into EXECUTING and declares the
303 pid as -1. The UWS machinery will happily kill a job with a pid
304 when asked to abort such a job, and since sync jobs run with the
305 server's pid, that's really not what we want (conversely: sync
306 jobs can't really be aborted). Anyway: Do *not* put anything
307 getpid returns into a sync job's pid field.
308 """
309 with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
310 job.change(
311 executionDuration=base.getConfig("async", "defaultExecTimeSync"),
312 phase=uws.EXECUTING,
313 pid=-1)
314 runTAPJob(jobId)
315
316
317
318
319
321 """installs a signal handler that pushes our job to aborted on SIGINT.
322 """
323 import signal
324
325 def handler(signo, frame):
326 global EXIT_PLEASE
327 EXIT_PLEASE = True
328
329 signal.signal(signal.SIGINT, handler)
330
331
345
346
355
356
357
359
360
361 import threading
362 t = threading.Thread(target=target)
363 t.setDaemon(True)
364 t.start()
365 try:
366 joinInterruptibly(t, jobId)
367 except (SystemExit, Exception):
368
369 t.join(1)
370 raise
371
372
374 from optparse import OptionParser
375 parser = OptionParser(usage="%prog <jobid>",
376 description="runs the TAP job with <jobid> from the UWS table.")
377 opts, args = parser.parse_args()
378 if len(args)!=1:
379 parser.print_help(file=sys.stderr)
380 sys.exit(1)
381 return opts, args[0]
382
383
385 """causes the execution of the job whose id is the first command line argument.
386 """
387
388
389
390
391 try:
392 base.DEBUG = False
393 opts, jobId = parseCommandLine()
394 setINTHandler(jobId)
395 try:
396 _runInThread(lambda: runTAPJob(jobId), jobId)
397 base.ui.notifyInfo("taprunner for %s finished"%jobId)
398 except SystemExit:
399 pass
400 except uws.JobNotFound:
401 errmsg = "Giving up non-existing TAP job %s."%jobId
402 sys.stderr.write(errmsg+"\n")
403 base.ui.notifyInfo(errmsg)
404 except Exception as ex:
405 base.ui.notifyError("taprunner %s major failure"%jobId)
406
407
408 tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, ex)
409 raise
410 finally:
411 pass
412