Source code for gavo.adql.morphhelpers

"""
Helpers for morphing modules
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib

from gavo.adql import nodes


class State(object):
    """is a scratchpad morphers use to communicate state among themselves.

    Instances carry two attributes:

    * warnings -- a list; append to it as necessary.
    * nodeStack -- a list of the nodes currently pushed through
      onNodeStack; the tree traversal keeps it so elements can look up
      their parent chain.
    """

    def __init__(self):
        self.nodeStack = []
        self.warnings = []

    @contextlib.contextmanager
    def onNodeStack(self, node):
        """keeps node on top of nodeStack while the controlled block runs.

        The topmost entry is popped again when the block is left, even if
        it is left through an exception.  An AssertionError is raised if
        the popped entry does not compare equal to node.
        """
        self.nodeStack.append(node)
        try:
            yield
        finally:
            # Always unwind, so a failing morpher cannot leave stale
            # entries behind.
            popped = self.nodeStack.pop()
            assert popped==node, "ADQL morphing node stack corruption"
# Maps (operator, operand) pairs to the prefix addNotToBooleanized puts in
# front of an expression: "NOT" to negate it, the empty string to leave it
# as it is.  Pairs missing here are not recognised.
_BOOLEANIZER_TABLE = {
    ('=', '0'): "NOT",
    ('!=', '1'): "NOT",
    ('=', '1'): "",
    ('!=', '0'): "",
}
def addNotToBooleanized(expr, operator, operand):
    """prepends a NOT to expr if operator and operand call for one in an
    ADQL integerized boolean expression.

    The result is one of:

    * None if expr is None; Nones are simply handed through so callers
      can just return addNotToBooleanized(...).
    * None if the combination of operator and operand is not known.
    * a TransparentNode wrapping expr as NOT ( expr ) if the combination
      calls for a negation.
    * expr itself otherwise.
    """
    if expr is None:
        return None

    prefix = _BOOLEANIZER_TABLE.get((operator, operand))
    if prefix is None:
        # weird non-boolean-looking condition
        return None
    if not prefix:
        # empty prefix: the expression already has the right sense
        return expr
    return nodes.TransparentNode(children=[prefix, '(', expr, ')'])
def analyzeFuncComp(node, acceptableOperators=("=", "!=")):
    """returns the pair (function, other operand) for a comparisonPredicate.

    This is regardless of the order of the comparison; if both operands
    are function nodes, the first one is returned as the function.

    node is the node to inspect; acceptableOperators is a collection of
    the operators (compared against node.opr) for which the analysis is
    attempted.  The default is an immutable tuple so it cannot be changed
    between calls by accident.

    This will return None, None if

    * node isn't a comparisonPredicate
    * the operator is not in acceptableOperators (by default, the
      symmetric operators = and !=)
    * none of the operands is a FunctionNode
    """
    if (node.type!="comparisonPredicate"
            or node.opr not in acceptableOperators):
        return None, None

    if isinstance(node.op1, nodes.FunctionNode):
        return node.op1, node.op2
    if isinstance(node.op2, nodes.FunctionNode):
        return node.op2, node.op1

    # no function call, leave things alone
    return None, None
# Handler functions for booleanizeComparisons.
# Keys are function names, values are lists of handlers in the order they
# were registered; the lists are filled through registerBooleanizer.
_BOOLEANOID_FUNCTIONS = {}
def registerBooleanizer(funcName, handler):
    """registers handler as a booleanizer for ADQL functions called
    funcName.

    funcName must be all-uppercase for this to work.

    handler(node, operator, operand) is a function that receives a
    function node and the operator and operand of the comparison (in that
    order, which is how booleanizeComparisons calls it) and either returns
    None to say it can't handle it, or something else; that something else
    is what the entire comparison node is morphed into.

    You can register multiple booleanizers for the same function; they
    will be tried in the sequence of their registration.  Hence, make sure
    you get your import sequences right if you do this.
    """
    _BOOLEANOID_FUNCTIONS.setdefault(funcName, []).append(handler)
def booleanizeComparisons(node, state):
    """turns a comparison expression that's really a boolean expression
    into a boolean expression.

    Actual morphers shouldn't use that but rather get their parent from
    the stack and use its OVERRIDE_RESULT attribute.  See the DISTANCE
    morpher for an example.

    For several reasons, ufuncs like ivo_hasword can't really do this.
    Instead, they call registerBooleanizer with the function name and a
    callable that receives the function node, the operator, and the
    (flattened) operand.  The result of the first such callable returning
    non-None is used instead of the current node; if there is none, node
    is returned unchanged.
    """
    funcNode, operand = analyzeFuncComp(node)
    if funcNode is None:
        # node is not a comparison with a function; this is probably not
        # a good sign, as we shouldn't end up here in that case, but
        # let's hope for the best and fall through to SQL.
        return node

    flatOperand = nodes.flatten(operand)
    handlers = _BOOLEANOID_FUNCTIONS.get(funcNode.funName, [])
    for handler in handlers:
        replacement = handler(funcNode, node.opr, flatOperand)
        if replacement is not None:
            # first handler accepting the comparison wins
            return replacement
    return node
class Morpher(object):
    """A class managing the process of morphing an ADQL expression.

    It is constructed with a dictionary of morphers; the keys are node
    types, the values morphing functions.

    Morphing functions have the signature m(node, state) -> node.  They
    should return the node if they do not wish to change it.  state is a
    State instance.

    The main entry point is morph(origTree) -> state, tree.  origTree is
    not modified, the return value can be flattened but can otherwise be
    severely damaged.

    For special effects, there's also earlyMorphers.  These will be called
    when traversal reaches the node for the first time.  If these return
    None, traversal continues as usual, if not, their result will be added
    to the tree and *not* further traversed.  TODO: We don't currently have
    anything requiring earlyMorphers.  Do we want to drop the feature?
    """

    def __init__(self, morphers, earlyMorphers=None):
        self.morphers = morphers
        # Create the fallback dict per instance; a {} literal as the
        # default would be one object shared by every Morpher.
        self.earlyMorphers = {} if earlyMorphers is None else earlyMorphers

    def _getChangedForSeq(self, value, state):
        """returns a tuple of the morphed items of the sequence value, or
        None if morphing changed none of them.

        Only ADQLNode items are traversed; everything else is copied over.
        """
        newVal, changed = [], False
        for child in value:
            if isinstance(child, nodes.ADQLNode):
                newVal.append(self._traverse(child, state))
            else:
                newVal.append(child)
            if newVal[-1]!=child:
                changed = True
        if changed:
            return tuple(newVal)
        return None

    def _getChangedForNode(self, value, state):
        """returns the result of morphing the node value, or None if
        morphing returned the very same object.
        """
        newVal = self._traverse(value, state)
        if newVal is not value:
            return newVal
        return None

    def _getChanges(self, name, value, state):
        """iterates over key/value pairs changed by morphing value under
        the key name.

        Nothing is yielded if value is neither a list, a tuple, nor an
        ADQLNode, or if morphing left it unchanged.
        """
        if isinstance(value, (list, tuple)):
            meth = self._getChangedForSeq
        elif isinstance(value, nodes.ADQLNode):
            meth = self._getChangedForNode
        else:
            return

        newVal = meth(value, state)
        if newVal is not None:
            yield name, newVal

    def _traverse(self, node, state):
        """returns the morphed version of node.

        The node's attributes are morphed first, then the morpher
        registered for the resulting node's type (if any) is applied.
        An AssertionError is raised if that morpher returns None.
        """
        if node.type in self.earlyMorphers:
            res = self.earlyMorphers[node.type](node, state)
            if res is not None:
                # early morphers short-circuit the traversal of node
                return res

        # node is only on the stack while its children are processed, so
        # a morpher finds its node's parent on top of the stack.
        with state.onNodeStack(node):
            changes = []
            for name, value in node.iterAttributes():
                changes.extend(self._getChanges(name, value, state))

        # let handlers down the tree determine the total result (this
        # is mainly for when comparisons become boolean function calls,
        # but who knows?)
        if getattr(node, "OVERRIDE_RESULT", None) is not None:
            newNode = node.OVERRIDE_RESULT
        elif changes:
            newNode = node.change(**dict(changes))
            newNode.original = node
        else:
            newNode = node

        curType = getattr(newNode, "type", None)
        if curType in self.morphers:
            handlerResult = self.morphers[curType](newNode, state)
            assert handlerResult is not None,\
                "ADQL morph handler for %s returned None"%curType
            return handlerResult
        return newNode

    def morph(self, tree):
        """morphs tree and returns a pair of the State instance used
        during the traversal and the morphed tree.
        """
        state = State()
        res = self._traverse(tree, state)
        return state, res