# Source code for gavo.protocols.tap

"""
TAP: schema maintenance, job/parameter definition incl. upload and UWS actions.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import datetime
import functools
import os
import signal

from twisted.internet import threads

from gavo import base
from gavo import formats
from gavo import rsc
from gavo import svcs
from gavo import utils
from gavo.protocols import taprunner
from gavo.protocols import uws
from gavo.protocols import uwsactions
from gavo.utils.parsetricks import ParseException


# The TAP version this module declares; it is interpolated into the
# input table definition of TAPCore (default and check of the version key).
TAP_VERSION = "1.1"

# The id of the RD that is fetched via base.caches.getRD throughout this
# module (e.g., to obtain the "run" service or the TAP_SCHEMA import DDs).
RD_ID = "__system__/tap"

# used in the computation of quote
EST_TIME_PER_JOB = datetime.timedelta(minutes=10)


def _R(**kws): return kws

# This is used below for registry purposes.  The keys are language names;
# each value is a record built by _R with the keys "versions" (a mapping
# from version string to IVOA id) and "description" (a human-readable text).
SUPPORTED_LANGUAGES = {
	"ADQL": _R(versions={
			"2.0": "ivo://ivoa.net/std/ADQL#v2.0",
			"2.1": "ivo://ivoa.net/std/ADQL#v2.1"},
		description="The Astronomical Data Query Language is the standard"
			" IVOA dialect of SQL; it contains a very general SELECT statement"
			" as well as some extensions for spherical geometry and higher"
			" mathematics.")}


# A mapping from the identifiers of the supported upload methods to
# human-readable labels.  This is only used in the registry interface
# right now.
UPLOAD_METHODS = {
	"upload-inline": "POST inline upload",
	"upload-http": "http URL",
	"upload-https": "https URL",
	"upload-ftp": "ftp URL",
}


class TAPError(uws.UWSError):
    """here for backward compatibility.

    Deprecated.  This class adds nothing to uws.UWSError.
    """
######################## registry interface helpers
def getSupportedLanguages():
    """yields tuples describing the supported query languages.

    This is SUPPORTED_LANGUAGES in a format suitable for the
    TAP capabilities element.

    Each tuple generated is made up of
    (name, description, [(version, ivo-id)...]).
    """
    for langName, langInfo in SUPPORTED_LANGUAGES.items():
        description = langInfo["description"]
        versionPairs = list(langInfo["versions"].items())
        yield (langName, description, versionPairs)
def getSupportedOutputFormats():
    """yields tuples for the supported output formats.

    The formats are whatever formats.iterFormats enumerates; they are
    given in a format suitable for the TAP capabilities element.

    Each tuple is made up of (mime, aliases, description, ivoId).
    """
    for formatKey in formats.iterFormats():
        mime = formats.getMIMEFor(formatKey)
        label = formats.getLabelFor(formatKey)
        tapId = formats.getTAPIdFor(formatKey)
        aliases = formats.getAliasesFor(formatKey)
        yield mime, aliases, label, tapId
######################## maintaining TAP schema def _insertRDIntoTAP_SCHEMA(rd, connection): """helps publishToTAP. Actually, it does all its work, except not rejecting //tap itself. This is an implementation detail of letting //tap#createSchema's postCreation script do its work. """ # first check if we have any adql tables at all, and don't attempt # anything if we don't (this is cheap optimizing and keeps TAP_SCHEMA # from being created on systems that don't do ADQL). for table in rd.tables: # the readProfile condition in the next line is a proxy for adql=hidden. if table.adql or "untrustedquery" in table.readProfiles: break else: return tapRD = base.caches.getRD(RD_ID) for ddId in ["importTablesFromRD", "importDMsFromRD", "importColumnsFromRD", "importFkeysFromRD", "importGroupsFromRD"]: dd = tapRD.getById(ddId) rsc.makeData(dd, forceSource=rd, parseOptions=rsc.parseValidating, connection=connection, runCommit=False) # finally, remove schemas that don't have any tables (which happens # with adql-hidden tables, when moving RDs, etc. Such leftovers # are trouble because they're in TAP_SCHEMA but not in /tables any more. connection.execute(""" DELETE FROM tap_schema.schemas WHERE schema_name IN ( SELECT schema_name FROM tap_schema.schemas LEFT OUTER JOIN tap_schema.tables USING (schema_name) WHERE table_name IS NULL)""")
def publishToTAP(rd, connection):
    """publishes info for all ADQL-enabled tables of rd to the TAP_SCHEMA.

    For the TAP RD itself, this does nothing.
    """
    # if we're being built ourselves, skip this; tap's tap_schema
    # maintenance is manual.
    if rd.sourceId != '__system__/tap':
        _insertRDIntoTAP_SCHEMA(rd, connection)
def unpublishFromTAP(rd, connection):
    """removes all information originating from rd from TAP_SCHEMA.

    For the TAP RD itself, this does nothing.

    This works by temporarily setting the "moribund" property on rd and
    then running publishToTAP.  The property is cleared again even when
    publishing raises an exception, so a failed unpublish does not leave rd
    flagged for deletion.
    """
    if rd.sourceId=='__system__/tap':
        # if we're being built ourselves, skip this; tap's tap_schema
        # maintenance is manual.
        return

    # the embedded grammar take this to mean "kill this"
    rd.setProperty("moribund", "True")
    try:
        publishToTAP(rd, connection)
    finally:
        rd.clearProperty("moribund")
def getAccessibleTables():
    """returns a list of qualified table names for the TAP-published tables.

    The names are taken from tap_schema.tables and come sorted by
    table name.
    """
    tableNames = []
    with base.getTableConn() as conn:
        rows = conn.query(
            "select table_name from tap_schema.tables order by table_name")
        for row in rows:
            tableNames.append(row[0])
        return tableNames
########################## Maintaining TAP jobs
class TAPTransitions(uws.ProcessBasedUWSTransitions):
    """The transition function for TAP jobs.

    There's a hack here: After each transition, when you've released
    your lock on the job, call checkProcessQueue (in reality, only
    PhaseAction does this).

    The transitions overridden here delegate to the inherited
    implementations and then have the job's UWS schedule a check of
    its process queue.
    """
    def __init__(self):
        uws.SimpleUWSTransitions.__init__(self, "TAP")

    def getCommandLine(self, wjob):
        """returns a pair of program name and argument list for running
        wjob through gavo taprun.
        """
        program = "gavo"
        arguments = [
            program, "--ui", "stingy", "taprun", "--", str(wjob.jobId)]
        return program, arguments

    def queueJob(self, newState, wjob, ignored):
        """puts a job on the queue.
        """
        uws.ProcessBasedUWSTransitions.queueJob(
            self, newState, wjob, ignored)
        wjob.uws.scheduleProcessQueueCheck()

    def errorOutJob(self, newPhase, wjob, ignored):
        """runs the inherited error transition and schedules a queue check.
        """
        uws.SimpleUWSTransitions.errorOutJob(
            self, newPhase, wjob, ignored)
        wjob.uws.scheduleProcessQueueCheck()

    def completeJob(self, newPhase, wjob, ignored):
        """runs the inherited completion transition and schedules a
        queue check.
        """
        uws.SimpleUWSTransitions.completeJob(
            self, newPhase, wjob, ignored)
        wjob.uws.scheduleProcessQueueCheck()

    def killJob(self, newPhase, wjob, ignored):
        """kills the job through the inherited transition.

        The queue check is scheduled even if killing fails.
        """
        try:
            uws.ProcessBasedUWSTransitions.killJob(
                self, newPhase, wjob, ignored)
        finally:
            wjob.uws.scheduleProcessQueueCheck()
########################## The TAP UWS job
@functools.lru_cache(1)
def getUploadGrammar():
    """returns a pyparsing grammar for TAP UPLOAD strings.

    An upload string is one or more specs of the form tableName,uri,
    separated by semicolons.  Through the parse action attached below, each
    spec is turned into a pair (tableName, uri).

    The grammar is only built once (the function is lru_cached).
    """
    from gavo.utils.parsetricks import (Word, ZeroOrMore, Suppress, StringEnd,
        alphas, alphanums, CharsNotIn, pyparsingWhitechars)
    # Should we allow more tableNames?
    # NOTE(review): the whole construction is kept inside the whitechars
    # context here; confirm against the upstream source.
    with pyparsingWhitechars(" \t"):
        tableName = Word( alphas+"_", alphanums+"_" )
        # What should we allow/forbid in terms of URIs?
        uri = CharsNotIn(" ;,")
        uploadSpec = tableName("name") + "," + uri("uri")
        uploads = uploadSpec + ZeroOrMore(
            Suppress(";") + uploadSpec) + StringEnd()
        # the parse action is added to the very object referenced from
        # uploads, which is why it can be attached after uploads is built.
        uploadSpec.addParseAction(lambda s,p,t: (t["name"], t["uri"]))
        return uploads
def parseUploadString(uploadString):
    """returns a list of the parse results for a TAP upload string.

    With the parse action defined in getUploadGrammar, the items are
    pairs of tableName, uploadSource.

    Strings not matching the upload grammar yield a ValidationError
    for the UPLOAD parameter.
    """
    try:
        parseResult = utils.pyparseString(getUploadGrammar(), uploadString)
        return parseResult.asList()
    except ParseException as ex:
        message = "Syntax error in UPLOAD parameter (near %s)"%(ex.loc)
        raise base.ValidationError(
            message,
            "UPLOAD",
            hint="Note that we only allow regular SQL identifiers as table names,"
                " i.e., basically only alphanumerics are allowed.")
class LocalFile(object):
    """A sentinel class representing a file within a job work directory
    (as resulting from an upload).

    Instances have the attributes jobId, fileName, and fullPath (which is
    fileName joined to the work directory passed in).
    """
    # TODO: unify with uws.LocalFile

    def __init__(self, jobId, wd, fileName):
        self.jobId = jobId
        self.fileName = fileName
        self.fullPath = os.path.join(wd, fileName)

    def __str__(self):
        # This is mainly for serialisation of the upload parameter in the
        # UWS job.
        return "file://" + f"{self.fileName}"

    def getURL(self):
        """returns the URL the file is retrievable under for the life time
        of the job.
        """
        asyncURL = base.caches.getRD(RD_ID).getById("run").getURL(
            "async", absolute=True)
        resultPath = "/%s/results/%s"%(self.jobId, self.fileName)
        return asyncURL+resultPath
def mangleUploads(request):
    """parses TAP-compliant UPLOAD specifications in request and return
    something that the core can handle.

    This is being executed from the sync/async renderers and probably
    can't be used anywhere else.  The underlying trouble is that the core
    cannot see the request any more, and thus could not resolve param:
    uploads.  Since this is different in TAP than in DALI, we have extra code
    here vs. dali.mangleUploads.

    The upload item is always removed from request.strargs.  If it was
    non-empty, it is replaced by a list of pairs; these are
    (tableName, uri) for uploads by URI and
    (tableName, (file name, file object)) for param: (inline) uploads.

    A ValidationError for UPLOAD is raised if a param: upload references a
    request parameter that is missing or is not a file upload.
    """
    uploadSpec = ";".join(request.strargs.pop("upload", []))
    if not uploadSpec:
        return

    parsed = []
    for tableName, upload in parseUploadString(uploadSpec):
        if not upload.startswith("param:"):
            parsed.append((tableName, upload))
            continue

        # Keep the parameter name in a variable of its own: the messages
        # below must show it rather than a slice of whatever object was
        # found in request.files.
        paramName = upload[6:]
        try:
            uploaded = request.files[paramName][0]
            parsed.append(
                (tableName, (uploaded.file_name, uploaded.file_object)))
        except (KeyError, IndexError):
            # IndexError: the parameter is known but has no value at all.
            raise base.ui.logOldExc(
                base.ValidationError(f"No inline upload '{paramName}' found",
                    "UPLOAD"))
        except AttributeError:
            raise base.ui.logOldExc(
                base.ValidationError(
                    f"Upload parameter references non-file '{paramName}'",
                    "UPLOAD"))

    request.strargs["upload"] = parsed
class TAPJob(uws.UWSJobWithWD):
    """The UWS job class for TAP.

    Jobs are kept in the table defined by //tap#tapjobs and use
    TAPTransitions as their transition function.
    """
    _jobsTDId = "//tap#tapjobs"
    _transitions = TAPTransitions()

    @property
    def quote(self):
        """returns an estimation of the job completion.

        This currently is very naive: we give each job that's going to run
        before this one 10 minutes.

        This method needs to be changed when the dequeueing algorithm
        is changed.

        The result is a naive datetime in UTC.
        """
        with base.getTableConn() as conn:
            nBefore = self.uws.runCanned('countQueuedBefore',
                {'dt': self.destructionTime}, conn)[0]["count"]
        # datetime.utcnow() is deprecated as of python 3.12; this yields the
        # same naive UTC timestamp.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        return now+nBefore*EST_TIME_PER_JOB

    def prepareForDestruction(self):
        """terminates the child process of an executing job and then runs
        the inherited preparations.
        """
        if self.phase==uws.EXECUTING:
            # since we're about to kill all state anyway, we're just TERM-ing
            # the child rather than ask it politely.
            # Let's just hope self.phase isn't lying; if it is, we'll possibly
            # kill something else...
            try:
                os.kill(self.pid, signal.SIGTERM)
            except ProcessLookupError:
                # self.phase was lying and the child is already gone.  That
                # must not keep the job from being cleaned up.
                pass
        super().prepareForDestruction()
#################### The TAP worker system
class PlanAction(uwsactions.JobAction):
    """retrieve a query plan.

    This is actually a TAP action; as we add UWSes, we'll need to think
    about how we can customize uwsactions by UWS type.
    """
    name = "plan"

    def formatPlan(self, qTableAndPlan):
        """returns the plan from a (query table, plan) pair, utf-8 encoded.
        """
        _, planText = qTableAndPlan
        return planText.encode("utf-8")

    def getPlan(self, job, request):
        """returns a pair of the query table for job and its plan.

        As a side effect, this sets request's content type to text/plain.
        """
        from gavo.protocols import taprunner
        queryTable = taprunner.getQTableFromJob(
            job.parameters, job, "untrustedquery", 1)
        request.setHeader("content-type", "text/plain;charset=utf-8")
        return queryTable, queryTable.getPlan()

    def doGET(self, job, request):
        """has the plan computed in a thread and returns what results from
        adding formatPlan as a callback.
        """
        planComputation = threads.deferToThread(self.getPlan, job, request)
        return planComputation.addCallback(self.formatPlan)
class TAPUWS(uws.UWSWithQueueing):
    """The UWS responsible for processing async TAP requests.

    It manages TAPJob-s and adds PlanAction to the job actions.
    """
    # filled by the baseURL property on first access
    _baseURLCache = None

    joblistPreamble = (
        "<?xml-stylesheet href='/static/xsl/tap-joblist-to-html.xsl'"
        " type='text/xsl'?>")
    jobdocPreamble = (
        "<?xml-stylesheet href='/static/xsl/tap-job-to-html.xsl'"
        " type='text/xsl'?>")

    def __init__(self):
        self.runcountGoal = base.getConfig("async", "maxTAPRunning")
        jobActions = uwsactions.JobActions(PlanAction)
        uws.UWSWithQueueing.__init__(self, TAPJob, jobActions)

    @functools.cached_property
    def parameterGrammar(self):
        """A grammar to be used to process parameters in UWS calls.

        It is built from the input table of the core of //tap#run.
        """
        tapCore = base.resolveCrossId("//tap#run").core
        return base.makeStruct(
            svcs.ContextGrammar,
            inputTD=tapCore.inputTable)

    @property
    def baseURL(self):
        """The URL of the sync renderer of the run service, less its
        last five characters (presumably "/sync").

        This is computed on first access and then cached.
        """
        if self._baseURLCache is not None:
            return self._baseURLCache

        syncURL = base.caches.getRD(RD_ID).getById("run").getURL("sync")
        self._baseURLCache = syncURL[:-5]
        return self._baseURLCache

    def getURLForId(self, jobId):
        """returns a fully qualified URL for the job with jobId.
        """
        return "%s/async/%s"%(self.baseURL, jobId)
# The TAPUWS instance of this module; TAPCore uses it as its workerSystem
# and to create, run, and destroy the jobs of synchronous queries.
WORKER_SYSTEM = TAPUWS()


######################### The TAP core
class TAPCore(svcs.Core):
    """A core for the TAP renderer.

    Running it executes a query synchronously through a job in
    WORKER_SYSTEM; that job is destroyed when run finishes.
    """
    name_ = "tapCore"
    workerSystem = WORKER_SYSTEM

    inputTableXML = f"""
        <inputTable>
            <inputKey name="request" type="text" required="True" std="True"
                multiplicity="force-single"
                description="Type of operation requested; this can be doQuery or getCapabilities. Preferably, don't pass it at all.">
                <values default="doQuery"/>
            </inputKey>
            <inputKey name="lang" type="text" required="True" std="True"
                multiplicity="force-single"
                description="A name of a language that QUERY should be parsed as. See capabilities for what you can pass in here.">
            </inputKey>
            <inputKey name="query" type="text" required="True" std="True"
                multiplicity="force-single"
                description="The query to be executed.">
            </inputKey>
            <inputKey name="version" type="text" std="True"
                description="Don't use this parameter. You'll only break stuff.">
                <values default="{TAP_VERSION}"/>
                <preparse>
                    if input!="{TAP_VERSION}":
                        raise ValueError(
                            "Version mismatch; this service only supports"
                            " TAP version {TAP_VERSION}.")
                </preparse>
            </inputKey>
            <inputKey name="format" type="text"
                description="Deprecated alias of RESPONSEFORMAT"/>
            <inputKey name="upload" type="raw"
                description="A TAP-compliant upload; roughly: (tablename,source-uri), where source-uri can also have a param: scheme.">
            </inputKey>
            <FEED source="//pql#DALIPars">
                <PRUNE name="VERB"/>
            </FEED>
        </inputTable>"""

    # The output table is ignored.

    def run(self, service, inputTable, queryMeta):
        """runs a synchronous TAP job for the parameters in inputTable.

        On success, this returns a pair of an open binary file with the
        result and the result's type.  Jobs ending in ERROR raise a
        base.Error made from the job's error information, aborted jobs and
        jobs in any other phase raise a uws.UWSError.

        The job is destroyed in any case.
        """
        jobId = WORKER_SYSTEM.getNewIdFromArgs(
            {}, inputTable.getParamDict())

        try:
            taprunner.runSyncTAPJob(jobId, queryMeta)

            job = WORKER_SYSTEM.getJob(jobId)
            if job.phase==uws.COMPLETED:
                # This is TAP, so there's exactly one result
                res = job.getResults()[0]
                name, resultType = res["resultName"], res["resultType"]
                # hold on to the result fd so its inode is not lost when we
                # delete the job.
                f = open(os.path.join(job.getWD(), name), "rb")
                return (f, resultType)

            elif job.phase==uws.ERROR:
                exc = job.error
                raise base.Error(exc["msg"], hint=exc["hint"])

            elif job.phase==uws.ABORTED:
                raise uws.UWSError("Job was manually aborted. For synchronous"
                    " jobs, this probably means the operators killed it.",
                    jobId)

            else:
                raise uws.UWSError("Internal error. Invalid UWS phase.", jobId)

        finally:
            WORKER_SYSTEM.destroy(jobId)

    def getRelevantTables(self):
        """returns table definitions for the tables listed in
        TAP_SCHEMA.tables.

        Tables for which no definition can be retrieved are reported through
        base.ui.notifyError and skipped, as are definitions that are None or
        have no RD.
        """
        tables = []
        with base.getTableConn() as conn:
            for tableName, in conn.query("SELECT table_name"
                    " FROM TAP_SCHEMA.tables"):
                try:
                    tables.append(base.getTableDefForTable(conn, tableName))
                except Exception:
                    # one broken RD must not take down the whole listing;
                    # KeyboardInterrupt and SystemExit are let through, though.
                    base.ui.notifyError("Failure trying to retrieve table definition"
                        " for table %s. Please fix the corresponding RD."%tableName)

        return [t for t in tables if t is not None and t.rd is not None]