Source code for gavo.protocols.adqlglue

"""
Code to bind the adql library to the data center software.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.

import sys

from gavo import adql
from gavo import base
from gavo import rsc
from gavo import rscdef
from gavo import svcs
from gavo import utils


def makeFieldInfo(column, sqlName=None):
    """returns an adql.tree.FieldInfo object from a rscdef.Column.
    """
    return adql.FieldInfo(column.type,
        column.unit, column.ucd, (column,), stc=column.stc, sqlName=sqlName)
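
# A minimal usage sketch for makeFieldInfo (hypothetical: the RD
# reference myres/q#main and the column name ra are made up; any
# loaded table definition works the same way):
#
#   td = base.resolveCrossId("myres/q#main")
#   fi = makeFieldInfo(td.getColumnByName("ra"))
#   # fi carries type, unit, ucd, and stc for the ADQL annotator
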

class TDContext(object):
    """An object keeping track of the generation of a table definition
    for ADQL output.
    """
    def __init__(self):
        self.existingNames = set()

    def getName(self, desiredName):
        while desiredName in self.existingNames:
            desiredName = desiredName+"_"
        self.existingNames.add(desiredName)
        return desiredName
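
# How TDContext deduplicates names (the results follow directly from
# getName above):
#
#   ctx = TDContext()
#   ctx.getName("mag")   # -> "mag"
#   ctx.getName("mag")   # -> "mag_" (name taken, underscore appended)
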

# For columns of types that have no automatic VOTable null value,
# we make up some when we don't have any yet.  This is governed by
# the following dictionary.
# All this is in particular for columns that came into being from
# expressions.
#
# This largely follows what Mark Taylor does in topcat.
_artificialNULLs = {
    "bytea": "255",
    "smallint": "-32768",
    "integer": "-2147483648",
    "bigint": "-9223372036854775808",
}


def _makeColumnFromFieldInfo(ctx, colName, fi):
    """constructs a rscdef.Column from a field info pair as left by the
    ADQL machinery.

    The strategy: If there's only one userData, we copy the Column
    contained in there, update the unit and the ucd, plus a warning
    if the Column has been tainted.

    If there's more or less than one userData, we create a new Column,
    use the data provided by fi and make up a description consisting
    of the source descriptions.  Add a taint warning if necessary.

    Since we cannot assign sensible verbLevels and assume the user wants
    to see what s/he selected, all fields get verbLevel 1.

    Types are a serious problem, handled by typesystems.
    """
    if len(fi.userData)==1:
        res = svcs.OutputField.fromColumn(fi.userData[0])
        if fi.type!=fi.userData[0].type:
            res.xtype = None

        if hasattr(fi.userData[0], "originalName"):
            # the following is to undo column renaming for postgres-forbidden
            # column names
            colName = fi.userData[0].originalName
        elif fi.userData[0].name.lower()==colName:
            # undo case normalisation done by the ADQL machinery
            # if we're reasonably sure there's no intervening AS
            colName = fi.userData[0].name
    else:
        res = base.makeStruct(svcs.OutputField, name=colName)

    res.name = ctx.getName(colName)
    res.ucd = fi.ucd
    res.unit = fi.unit
    res.type = fi.type
    if res.type is None:
        # that's a literal NULL; it doesn't matter much what we use
        # here, but the least troublesome NULLs in VOTable are for
        # floats, so let's use them
        res.type = 'real'

    # XXX TODO: do something with stc's "broken" attribute
    res.stc = fi.stc

    if len(fi.userData)>1:
        res.description = ("This field has traces of: %s"%("; ".join([
            f.description for f in fi.userData if f.description])))
    if fi.tainted:
        res.description = (res.description+" -- *TAINTED*: the value"
            " was operated on in a way that unit and ucd may be severely wrong")

    if (fi.properties.get("src-expression", res.name)
            !=res.name):
        res.description += " [ADQL: {}]".format(
            fi.properties["src-expression"])

    # The xtype may be set by the node classes; this is used downstream
    # to transform to STC-S strings.
    if "xtype" in fi.properties:
        res.xtype = fi.properties["xtype"]
        res.needMunging = True

    # dates and timestamps should be ISO format for TAP or consistency with it
    if res.type=="date" or res.type=="timestamp":
        res.xtype = "timestamp"

    # integral types must have a null value set since we can't be
    # sure that a query yields defined results for all of them.
    # Tough luck if our artificial value is already taken by the table
    # (remedy: select a suitable null value in the column metadata)
    if (res.type in _artificialNULLs
            and (not (res.values and res.values.nullLiteral)
                or fi.tainted)):
        nullLiteral = _artificialNULLs[res.type]
        if res.values:
            res.feedObject("values",
                res.values.change(nullLiteral=nullLiteral))
        else:
            res.feedObject("values", base.makeStruct(rscdef.Values,
                nullLiteral=nullLiteral))

    # unconditionally do away with tableheads inherited from columns
    # that have annotation indicating they're not plain column references
    if "src-expression" in fi.properties:
        res.tablehead = utils.makeEllipsis(
            fi.properties["src-expression"], 30, "…")

    res.verbLevel = 1
    res.finishElement()
    return res


def _getTableDescForOutput(parsedTree):
    """returns a TableDef describing the output of the parsed and
    annotated ADQL query parsedTree.
    """
    ctx = TDContext()
    columns = [_makeColumnFromFieldInfo(ctx, *fi)
        for fi in parsedTree.fieldInfos.seq]

    # if this is a simple one-table query, take the metadata and params
    # from that table.
    fromNames = [t.qName
        for t in parsedTree.fromClause.getAllTables()
        if hasattr(t, "qName")]
    mth = base.caches.getMTH(None)

    if len(fromNames)==1:
        try:
            srcTable = mth.getTableDefForTable(fromNames[0])
            # swallow groups for now -- we don't really use them for db
            # tables, but if there are some, they'll be trouble when
            # columns are missing.
            resTable = srcTable.change(
                columns=columns, groups=[], primary=())
            resTable.copyMetaFrom(srcTable)
            resTable.id = srcTable.id
            return resTable
        except base.NotFoundError:
            # Single source is not one of our tables, hence no metadata;
            # fall through to normal table generation
            pass

    # collect the params from all input tables; if there are conflicting
    # names, we pick one at random (which is about as wrong as anything
    # else; there's just no way to know what the collision means).
    params = []
    for tableName in fromNames:
        try:
            params.extend(p.copy(parent=None)
                for p in mth.getTableDefForTable(tableName).params)
        except Exception:
            # don't fail just because a funny thing is in fromNames
            pass

    resTable = base.makeStruct(rscdef.TableDef,
        columns=columns, params=params, id=parsedTree.suggestAName())
    return resTable


def _getADQLName(col):
    """returns the name a column is known as within the ADQL query.

    This can be different from the actual column name for uploaded
    tables, where we have to rename columns called oid, tableoid,...
    On the SQL side, our internal name is being used.
    """
    return getattr(col, "originalName", col.name)
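
# Example for _getADQLName (hypothetical attribute values; the actual
# renaming is done by the upload machinery): an uploaded column that
# arrived as oid -- a name reserved by postgres -- keeps its original
# name on the ADQL side:
#
#   col.name = "oid_"           # internal (SQL) name, made up here
#   col.originalName = "oid"    # what the user wrote in the upload
#   _getADQLName(col)           # -> "oid"
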

def adqlTableRefToDaCHS(tableName):
    """returns a DaCHS-internal table name suitable for dc.tablemeta
    for an ADQL TableName node.

    In particular, DaCHS doesn't support catalogs, so anything with a
    catalog part errors out immediately.

    Also, we don't support delimited table identifiers.  Anything
    delimited that does not consist exclusively of lower case letters
    must therefore fail immediately.  When it is all lower case, people
    engaged in gratuitous quoting; then just unquote and move on.
    """
    if isinstance(tableName, str):
        return tableName

    surfaceForm = adql.flatten(tableName)

    if tableName.cat:
        raise base.NotFoundError(surfaceForm, "table", "published tables",
            hint="DaCHS services have no tables with catalog parts.")

    if isinstance(tableName.schema, utils.QuotedName):
        if not tableName.schema.isRegularLower():
            raise base.NotFoundError(surfaceForm, "table",
                "published tables",
                hint="You probably should not quote the table schema")
        schema = tableName.schema.name+"."
    else:
        if tableName.schema:
            schema = tableName.schema+"."
        else:
            schema = ""

    if isinstance(tableName.name, utils.QuotedName):
        if not tableName.name.isRegularLower():
            raise base.NotFoundError(surfaceForm, "table",
                "published tables",
                hint="You probably should not quote the table name")
        else:
            name = tableName.name.name
    else:
        name = tableName.name

    return schema+name
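
# Behaviour sketch for adqlTableRefToDaCHS (the table names are made
# up; the mappings follow from the code above):
#
#   ivoa.obscore       -> "ivoa.obscore"
#   "ivoa".obscore     -> "ivoa.obscore"  (gratuitous quoting, unquoted)
#   "IVOA".obscore     -> NotFoundError   (delimited, not all lower case)
#   cat.ivoa.obscore   -> NotFoundError   (catalog parts unsupported)
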

class DaCHSFieldInfoGetter(adql.FieldInfoGetter):
    def __init__(self, accessProfile=None, tdsForUploads=[]):
        adql.FieldInfoGetter.__init__(self)
        self.mth = base.caches.getMTH(None)
        for td in tdsForUploads:
            self.addExtraFieldInfos(
                td.id,
                [(_getADQLName(f), makeFieldInfo(f, sqlName=f.name))
                    for f in td])

    def getInfosFor(self, tableName):
        td = self.mth.getTableDefForTable(
            adqlTableRefToDaCHS(tableName))
        return td and [
            (_getADQLName(f), makeFieldInfo(f))
            for f in td if not f.hidden]


def _addTableMeta(translated, query, tree, table):
    """adds various info items from query and its parsed tree to a
    result table.
    """
    # preserve existing INFOs copied from the source table
    table.makeOriginal("info")

    table.addMeta("info",
        "ADQL query translated to local SQL (for debugging)",
        infoName="sql_query", infoValue=translated)
    table.addMeta("info", "Original ADQL query",
        infoName="query", infoValue=query)

    mth = base.caches.getMTH(None)
    sourceTables = tree.getContributingNames()

    # for 1-table queries, we've already copied the entire table
    # metadata; don't re-copy it.  Otherwise, tell votablewrite to
    # obtain further metadata from our contributing tables
    if len(sourceTables)!=1:
        metaSources = []
        # Secret handshake with taprunner._makeDataFor; see there.
        table.tableDef.contributingTables = metaSources

        for tableName in set(sourceTables):
            try:
                sourceTD = mth.getTableDefForTable(tableName)
                metaSources.append(sourceTD)
                for m in sourceTD.iterMeta("_associatedDatalinkService"):
                    idColumn = sourceTD.getColumnByName(
                        m.getMeta("idColumn").getContent())
                    # ideally, look for column(s) that were built from
                    # idColumn and are untainted (cave: join using);
                    # for now, let's just fake it:
                    try:
                        destCol = table.tableDef.getColumnByName(
                            idColumn.name)
                        table.addMeta("_associatedDatalinkService", None)
                        table.addMeta(
                            "_associatedDatalinkService.idColumn",
                            destCol.name)
                        serviceId = m.getMeta("serviceId").getContent()
                        if "#" not in serviceId:
                            serviceId = "%s#%s"%(
                                sourceTD.rd.sourceId, serviceId)
                        table.addMeta(
                            "_associatedDatalinkService.serviceId",
                            serviceId)
                    except base.NotFoundError:
                        # User hasn't selected the column with the id.
                        # No problem.
                        pass
            except base.NotFoundError:
                # that's probably an upload table; we don't parse their
                # metadata so far anyway.  Perhaps we should one day.
                pass
            except base.Error as msg:
                # don't fail just because of funny metadata or tables
                # not found
                base.ui.notifyWarning(f"While adding TAP metadata: {msg}")


def _updateMatchLimits(tree, maxrec, hardLimit):
    """instruments the ADQL tree for the user row set limit maxrec and
    the system row set limit hardLimit.

    maxrec is a match limit from the protocol level, as opposed to the
    setLimit from the ADQL TOP clause.  The rules of interaction between
    the two are documented inline below (it's messy).

    This returns the overflow set limit: if exactly this many rows are
    returned from the query, an overflow indicator should be set.
    """
    tree.overflowLimit = None

    # First, fill in system defaults and make sure maxrec doesn't
    # exceed the caller's or the system's hard limits.
    if hardLimit is None:
        hardLimit = base.getConfig("async", "hardMAXREC")
    if maxrec is None:
        maxrec = base.getConfig("async", "defaultMAXREC")
    maxrec = min(maxrec, hardLimit)

    if tree.setLimit is None:
        # If no set limit has been passed in, put in maxrec and order
        # overflow indicators starting there.
        tree.setLimit = maxrec
        return maxrec

    elif maxrec>tree.setLimit:
        # If the set limit passed in is smaller than maxrec, there's
        # nothing we need to do, because we can never overflow
        return maxrec

    elif maxrec==tree.setLimit:
        # Special (but probably non-negligible) case maxrec==set limit:
        # we don't want to trigger an alarm, and we don't touch the set
        # limit either
        return maxrec+1

    else:
        # We have both maxrec and TOP, and maxrec<TOP.  Set TOP to
        # maxrec+1 and instruct to report overflows with maxrec+1 rows.
        # Let's hope no one will mind an extra row here and there.
        tree.setLimit = maxrec+1
        return maxrec+1
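
# Illustration of the maxrec/TOP interaction implemented by
# _updateMatchLimits (maxrec=1000 is an arbitrary example value; the
# outcomes follow from the branches above):
#
#   no TOP,   maxrec=1000  ->  setLimit 1000, overflow reported at 1000
#   TOP 50,   maxrec=1000  ->  setLimit stays 50, overflow cannot trigger
#   TOP 1000, maxrec=1000  ->  setLimit stays 1000, threshold 1001
#                              (never reached, so no false alarm)
#   TOP 5000, maxrec=1000  ->  setLimit 1001, overflow reported at 1001
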


def morphADQL(query, metaProfile=None, tdsForUploads=[],
        maxrec=None, hardLimit=None):
    """returns a postgres query and an (empty) result table for the
    ADQL in query.

    For an explanation of maxrec and hardLimit, as well as of the
    additional table.tableDef.overflowLimit attribute on the returned
    table, see _updateMatchLimits above; overflowLimit will always be
    an integer.
    """
    base.ui.notifyInfo("Incoming ADQL query: %s"%query)

    ctx, t = adql.parseAnnotating(query,
        DaCHSFieldInfoGetter(metaProfile, tdsForUploads))

    table = rsc.TableForDef(_getTableDescForOutput(t))
    table.tableDef.overflowLimit = _updateMatchLimits(t, maxrec, hardLimit)
    if hardLimit and int(t.setLimit)>hardLimit:
        table.addMeta("_warning", "This service has a hard row limit"
            " of %s. Your row limit was decreased to this value."%hardLimit)
        t.setLimit = str(hardLimit)

    morphStatus, morphedTree = adql.morphPG(t)
    for warning in morphStatus.warnings:
        table.addMeta("_warning", warning)

    # escape % to hide them from dbapi parameter interpolation
    translated = adql.flatten(morphedTree).replace("%", "%%")

    _addTableMeta(translated, query, t, table)

    return translated, table
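
# A usage sketch for morphADQL (hypothetical; assumes a DaCHS
# installation that publishes ivoa.obscore):
#
#   translated, table = morphADQL(
#       "SELECT TOP 5 obs_id FROM ivoa.obscore", maxrec=1000)
#   print(translated)                 # the postgres SQL to execute
#   table.tableDef.overflowLimit      # -> 1000 (TOP 5 < maxrec, so
#                                     #    overflow cannot trigger)
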


def query(adqlQuery, timeout=15, metaProfile=None, tdsForUploads=[],
        externalLimit=None, hardLimit=None):
    """returns a table for adqlQuery (a string containing ADQL).

    This is a legacy wrapper for runTAPQuery used by the ADQL web form.
    Don't use it anywhere else.
    """
    with base.getWritableUntrustedConn() as connection:
        qtable = runTAPQuery(adqlQuery, timeout, connection,
            tdsForUploads, externalLimit, False)
        # instantiate the qtable; we want it all in one go
        resTable = rsc.InMemoryTable(
            qtable.tableDef, rows=list(qtable))
        qtable.cleanup()

        if len(resTable)==resTable.tableDef.overflowLimit:
            resTable.addMeta("_warning", "Query result probably incomplete"
                " due to the match limit kicking in.  Queries not providing"
                " a TOP clause will be furnished with an automatic TOP %s by"
                " the machinery, so adding a TOP clause with a higher number"
                " may help."%base.getConfig("adql", "webDefaultLimit"))

        return resTable


def runTAPQuery(query, timeout, connection, tdsForUploads,
        maxrec, autoClose=True):
    """executes a TAP query and returns the result in a data instance.
    """
    # ugly convenience hack: because people these days regularly paste
    # in 0xa0 for blanks and this kind of thing is almost impossible
    # to spot, I just map them to blanks for now
    query = query.replace("\xa0", " ")

    postgresParams = [
        # cursor tuple fraction only kicks in if we're actually using
        # cursors (which for psycopg2 only happens with named cursors).
        # Either way, we're going to eat all tuples, so tell that to
        # postgres.
        ("cursor_tuple_fraction", 1),
        ("statement_timeout", "%s ms"%int(timeout*1000))]

    try:
        pgQuery, tableTrunk = morphADQL(query,
            tdsForUploads=tdsForUploads, maxrec=maxrec)
        base.ui.notifyInfo("Sending to postgres: %s"%repr(pgQuery))

        # the following ugly hack works around a horrible planner
        # failure with q3c; we don't want to mess with the planner
        # unless we think that's necessary.  And otherwise wait
        # to migrate to pgsphere, which hopefully doesn't confuse
        # the planner as much.
        if "q3c_" in pgQuery:
            postgresParams.append(
                ('enable_seqscan', 'no'))

        resetTo = connection.configure(postgresParams)
        result = rsc.QueryTable(tableTrunk.tableDef, pgQuery, connection,
            autoClose=autoClose)
        result.meta_ = tableTrunk.meta_
        result.configureOnClose(resetTo)
    except:
        mapADQLErrors(*sys.exc_info())

    return result
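
# A usage sketch for runTAPQuery (hypothetical; tap_schema.tables is
# assumed to be published, as it is on TAP-enabled DaCHS installations):
#
#   with base.getWritableUntrustedConn() as conn:
#       res = runTAPQuery(
#           "SELECT TOP 5 table_name FROM tap_schema.tables",
#           timeout=5, connection=conn, tdsForUploads=[], maxrec=100)
#       rows = list(res)   # drains the named cursor before conn closes
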


def mapADQLErrors(excType, excValue, excTb):
    if (isinstance(excValue, adql.ParseException)
            or isinstance(excValue, adql.ParseSyntaxException)):
        raise base.ui.logOldExc(
            base.ValidationError("Could not parse your query: %s"%
                str(excValue), "query"))
    elif isinstance(excValue, adql.ColumnNotFound):
        raise base.ui.logOldExc(base.ValidationError(
            "No such field known: %s"%str(excValue), "query"))
    elif isinstance(excValue, adql.AmbiguousColumn):
        raise base.ui.logOldExc(base.ValidationError(
            "%s needs to be qualified."%str(excValue), "query"))
    elif isinstance(excValue, adql.Error):
        raise base.ui.logOldExc(base.ValidationError(str(excValue), "query"))
    else:
        svcs.mapDBErrors(excType, excValue, excTb)


class ADQLCore(svcs.Core, base.RestrictionMixin):
    """A core taking an ADQL query from its query argument and returning
    the result of that query in a standard table.

    Since the columns returned depend on the query, the outputTable of
    an ADQL core must not be defined.
    """
    name_ = "adqlCore"

    def wantsTableWidget(self):
        return True

    def run(self, service, inputTable, queryMeta):
        inRow = inputTable.getParamDict()
        queryString = inRow["query"]

        try:
            res = query(queryString,
                timeout=queryMeta["timeout"], hardLimit=100000,
                externalLimit=queryMeta["dbLimit"])

            # XXX Warning: We're returning the db connection to the
            # connection pool here while we still have a named cursor
            # on it.  This is risky because someone might futz with our
            # connection later.  However, postponing the return of the
            # connection isn't nice either, because then the renderer
            # would have to manage the core's connections, which is
            # ugly, too.
            # I'm a bit at a loss for a good solution here.  Let's see
            # how well the "don't care" scheme works out.  Maybe we need
            # a "renderer closes connection" plan for this kind of
            # streaming?
            res.noPostprocess = True
            queryMeta["Matched"] = len(res.rows)
            return res
        except:
            mapADQLErrors(*sys.exc_info())


################ region makers
# REGION was supposed to be a general extension mechanism but will now
# probably wither away.  Let's keep things here while it does that.

import re

def _getRegionId(regionSpec, pat=re.compile("[A-Za-z_]+")):
    mat = pat.match(regionSpec)
    if mat:
        return mat.group()


################### local query interface #########################

def localquery():
    """runs the argument as an ADQL query.
    """
    from gavo import rscdesc  #noflake: cache registration
    from gavo import formats

    q = sys.argv[1]
    table = query(q, timeout=1000)
    formats.formatData("votable", table, sys.stdout.buffer)
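
# localquery is meant for command line diagnostics; the query comes in
# as the first argument (hypothetical invocation; DaCHS' actual CLI
# wiring lives elsewhere):
#
#   python -c "from gavo.protocols.adqlglue import localquery; localquery()" \
#       "SELECT TOP 5 table_name FROM tap_schema.tables" > result.vot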