# Source code for gavo.adql.morphpg

"""
Morphing ADQL into queries that postgres/pgSphere can understand.

For most of ADQL, Postgres supports most of the stuff out of the box, with
perhaps a few syntactic differences.

Spherical geometry is morphed rather extensively, mainly to pgsphere.
There is support for legacy q3c, but since q3c will always be inadequate
for full ADQL geometry support, we'll phase this out (although it's a good
deal faster when it works).
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import functools

from gavo.adql import common
from gavo.adql import morphhelpers
from gavo.adql import nodes
from gavo.adql.nodes import flatten
from gavo.stc import tapstc


class PostgresMorphError(common.MorphError):
    """is raised by the morphers in this module when an ADQL construct
    cannot be turned into something postgres/pgSphere can execute.
    """

def getOriginal(node):
    """returns the node.original if it exists, node itself otherwise.

    This is to undo pgsphere morphing when pgsphereCode has been inserted.

    A missing original attribute and an original that is None are
    both treated as "there is no original".
    """
    original = getattr(node, "original", None)
    if original is None:
        return node
    return original

######## Begin q3c specials
# q3c morphing must happen before pgsphere morphs all the mess to
# become spoints and stuff (at least the way we built things so far).
# Hence, this is written as a fairly freaky early morpher.

def _flatAndMorph(node):
    """returns node flattened to a string after applying the standard
    morphs on it; strings are passed through unchanged.

    I need this for the arguments of q3c stuff, since there may
    be ADQL specifics in there.
    """
    if isinstance(node, str):
        return node
    else:
        return nodes.flatten(morphPG(node)[1])


def _q3cIndexPresent(args):
    """returns True if we think there's a q3c index on anything that's
    mentioned in args.

    This is a bit heuristic in that it will go for any user data in
    any field info in args.  This certainly means nothing more than
    "there's a chance the query planner might use a q3c index here".
    """
    for fi in nodes.iterFieldInfos(args):
        for ud in fi.userData:
            # isIndexed() may return something falsy when there is no index
            if "q3c" in (ud.isIndexed() or []):
                return True
    return False


def _booleanizeContainsQ3C(node, operator, operand):
    """turns ADQL CONTAINS calls into q3c expressions if appropriate.

    This will only work if the arguments have been morphed into pgsphere
    geometries already.  It will leave alone anything it doesn't understand,
    hopefully for pgsphere to pick it up.

    node is the geometry predicate; operator and operand are accepted
    but not used in here.  The function returns a string with the
    q3c expression, or None if q3c cannot be used.
    """
    args = []
    for arg in node.args:
        if hasattr(arg, "original"):
            # recover pre-pgsphere-morph object
            args.append(arg.original)
        else:
            args.append(arg)

    if not _q3cIndexPresent(args):
        return None

    # leave morphing to someone else if we don't check for point in shape
    # or if system transformations are required.
    if len(args)!=2:
        return None
    if not hasattr(args[0], "cooSys") or not hasattr(args[1], "cooSys"):
        # arguments do not look like geometries; leave it to someone else
        # to blow up
        return None
    if tapstc.getPGSphereTrafo(args[0].cooSys, args[1].cooSys) is not None:
        # we'll need a transform; q3c cannot do this.
        return None

    expr = None
    p, shape = getOriginal(args[0]), getOriginal(args[1])

    if p.type!="point":
        # punt to pgsphere
        return None

    if shape.type=="circle":
        # if we have an spoint-valued center, there's nothing q3c can do
        # NOTE(review): the comment above talks about the circle's center,
        # but the test is on the point's x -- confirm this is intended.
        if p.x is None:
            return None
        # the way PredicateGeometryFunction.optimize sorts the arguments
        # has constants last, and here we want constants first.
        expr = ("q3c_join(%s, %s, %s, %s, %s)"%tuple(map(_flatAndMorph,
            (shape.center.x, shape.center.y, p.x, p.y, shape.radius))))

    elif shape.type=="polygon":
        # if there's spoint columns in the array, there's no coos but
        # points, and q3c can't do it.
        if shape.coos is not None:
            expr = "q3c_poly_query(%s, %s, ARRAY[%s])"%(
                _flatAndMorph(p.x), _flatAndMorph(p.y), ",".join([
                    "%s,%s"%(_flatAndMorph(x), _flatAndMorph(y))
                    for x,y in shape.coos]))

    return expr


def _distanceToQ3C(node, state):
    """tries to turn a comparison against a DISTANCE call into a q3c_join.

    This is called by _distanceToPG below (which seems preferable
    to a complex mechanism letting us decouple pgs and q3c morphing)
    if it finds a q3c on any of node's arguments (and if there are
    split-coordinate arguments).

    Experimentally, comparisons with q3c_dist are much less likely to
    be index-optimised than q3c_join, so we take a bit of pains to
    generate a q3c_join.  This includes rather crazy stack manipulations
    to tell the comparison to disappear.

    On success, this sets OVERRIDE_RESULT on the parent comparison and
    returns node; it returns None if the case should be left to pgsphere.
    """
    parent = state.nodeStack[-1]
    if (parent.type!="comparisonPredicate"
            or parent.opr not in ['<', '<=', '>', '>=']):
        # no supported comparison; pass this on to pgsphere (which probably
        # isn't much slower, as q3c_dist is bad with indices, too).
        return None

    # now figure out if we're comparing within or without
    op1, opr, otherOp = parent.op1, parent.opr, parent.op2
    if getOriginal(op1) is not getOriginal(node):
        # the distance is the second operand: mirror the operator
        opr = {'<': '>', '<=': '>=', '>': '<', '>=': '<='}[opr]
        otherOp = op1

    negation = ""
    if opr in ['>', '>=']:
        negation = "NOT "

    # node may not carry a (non-None) original; getOriginal falls back
    # to node itself in that case rather than raising AttributeError here.
    origArgs = getOriginal(node).args
    p1, p2 = origArgs[1], origArgs[0]
    try:
        fillers = (negation,) + tuple(
            flatten(a) for a in (p1.x, p1.y, p2.x, p2.y, otherOp))
    except AttributeError:
        # arguments are not simple points -- punt to pgsphere.
        return None

    parent.OVERRIDE_RESULT = "%s q3c_join(%s, %s, %s, %s, %s)"%fillers
    return node

######### End q3c specials


######### Begin morphing to pgSphere

class PgSphereCode(object):
    """A node that contains serialized pgsphere expressions plus
    a coordinate system id for cases in which we must conform.

    Pass the optional original (the node that generates the stuff)
    to allow code like the q3c booleanizer above to still work on
    things if necessary.
    """
    type = "pgsphere literal"

    def __init__(self, cooSys, content, original=None):
        self.cooSys, self.content = cooSys, content
        self.original = original

    def flatten(self):
        """returns the serialized pgsphere expression."""
        return self.content

    def iterAttributes(self):
        """yields nothing (this is an empty generator)."""
        if False:
            yield None

    def iterTree(self):
        """returns what original.iterTree() returns, or an empty iterator
        if this literal was built without an original.
        """
        if self.original is None:
            # original is optional in the constructor, so don't blow
            # up when there is nothing to delegate to.
            return iter(())
        return self.original.iterTree()

def _morphCircle(node, state):
    """returns a pgsphere scircle literal for an ADQL circle node."""
    return PgSphereCode(node.cooSys,
        "scircle(%s, RADIANS(%s))"%(
            _flatAndMorph(node.center), _flatAndMorph(node.radius)),
        original=node)


def _morphPoint(node, state):
    """returns a pgsphere spoint literal for an ADQL point node."""
    return PgSphereCode(node.cooSys,
        "spoint(RADIANS(%s), RADIANS(%s))"%tuple(
            flatten(a) for a in (node.x, node.y)),
        original=node)


def _makePoly(cooSys, points, node):
    # helper for _morph(Polygon|Box): points is a sequence of strings
    # with spoint-valued expressions; each is numbered so the VALUES
    # list can be ordered by that number.
    return PgSphereCode(cooSys,
        "(SELECT spoly(q.p) FROM (VALUES %s ORDER BY column1) as q(ind,p))"%", ".join(
            '(%d, %s)'%(i, p) for i, p in enumerate(points)),
        original=node)


def _morphPolygon(node, state):
    """returns a pgsphere spoly expression for an ADQL polygon node.

    The polygon either has coordinate pairs in coos or point-valued
    expressions in points.
    """
    if node.coos is not None:
        points = ['spoint(RADIANS(%s), RADIANS(%s))'%(
                _flatAndMorph(a[0]), _flatAndMorph(a[1]))
            for a in node.coos]
    elif node.points is not None:
        points = [_flatAndMorph(p) for p in node.points]
    else:  # pragma: no cover
        assert False

    return _makePoly(node.cooSys, points, node)


def _morphBox(node, state):
    """returns a pgsphere spoly expression with the four corners of
    an ADQL box node.
    """
    args = tuple("RADIANS(%s)"%_flatAndMorph(v) for v in (
            node.x, node.width, node.y, node.height))
    points = [
        "spoint(%s-%s/2, %s-%s/2)"%args,
        "spoint(%s-%s/2, %s+%s/2)"%args,
        "spoint(%s+%s/2, %s+%s/2)"%args,
        "spoint(%s+%s/2, %s-%s/2)"%args]
    return _makePoly(node.cooSys, points, node)


def _getSystem(node):
    """returns the cooSys of node's original (or of node itself), None
    if there is no such attribute.
    """
    return getattr(getOriginal(node), "cooSys", None)


def _transformSystems(pgLiteral, fromSystem, toSystem):
    # a helper to _booleanizeGeoPredsPGS: appends a pgsphere transformation
    # to pgLiteral if tapstc knows one between the two systems.
    if fromSystem!=toSystem:
        trafo = tapstc.getPGSphereTrafo(fromSystem, toSystem)
        if trafo is not None:
            pgLiteral = "(%s)%s"%(pgLiteral, trafo)
    return pgLiteral


def _booleanizeGeoPredPGS(node, geoOp):
    """returns an expression node for pgsphere for an ADQL
    CONTAINS/INTERSECTS call.

    geoOp is the pgsphere operator to put between the two arguments.

    A NotImplementedError is raised if both arguments are compound
    regions (tapstc.GeomExpr-s).
    """
    expr = None
    sys1, sys2 = _getSystem(node.args[0]), _getSystem(node.args[1])

    # NOTE(review): in the two GeomExpr branches, the from/to systems
    # passed to _transformSystems look swapped relative to the plain
    # branch below -- confirm against tapstc before touching this.
    if isinstance(node.args[0], tapstc.GeomExpr):
        if isinstance(node.args[1], tapstc.GeomExpr):
            raise NotImplementedError("Cannot have compound regions in both"
                " arguments of a geometry predicate")
        arg2Str = _transformSystems(flatten(node.args[1]), sys1, sys2)
        expr = node.args[0].asLogic("(%%s %s (%s))"%(geoOp, arg2Str))

    elif isinstance(node.args[1], tapstc.GeomExpr):
        arg1Str = _transformSystems(flatten(node.args[0]), sys2, sys1)
        # the compound region is the *second* argument here, so that's
        # what has to expand the template.
        expr = node.args[1].asLogic("((%s) %s (%%s))"%(arg1Str, geoOp))

    else:  # both arguments plain
        arg1Str = _transformSystems(flatten(node.args[0]), sys1, sys2)
        arg2Str = flatten(node.args[1])
        expr = "((%s) %s (%s))"%(arg1Str, geoOp, arg2Str)

    return expr


def _convertGeoPredicates(node, state):
    """morphs contains and intersects to pgsphere expressions when
    they are arguments to a suitable comparison.

    The booleanized expression is left in OVERRIDE_RESULT of the parent
    comparison.
    """
    if node.funName=="CONTAINS":
        geoOp = "<@"
    elif node.funName=="INTERSECTS":
        geoOp = "&&"
    else:
        return node

    parent = state.nodeStack[-1]
    funCall, compareAgainst = morphhelpers.analyzeFuncComp(parent)
    if funCall is None:
        return node
    compareAgainst = flatten(compareAgainst)

    # q3c gets the first shot; pgsphere takes whatever it leaves alone
    expr = _booleanizeContainsQ3C(node, parent.opr, compareAgainst)
    if expr is None:
        expr = _booleanizeGeoPredPGS(node, geoOp)

    parent.OVERRIDE_RESULT = morphhelpers.addNotToBooleanized(
        nodes.flatten(expr), parent.opr, compareAgainst)
    return funCall


def _computePointFunction(node, state):
    """morphs COORD1, COORD2, and COORDSYS; other point functions are
    returned unchanged.
    """
    if node.funName=="COORD1":
        return "DEGREES(long(%s))"%flatten(node.args[0])

    elif node.funName=="COORD2":
        return "DEGREES(lat(%s))"%flatten(node.args[0])

    elif node.funName=="COORDSYS":
        if node.args[0].fieldInfo:
            cSys = tapstc.getTAPSTC(node.args[0].fieldInfo.stc)
        else:
            cSys = getattr(node.args[0], "cooSys", "UNKNOWN")
        return "'%s'"%cSys

    else:
        return node


def _distanceToPG(node, state):
    """morphs DISTANCE to pgsphere, or to q3c if _distanceToQ3C can do it.

    If the distance is an operand of an ordering comparison, the entire
    comparison is overridden with a containment test against an scircle.
    """
    if _q3cIndexPresent(node.args):
        res = _distanceToQ3C(node, state)
        if res is not None:
            return res

    args = list(node.args)
    parent = state.nodeStack[-1]
    with state.onNodeStack(node):
        expr = "DEGREES((%s) <-> (%s))"%tuple(flatten(a) for a in args)

    # When used in comparisons, postgres (v11) doesn't use spoint
    # indices in dist(a,b)<x expressions.  So, I'll have to morph into
    # contains statements if I am in such a situation.
    funCall, compareAgainst = morphhelpers.analyzeFuncComp(
        parent, ['<', '<=', '>', '>='])
    if funCall is None:
        return expr

    # compareAgainst may or may not be morphed at this point, which
    # I'd rather avoid, but that's not easy to guarantee.  Let's
    # hope morphing is idempotent and possibly morph again.
    if isinstance(compareAgainst, nodes.ADQLNode):
        compareAgainst = _pgMorpher._traverse(compareAgainst, state)

    # now figure out if we're comparing within or without
    op1, opr = parent.op1, parent.opr
    if getOriginal(op1) is not getOriginal(node):
        opr = {'<': '>', '<=': '>=', '>': '<', '>=': '<='}[opr]
    negation = ""
    if opr in ['>', '>=']:
        negation = "NOT "

    parent.OVERRIDE_RESULT = "%s (%s) <@ scircle(%s, RADIANS(%s))"%(
        negation, flatten(args[0]), flatten(args[1]), flatten(compareAgainst))
    return expr


def _centroidToPG(node, state):
    """morphs CENTROID to pgsphere's @@ operator.

    PostgresMorphError is raised for polygons and boxes (whether literal
    or from columns typed spoly or sbox).
    """
    # pgsphere right now can only do centroids of points and circles.  Try
    # to come up with a good error message otherwise.
    def _fail():
        raise PostgresMorphError("Can only compute centroids of circles and points"
            " yet.  Complain to make us implement other geometries faster.")

    # pgsphere literals may have been built without an original (see
    # _stcsRegionToPGSphere); getOriginal keeps the literal itself then.
    arg = getOriginal(node.args[0])
    if arg.type=="polygon" or arg.type=="box":
        _fail()
    if getattr(arg, "fieldInfo", None):
        fi = arg.fieldInfo
        if fi.type=="spoly" or fi.type=="sbox":
            _fail()

    return "@@(%s)"%(flatten(node.args[0]))


def _areaToPGSphere(node, state):
    # pgsphere returns rad**2, adql wants deg**2
    return "3282.806350011744*%s"%flatten(node)


def _regionToPG(node, state):
# pragma: no cover
    # Too obscure right now.
    raise NotImplementedError("The REGION string you supplied is not"
        " supported on this server")


def _stcsRegionToPGSphere(node, state):
    # STCSRegions embed something returned by tapstc's parser.  This is
    # a pgsphere instance if we're lucky (just dump the thing as a string)
    # or a tapstc.GeomExpr object if we're unlucky -- in that case, we
    # leave the GeomExpr here and leave it to a contains or intersects
    # handler to rewrite the entire expression.
    if isinstance(node.tapstcObj, tapstc.GeomExpr):
        return node.tapstcObj
    else:
        return PgSphereCode(node.cooSys, node.tapstcObj.asPgSphere())


_geometricMorphers = {
    'circle': _morphCircle,
    'point': _morphPoint,
    'box': _morphBox,
    'polygon': _morphPolygon,
    "pointFunction": _computePointFunction,
    "distanceFunction": _distanceToPG,
    "centroid": _centroidToPG,
    "region": _regionToPG,
    "stcsRegion": _stcsRegionToPGSphere,
    "area": _areaToPGSphere,
    "predicateGeometryFunction": _convertGeoPredicates,
}

########## End morphing to pgSphere


# ADQL function names that just have a different name in postgres
_renamedFunctions = {
    "LOG": "LN",
    "LOG10": "LOG",
    "TRUNCATE": "TRUNC",
}

# ADQL bitwise functions become postgres operator expressions
_BITWISE_MORPHERS = {
    'BITWISE_NOT': lambda x: "~(%s)"%flatten(x),
    'BITWISE_AND': lambda a, b: "(%s)&(%s)"%(flatten(a), flatten(b)),
    'BITWISE_OR': lambda a, b: "(%s)|(%s)"%(flatten(a), flatten(b)),
    'BITWISE_XOR': lambda a, b: "(%s)#(%s)"%(flatten(a), flatten(b)),
}


def _adqlFunctionToPG(node, state):
    """morphs ADQL numeric functions that postgres spells differently.

    Renamed functions are changed on node in place; functions that need
    to become expressions are returned as strings or new nodes.  Anything
    else is returned unchanged.
    """
    if node.funName in _renamedFunctions:
        node.funName = _renamedFunctions[node.funName]

    if node.funName=='RAND':
        # ADQL lets RAND set a seed, fake this in an ugly way: the seed
        # argument (if any) is simply dropped.
        # I suppose we should execute a separate query here with
        # a crafted call to setseed.  There's no way to do
        # that right now, and I'm not forcing it at this point since
        # the semantics in the ADQL spec are dubious anyway.
        return "random()"

    elif node.funName=='TRUNC' or node.funName=='ROUND':
        # ADQL has two-arg TRUNCATE/ROUND -- these become expressions,
        # so we play it easy and return strings
        if len(node.args)==2:
            val, prec = flatten(node.args[0]), flatten(node.args[1])
            newTerm = nodes.Term(children=[
                node.change(args=['(%s)*10^(%s)'%(val, prec)]),
                "/",
                "10^(%s)"%prec])
            newTerm.addFieldInfo(None)
            return newTerm

    elif node.funName=='SQUARE':
        # ADQL SQUARE becomes a PG expression.  Again, we downgrade to
        # a string.
        return "(%s)^2"%flatten(node.args[0])

    elif node.funName in _BITWISE_MORPHERS:
        return _BITWISE_MORPHERS[node.funName](*node.args)

    return node


def _morphTimestamp(node, state):
    """returns a cast of the single argument to TIMESTAMP."""
    assert len(node.args)==1
    return "(%s)::TIMESTAMP"%flatten(node.args[0])


class _PQAggregatedArray(nodes.TransparentMixin, nodes.FieldInfoedNode):
    """The node _morphArrayMap produces."""
    type = "_morphedArrayMap"


def _morphArrayMap(node, state):
    # for now, we blindly serialise the expression (arg 1) into our new tree.
    return _PQAggregatedArray(children=[
        '(SELECT array_agg(', node.args[0],
        ') from unnest(', node.args[1], ') x)'])


_miscMorphers = {
    "numericValueFunction": _adqlFunctionToPG,
    "timestampFunction": _morphTimestamp,
    "arrayMapFunction": _morphArrayMap
}


class _PGSC(nodes.SelectQuery):
    """A modified SelectQuery that fixes the syntactic differences
    between ADQL and postgres.
    """
    def flatten(self):
        # setLimit is serialised as a trailing LIMIT
        return nodes.flattenKWs(self,
            ("SELECT", None),
            ("", "setQuantifier"),
            ("", "selectList"),
            ("", "fromClause"),
            ("", "whereClause"),
            ("", "groupby"),
            ("", "having"),
            ("", "orderBy"),
            ("LIMIT", "setLimit"))


class _PGQS(nodes.ADQLNode):
    """A wrapper for a postgres query specification.

    The only function here is to make sure there's just one LIMIT part
    at the very end (except, of course, in deeper subqueries).

    Nuking operand setLimits is already performed by _fixSetLimit below.
    """
    type = "postgres query specification"
    _a_original = None
    _a_setLimit = None
    _a_offset = None

    def flatten(self):
        return nodes.flattenKWs(self,
            ("", "original"),
            ("LIMIT", "setLimit"),
            ("OFFSET", "offset"))


def _insertPGSC(node, state):
    """wraps a select clause into something that serializes to postgres.
    """
    return _PGSC.cloneFrom(node)


def _expandStars(node, state):
    """tries to replace all expressions with * in a select list.

    I'm forcing this because that seems easier than figuring out how
    to apply the sequencing rules from sql1992, 7.5, to joins with more
    than two operands.
    """
    # only work if annotation has taken place (else it's probably a test
    # run anyway)
    if state.nodeStack[-1].fieldInfos:
        if node.allFieldsQuery:
            return nodes.SelectList(
                selectFields=state.nodeStack[-1].getSelectFields())
        else:
            newCols = []
            for col in node.selectFields:
                if isinstance(col, nodes.QualifiedStar):
                    newCols.extend(state.nodeStack[-1].fromClause.getFieldsForTable(
                        col.sourceTable))
                else:
                    newCols.append(col)
            return node.change(selectFields=tuple(newCols))

    return node


def _forceAlias(node, state):
    """forces anonymous expressions to have an alias.

    We need this as we expand stars here, and with these we need some
    way to refer to the items.
    """
    if isinstance(node.expr, str):
        # this can happen if node.expr has been morphed.  Though it may be
        # silly, unconditionally add an alias (unless there already is one)
        if node.alias is None:
            node.alias = node.name
        return node

    if (not isinstance(node.expr, nodes._BaseColumnReference)
            and node.alias is None):
        node.alias = node.name
    return node


def _fixSetLimit(node, state):
    """postgres only wants a global limit on set expressions.

    This removes the limits from all select clauses in node, takes
    the offset off node, and wraps node into a _PGQS carrying both.
    """
    for n in node.getSelectClauses():
        n.setLimit = None
    offset = node.offset
    node.offset = None
    return _PGQS(
        original=node,
        setLimit=node.setLimit and str(node.setLimit),
        offset=offset)


@functools.lru_cache(None)
def _needsMaterialization():
    """returns True if CTEs need to be explicitly materialised on
    our current server to make them planner barriers.
    """
    from gavo import base
    return base.getPgVersion()[0]>11


def _materializeIfNecessary(node, state):
    """Makes sure CTEs are materialized.

    starting with version 12, postgres will inline CTEs unless they're
    explicitly materialised.  Since we've materialised them before,
    for consistency we make sure they are materialised in newer
    postgres-es as well.

    We should probably have some way to have users order us to not
    do it.

    PostgresMorphError is raised if there is no AS among node's children.
    """
    if _needsMaterialization():
        for index, token in enumerate(node.children):
            if token=='AS':
                break
        else:
            raise PostgresMorphError("CTE seems to be missing an AS?")

        # insert the keyword right behind the AS
        index += 1
        node.children = node.children[:index]+(
            "MATERIALIZED",)+node.children[index:]

    return node


_syntaxMorphers = {
    "selectQuery": _insertPGSC,
    'comparisonPredicate': morphhelpers.booleanizeComparisons,
    'selectList': _expandStars,
    'derivedColumn': _forceAlias,
    "selectExpression": _fixSetLimit,
    "withQuery": _materializeIfNecessary,
}

# Warning: if ever there are two Morphers for the same type, this will
# break, and we'll need to allow lists of Morphers (and need to think
# about their sequence...)
_allMorphers = _geometricMorphers.copy()
_allMorphers.update(_miscMorphers)
_allMorphers.update(_syntaxMorphers)


_pgMorpher = morphhelpers.Morpher(_allMorphers)

morphPG = _pgMorpher.morph