# Source code for gavo.protocols.taprunner

"""
Execution of UWS (right now, TAP only) requests.

This is mainly intended to be exec'd (through some wrapper) by the queue
runner in the main server thread.  The jobs executed have to be in
the database and have to have a job directory.

Primarily for testing, an alternative interface rabRun exists that takes
a jobid and parameters.

The tap runner takes the job to EXECUTING shortly before sending the
query to the DB server.  When done, the job's state is one of COMPLETED,
ABORTED or ERROR.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import datetime
import os
import sys
import time

from gavo import base
from gavo import formats
from gavo import rsc
from gavo import svcs
from gavo import utils
from gavo import votable
from gavo.base import valuemappers
from gavo.formats import texttable #noflake: format registration
from gavo.formats import csvtable #noflake: format registration
from gavo.formats import jsontable #noflake: format registration
from gavo.formats import geojson #noflake: format registration
from gavo.formats import votableread
from gavo.formats import votablewrite
from gavo.protocols import adqlglue
from gavo.protocols import simbadinterface #noflake: UDF registration
from gavo.protocols import tap
from gavo.protocols import uws


# Set to True by the SIGINT handler installed in setINTHandler;
# joinInterruptibly polls this flag and aborts the job when it is set.
EXIT_PLEASE = False


# The pid of the worker db backend (filled in by _noteWorkerPID).  This is
# used in the signal handler when it tries to kill the running query.
_WORKER_PID = None


# test instrumentation
class _MessilyCrash(Exception):
	"""is raised when a job's query is the magic string "MESSILY CRASH".

	This simulates an unexpected internal error so the taprunner CLI's
	failure behaviour can be tested (see _hangIfMagic and main).
	"""
	pass



def _assertSupportedLanguage(jobId, langSpec):
	"""raises a UWSError if langSpec ("ADQL-3.1") cannot be processed by
	this service.

	The text up to the first dash is taken as the language name, anything
	after it as the version; a langSpec without a dash only has its name
	checked against tap.SUPPORTED_LANGUAGES.
	"""
	langName, dash, langVersion = langSpec.partition("-")

	if langName not in tap.SUPPORTED_LANGUAGES:
		raise uws.UWSError("This service does not support"
			" the query language %s"%langName, jobId)

	# only validate the version when one was actually given
	if dash:
		if langVersion not in tap.SUPPORTED_LANGUAGES[langName]["versions"]:
			raise uws.UWSError("This service does not support"
				" version %s of the query language %s (but some other version;"
				" see capabilities)."%(langVersion, langName), jobId)


def _parseTAPParameters(jobId, parameters):
	"""gets and checks TAP parameters like version, request, and such.

	The function returns a tuple of query and maxrec.

	Since TAP is now a core, much of this is a no-op, as the parsing
	has been done by the context grammar.  However, since the lang
	rules are a bit nightmarish, I still keep things in this extra
	function.
	"""
	try:
		if parameters["request"]!="doQuery":
			raise uws.UWSError("This service only supports REQUEST=doQuery", jobId)
		_assertSupportedLanguage(jobId, parameters["lang"])
		query = parameters["query"]
	except KeyError as key:
		raise base.ui.logOldExc(base.ValidationError(
			"Required parameter '%s' missing."%key, key))

	try:
		if parameters["maxrec"] is None:
			maxrec = base.getConfig("async", "defaultMAXREC")
		else:
			maxrec = parameters["maxrec"]

		# never return more rows than the operator-configured hard limit
		maxrec = min(base.getConfig("async", "hardMAXREC"), maxrec)
	except ValueError:
		# this used to call the non-existent base.ui.logOldError, which
		# would have raised an AttributeError instead of the UWSError;
		# also pass jobId as the other UWSError raises here do.
		raise base.ui.logOldExc(
			uws.UWSError("Invalid MAXREC literal '%s'."%parameters["maxrec"],
				jobId))

	return query, maxrec


def _makeDataFor(resultTable):
	"""returns an rsc.Data instance containing resultTable and some
	additional metadata.
	"""
	resultData = rsc.wrapTable(resultTable)

	resultData.contributingMetaCarriers.append(
		base.resolveCrossId("//tap#run"))
	# secret handshake with adqlglue: Add all tables of which we know that
	# contributed to the query result to the meta sources for Data Origin
	if hasattr(resultTable.tableDef, "contributingTables"):
		resultData.contributingMetaCarriers.extend(
			resultTable.tableDef.contributingTables)

	resultData.addMeta("info", "Query successful",
		infoName="QUERY_STATUS", infoValue="OK")
	resultData.setMeta("_type", "results")
	resultData.overflowLimit = resultTable.tableDef.overflowLimit

	return resultData


def writeResultTo(format, res, outF):
	"""serialises the data res in format to the file outF.

	VOTable flavours are special-cased so overflow conditions (MAXREC
	cut-offs) can be communicated through an OVERFLOW INFO element.
	"""
	if format.startswith("votable"):
		# the following duplicates a mapping from votablewrite; that's
		# bad, and I should figure out a way to teach formats.format
		# whatever is needed to let it do what we're doing here.  Meanwhile:
		tablecoding = {
			"votable": "binary",
			"votableb2": "binary2",
			"votabletd": "td",
		}.get(format, "td")
		overflowEl = votable.OverflowElement(
			res.getPrimaryTable().tableDef.overflowLimit,
			votable.V.INFO(name="QUERY_STATUS", value="OVERFLOW"))
		serContext = votablewrite.VOTableContext(
			tablecoding=tablecoding,
			acquireSamples=False,
			overflowElement=overflowEl)
		votablewrite.writeAsVOTable(res, outF, serContext)

	else:
		formats.formatData(format, res, outF, acquireSamples=False)
def _ingestUploads(job, uploads, connection):
	"""ingests uploads (in the format left by protocols.tap) into temporary
	tables in connection.

	Returns a list of the table definitions of the ingested tables.
	"""
	tableDefs = []
	for destName, src in uploads or []:
		if src.startswith("file://"):
			# here, these are always direct children of the job directory;
			# as a bit of defense, we only use the last path segment.
			srcF = job.openFile(src.split("/")[-1])
		else:
			try:
				srcF = utils.urlopenRemote(src)
			except IOError as ex:
				raise base.ui.logOldExc(
					base.ValidationError("Upload '%s' cannot be retrieved"%(
						src), "UPLOAD",
						hint="The I/O operation failed with the message: "+
						str(ex)))

		if valuemappers.needsQuoting(destName):
			raise base.ValidationError("'%s' is not a valid table name on"
				" this site"%destName, "UPLOAD", hint="It either contains"
				" non-alphanumeric characters or conflicts with an ADQL"
				" reserved word. Quoted table names are not supported"
				" at this site.")

		try:
			uploadedTable = votableread.uploadVOTable(destName, srcF,
				connection, nameMaker=votableread.AutoQuotedNameMaker())
		except Exception as ex:
			raise base.ui.logOldExc(
				base.ReportableError(f"While ingesting upload {destName}: {ex}"))
		if uploadedTable is not None:
			tableDefs.append(uploadedTable.tableDef)
		srcF.close()
	return tableDefs


def _noteWorkerPID(conn):
	"""stores conn's worker PID in _WORKER_PID.
	"""
	global _WORKER_PID
	cursor = conn.cursor()
	cursor.execute("SELECT pg_backend_pid()")
	_WORKER_PID = cursor.fetchall()[0][0]
	cursor.close()


def _hangIfMagic(jobId, parameters, timeout):
	# Test instrumentation.  There are more effective ways to DoS me.
	if parameters.get("query")=="JUST HANG around":
		with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
			job.change(executionDuration=timeout,
				phase=uws.EXECUTING)
		time.sleep(timeout)
		with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
			job.change(phase=uws.COMPLETED,
				endTime=datetime.datetime.utcnow())
		sys.exit()

	if parameters.get("query")=="MESSILY CRASH":
		raise _MessilyCrash()
def getQTableFromJob(parameters, job, queryProfile, timeout):
	"""returns a QueryTable for a TAP job.

	This opens a database connection using queryProfile, records the
	backend's PID (so the query can be cancelled on SIGINT), ingests
	any uploads, and hands everything to adqlglue.runTAPQuery.
	"""
	query, maxrec = _parseTAPParameters(job.jobId, parameters)
	connectionForQuery = base.getDBConnection(queryProfile)

	try:
		_noteWorkerPID(connectionForQuery)
	except Exception:
		# Don't fail just because we can't kill workers.  This used to be
		# a bare except, which would also have swallowed SystemExit and
		# KeyboardInterrupt.
		base.ui.notifyError(
			f"Could not obtain PID for the worker, job {job.jobId}")

	tdsForUploads = _ingestUploads(
		job, parameters["upload"], connectionForQuery)
	return adqlglue.runTAPQuery(
		query, timeout, connectionForQuery, tdsForUploads, maxrec)
def runTAPJobNoState(parameters, jobId, queryProfile, timeout):
	"""executes a TAP job defined by parameters and writes the result to
	the job's working directory.

	This does not do state management.  Use runTAPJob if you need it.
	"""
	_hangIfMagic(jobId, parameters, timeout)

	# The following makes us bail out if a bad format was passed -- no
	# sense spending the CPU on executing the query then, so we get the
	# format here.
	if base.getConfig("ivoa", "votDefaultEncoding")=="td":
		defaultFormat = "votable/td"
	else:
		defaultFormat = "votable"
	rawFormat = (parameters.get("responseformat")
		or parameters.get("format")
		or defaultFormat)
	formatKey = formats.getKeyFor(rawFormat)

	try:
		job = tap.WORKER_SYSTEM.getJob(jobId)
		resultData = _makeDataFor(getQTableFromJob(
			parameters, job, queryProfile, timeout))
		destF = job.openResult(
			formats.getMIMEFor(formatKey, rawFormat), "result")
		writeResultTo(formatKey, resultData, destF)
		destF.close()
	except Exception:
		# DB errors can occur here since we're streaming directly from
		# the database.
		svcs.mapDBErrors(*sys.exc_info())
	# connectionForQuery closed by QueryTable
def runTAPJob(jobId, queryProfile="untrustedquery"):
	"""executes a TAP job defined by parameters and job id.

	This assumes the job has already been put into executing, and the
	appropriate pid has been entered.  To indicate that actual
	processing has started and the job is killable, the start time is
	recorded, though.
	"""
	with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
		job.change(startTime=datetime.datetime.utcnow())
		timeout = job.executionDuration
		parameters = job.parameters

	try:
		runTAPJobNoState(parameters, jobId, queryProfile, timeout)
	except Exception as ex:
		# base.Error-s have already been reported through the normal
		# channels; anything else gets an extra notification here.
		if not isinstance(ex, base.Error):
			base.ui.notifyError("While executing TAP job %s: %s"%(jobId, ex))
		tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, ex)
	else:
		tap.WORKER_SYSTEM.changeToPhase(jobId, uws.COMPLETED, None)
def runSyncTAPJob(jobId, queryMeta=None):
	"""executes a TAP job synchronously.

	When this is done, the job will be in an end state, i.e., ERROR,
	COMPLETED or perhaps ABORTED.  You must call
	tap.WORKER_SYSTEM.destroy(jobId) when done yourself.

	Essentially, this puts the job into EXECUTING and declares the pid
	as -1.  The UWS machinery will happily kill a job with a pid when
	asked to abort such a job, and since sync jobs run with the server's
	pid, that's really not what we want (conversely: sync jobs can't
	really be aborted).  Anyway: Do *not* put anything getpid returns
	into a sync job's pid field.
	"""
	syncTimeout = base.getConfig("async", "defaultExecTimeSync")
	with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
		job.change(
			executionDuration=syncTimeout,
			phase=uws.EXECUTING,
			pid=-1)

	runTAPJob(jobId)
############### CLI
def setINTHandler(jobId):
	"""installs a signal handler that pushes our job to aborted on SIGINT.

	The handler only raises the module-global EXIT_PLEASE flag; the
	actual abort is done by joinInterruptibly, which polls that flag.
	"""
	import signal

	def _flagExit(signo, frame):
		global EXIT_PLEASE
		EXIT_PLEASE = True

	signal.signal(signal.SIGINT, _flagExit)
def _killWorker(jobId):
	"""tries to kill the postgres worker for this job.

	The job is pushed to ABORTED first; the backend is only cancelled
	when _noteWorkerPID managed to record a worker pid.
	"""
	with tap.WORKER_SYSTEM.changeableJob(jobId) as wjob:
		wjob.change(phase=uws.ABORTED)

	if _WORKER_PID:
		base.ui.notifyInfo("Trying to abort %s, wpid %s"%(
			jobId, _WORKER_PID))
		with base.getUntrustedConn() as conn:
			cancelCursor = conn.cursor()
			cancelCursor.execute("SELECT pg_cancel_backend(%d)"%_WORKER_PID)
			cancelCursor.close()
def joinInterruptibly(t, jobId):
	"""waits for thread t to finish while polling EXIT_PLEASE.

	When EXIT_PLEASE is raised by the SIGINT handler while t is still
	running, the job's worker is killed and the process exits with
	status 2.
	"""
	while True:
		t.join(timeout=0.5)
		# check liveness before EXIT_PLEASE: a job that finished during
		# the join is reported as done even if SIGINT arrived meanwhile
		if not t.is_alive():
			return
		if EXIT_PLEASE:
			_killWorker(jobId)
			sys.exit(2)
def _runInThread(target, jobId):
	"""runs the callable target in a daemon thread and joins it
	interruptibly.

	The standalone tap runner must run the query in a thread since
	it must be able to react to a SIGINT.
	"""
	import threading
	t = threading.Thread(target=target)
	# Thread.setDaemon() is deprecated since Python 3.10; assign the
	# attribute directly instead.
	t.daemon = True
	t.start()
	try:
		joinInterruptibly(t, jobId)
	except (SystemExit, Exception):
		# give the thread a chance to quit cleanly
		t.join(1)
		raise
def parseCommandLine():
	"""parses the taprunner command line and returns (options, jobId).

	Anything but exactly one positional argument prints the usage and
	exits with status 1.
	"""
	from optparse import OptionParser
	parser = OptionParser(usage="%prog <jobid>",
		description="runs the TAP job with <jobid> from the UWS table.")
	options, positional = parser.parse_args()

	if len(positional)!=1:
		parser.print_help(file=sys.stderr)
		sys.exit(1)

	return options, positional[0]
def main():
	"""causes the execution of the job whose id is given as the sole
	command line argument.
	"""
	from gavo import rscdesc #noflake: cache registration
	# there's a problem in CLI behaviour in that if anything goes wrong in
	# main, a job that may have been created will remain QUEUED forever.
	# There's little we can do about that, though, since we cannot put
	# a job into ERROR when we don't know its id or cannot get it from the DB.
	try:
		base.DEBUG = False
		opts, jobId = parseCommandLine()
		setINTHandler(jobId)

		try:
			os.setsid()
			# we don't want to be killed if the server is restarted.
			# The new server will pick us up.
		except PermissionError:
			# we already are the process group leader.
			pass

		try:
			_runInThread(lambda: runTAPJob(jobId), jobId)
			base.ui.notifyInfo("taprunner for %s finished"%jobId)
		except SystemExit:
			pass

	except uws.JobNotFound:
		# someone destroyed the job before I was done
		base.ui.notifyInfo("Giving up non-existing TAP job %s."%jobId)

	except _MessilyCrash:
		# test instrumentation: simulate a severe internal error
		base.ui.notifyInfo("taprunner was asked to crash messily")
		sys.exit(1)

	except Exception as ex:
		base.ui.notifyError("taprunner %s major failure"%jobId)
		# try to push job into the error state -- this may well fail given
		# that we're quite hosed, but it's worth the try
		tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, ex)
		raise