Source code for gavo.rscdef.procdef

"""
Basic handling for embedded procedures.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import functools

from gavo import base
from gavo import utils
from gavo.rscdef import common
from gavo.rscdef import rmkfuncs



# Move this one to utils?
def unionByKey(*sequences):
    """returns the items of all sequences, with at most one item per key.

    Items are told apart by their key attribute.  When several items share
    a key, the one encountered last (going through sequences left to right,
    and through each sequence front to back) is the one that is returned.

    Callers should not rely on the order of the returned list.
    """
    byKey = {}
    for sequence in sequences:
        # later entries for a key replace earlier ones
        byKey.update((member.key, member) for member in sequence)
    return list(byKey.values())
class RDParameter(base.Structure):
    """A base class for parameters.

    A parameter has a key (its name; "name" is accepted as an alias for
    the attribute), an optional description, a late flag, and character
    content serving as its default.  The attribute descriptions below
    explain the magic content values __NULL__ and __EMPTY__.
    """
    _name = base.UnicodeAttribute("key",
        default=base.Undefined,
        description="The name of the parameter",
        copyable=True,
        strip=True,
        aliases=["name"])
    _descr = base.NWUnicodeAttribute("description",
        default=None,
        description="Some human-readable description of what the"
            " parameter is about",
        copyable=True,
        strip=True)
    _expr = base.DataContent(
        description="The default for the parameter."
            " The special value __NULL__ indicates a NULL (python None) as usual."
            " An empty content means a non-preset parameter, which must be filled"
            " in applications. The magic value __EMPTY__ allows presetting an"
            " empty string.",
        copyable=True,
        strip=True,
        default=base.NotGiven)
    _late = base.BooleanAttribute("late",
        default=False,
        description="Bind the name not at setup time but at applying"
            " time. In rowmaker procedures, for example, this allows you to"
            " refer to variables like vars or rowIter in the bindings.")

    def isDefaulted(self):
        """returns True if the parameter has content, i.e., a default.
        """
        return self.content_ is not base.NotGiven

    def validate(self):
        """makes sure the parameter's key matches utils.identifierPattern.

        A LiteralParseError is raised for keys that do not match.
        """
        super().validate()
        if not utils.identifierPattern.match(self.key):
            # The hint has to describe what is actually checked here (the
            # shape of the name), not whether some procDef declares it.
            raise base.LiteralParseError("name", self.key, hint=
                "Parameter names must be valid identifiers.")

    def completeElement(self, ctx):
        """turns the magic content __EMPTY__ into an actual empty string
        before handing over to the parent class.
        """
        if self.content_=="__EMPTY__":
            self.content_ = ""
        super().completeElement(ctx)
class AliasableRDParameter(RDParameter):
    """An RDParameter with an additional alias attribute.

    The alias holds a deprecated name for the parameter.
    """
    _alias = base.UnicodeAttribute(
        "alias",
        default=None,
        description="A deprecated name for the parameter",
        copyable=True,
        strip=True)
class ProcPar(AliasableRDParameter):
    """A parameter of a procedure definition.

    The body of a ProcPar is a python expression; macros in it are
    expanded in the context of the procedure application's parent.

    A ProcPar with an empty body has no default.  Such parameters must be
    filled by the procedure application.
    """
    name_ = "par"

    def validate(self):
        """checks that the body is a python expression.

        The check is skipped for empty bodies and for bodies containing a
        backslash, as the latter look like macro calls and hence need not
        be python syntax yet.
        """
        super().validate()
        body = self.content_
        if not body or "\\" in body:
            return
        utils.ensureExpression(common.replaceProcDefAt(body), self.key)
class Binding(ProcPar):
    """A binding of a procedure definition parameter to a concrete value.

    The body of the binding is a python expression giving the value to
    set.  Other than with ProcPars, the body must not be empty.
    """
    name_ = "bind"

    def validate(self):
        """raises a StructureError if the body is missing or only consists
        of whitespace.
        """
        super().validate()
        body = self.content_
        if body and body.strip():
            return
        raise base.StructureError("Binding bodies must not be empty.")
class ProcSetup(base.Structure):
    """Prescriptions for setting up a namespace for a procedure application.

    You can add names to this namespace using par(ameter)s.
    If a parameter has no default and a procedure application does
    not provide it, an error is raised.

    You can also add names by providing a code attribute containing
    a python function body in code.  Within, the parameters are
    available.  The procedure application's parent can be accessed
    as parent.  All names you define in the code are available as
    globals to the procedure body.

    Caution: Macros are expanded within the code; this means you
    need double backslashes if you want a single backslash in python
    code.
    """
    name_ = "setup"

    _code = base.ListOfAtomsAttribute("codeFrags",
        description="Python function bodies setting globals for the function"
            " application. Macros are expanded in the context"
            " of the procedure's parent.",
        itemAttD=base.UnicodeAttribute("code",
            description="Python function"
                " bodies setting globals for the function application. Macros"
                " are expanded in the context of the procedure's parent.",
            copyable=True),
        copyable=True)
    _pars = base.StructListAttribute("pars", ProcPar,
        description="Names to add to the procedure's global namespace.",
        copyable=True)
    _imports = base.UnicodeAttribute("imports",
        description="A list of comma-separated imports to put into the"
            " code's namespace. Dottesd specs like a.b.c are converted to"
            " ``from a.b import c``",
        default=base.NotGiven,
        copyable=True)
    _original = base.OriginalAttribute()

    def _getParSettingCode(self, useLate, indent, bindings):
        """returns code that sets our parameters.

        If useLate is true, code is generated for the late parameters only,
        otherwise for the non-late ones only.  Each generated line is
        prefixed with indent.

        bindings is a dictionary mapping parameter keys to source code of
        their values; entries in there override the parameters' defaults.
        """
        parCode = []
        for p in self.pars:
            if p.late!=useLate:
                continue

            val = bindings.get(p.key, base.NotGiven)
            if val is base.NotGiven:
                val = p.content_
            if val is base.NotGiven:
                # val must be bound somewhere else (since _ensureParsBound
                # succeeded); don't emit any code for this par in this setup
                # and hope for the best.
                continue

            parCode.append("%s%s = %s"%(indent, p.key, val))
        return "\n".join(parCode)

    def getParCode(self, bindings):
        """returns code doing the setup (i.e., non-late) bindings un-indented.
        """
        return self._getParSettingCode(False, "", bindings)

    def getLateCode(self, bindings):
        """returns code doing the late (in-function) bindings, with each
        line prefixed by the indent string passed below.
        """
        # NOTE(review): the docstring this replaces promised an indent of
        # two spaces; the literal is kept as found.  Whatever is used here
        # must agree with the indent used for procedure bodies.
        return self._getParSettingCode(True, " ", bindings)

    def getImportsCode(self):
        """yields one python import statement per spec in imports.

        imports is a comma-separated list.  A dotted spec like a.b.c
        becomes ``from a.b import c``, an undotted spec x becomes
        ``import x``.  Nothing is yielded if imports is not given or empty.

        Empty specs (as produced by trailing or doubled commas) are
        skipped; they would otherwise result in a bare ``import``, which
        is a syntax error once the setup code is compiled.
        """
        if not self.imports:
            return

        for rawSpec in self.imports.split(","):
            spec = rawSpec.strip()
            if not spec:
                continue

            package, dot, name = spec.rpartition(".")
            if dot:
                yield "from %s import %s"%(package, name)
            else:
                yield "import "+spec

    def getBodyCode(self):
        """returns the body code un-indented.

        This is the import statements followed by all code fragments, each
        passed through utils.fixIndentation.  LiteralParseErrors raised
        while doing that are re-raised with their pos set to this
        element's source position.
        """
        collectedCode = list(self.getImportsCode())
        for frag in self.codeFrags:
            try:
                collectedCode.append(
                    utils.fixIndentation(frag, "", governingLine=1))
            except base.LiteralParseError as ex:
                ex.pos = self.getSourcePosition()
                raise
        return "\n".join(collectedCode)
class ProcDef(base.Structure, base.RestrictionMixin):
    """An embedded procedure.

    Embedded procedures are python code fragments with some interface
    defined by their type.  They can occur at various places (which is
    called procedure application generically), e.g., as row generators in
    grammars, as apply-s in rowmakers, or as SQL phrase makers in
    condDescs.

    They consist of the actual code and, optionally, definitions like the
    namespace setup, configuration parameters, or a documentation.

    The procedure applications compile into python functions with special
    global namespaces.  The signatures of the functions are determined by
    the type attribute.

    ProcDefs are referred to by procedure applications using their id.
    """
    name_ = "procDef"

    _code = base.UnicodeAttribute(
        "code",
        default=base.NotGiven,
        copyable=True,
        description="A python function body.")
    _setup = base.StructListAttribute(
        "setups",
        ProcSetup,
        description="Setup of the namespace the function will run in",
        copyable=True)
    _doc = base.UnicodeAttribute(
        "doc",
        default="",
        description="Human-readable docs for this proc (may be interpreted"
            " as restructured text).",
        copyable=True)
    _type = base.EnumeratedUnicodeAttribute(
        "type",
        default=None,
        description="The type of the procedure definition. The procedure"
            " applications will in general require certain types of"
            " definitions.",
        validValues=[
            "t_t",
            "apply",
            "rowfilter",
            "sourceFields",
            "mixinProc",
            "phraseMaker",
            "descriptorGenerator",
            "dataFunction",
            "dataFormatter",
            "metaMaker",
            "regTest",
            "iterator",
            "pargetter"],
        copyable=True,
        strip=True)
    _deprecated = base.UnicodeAttribute(
        "deprecated",
        default=None,
        copyable=True,
        description="A deprecation message. This will"
            " be shown if this procDef is being compiled.")
    _original = base.OriginalAttribute()

    def getCode(self):
        """returns the body code, re-indented for use as a function body.

        An empty string is returned if no code was given.
        LiteralParseErrors from re-indenting are re-raised with their pos
        set to this element's source position.
        """
        if self.code is base.NotGiven:
            return ""

        # NOTE(review): the docstring this replaces promised an indent of
        # two spaces; the literal is kept as found.
        try:
            return utils.fixIndentation(self.code, " ", governingLine=1)
        except base.LiteralParseError as ex:
            ex.pos = self.getSourcePosition()
            raise

    @functools.lru_cache(1)
    def getSetupPars(self):
        """returns all parameters used by setup items, where lexically
        later items override earlier items of the same name.

        The result is cached through functools.lru_cache.
        """
        parLists = [setup.pars for setup in self.setups]
        return unionByKey(*parLists)

    def getLateSetupCode(self, boundNames):
        """returns the late binding code of all setups, one setup after
        the other.
        """
        fragments = [
            setup.getLateCode(boundNames) for setup in self.setups]
        return "\n".join(fragments)

    def getParSetupCode(self, boundNames):
        """returns the parameter setting code of all setups, one setup
        after the other.
        """
        fragments = [
            setup.getParCode(boundNames) for setup in self.setups]
        return "\n".join(fragments)

    def getBodySetupCode(self, boundNames):
        """returns the body code of all setups, one setup after the other.

        boundNames is not used here; it is accepted so this method has the
        same signature as the other get*SetupCode methods.
        """
        fragments = [setup.getBodyCode() for setup in self.setups]
        return "\n".join(fragments)
class ProcApp(ProcDef):
    """An abstract base for procedure applications.

    Deriving classes need to provide:

    - a requiredType attribute specifying what ProcDefs can be applied.
    - a formalArgs attribute containing a (python) formal argument list
    - of course, a name_ for XML purposes.

    They can, in addition, give a class attribute additionalNamesForProcs,
    which is a dictionary that is joined into the global namespace during
    procedure compilation.
    """
    _procDef = base.ReferenceAttribute("procDef",
        forceType=ProcDef,
        default=base.NotGiven,
        description="Reference to the procedure"
            " definition to apply",
        copyable=True)
    _bindings = base.StructListAttribute("bindings",
        description="Values for parameters of the procedure definition",
        childFactory=Binding,
        copyable=True)
    _name = base.UnicodeAttribute("name",
        default=base.NotGiven,
        description="A name of the proc. ProcApps compute their (python)"
            " names to be somewhat random strings. Set a name manually to"
            " receive more easily decipherable error messages. If you do that,"
            " you have to care about name clashes yourself, though.",
        strip=True)

    requiredType = None

    additionalNamesForProcs = {}

    def validate(self):
        """checks the referenced procDef against requiredType, emits a
        warning if the procDef is deprecated, and makes sure all
        parameters are properly bound.

        Raises a StructureError on a type mismatch or on binding problems.
        """
        if self.procDef and self.procDef.type and self.requiredType:
            if self.procDef.type!=self.requiredType:
                raise base.StructureError("The procDef %s has type %s, but"
                    " here %s procDefs are required."%(self.procDef.id,
                        self.procDef.type, self.requiredType))

        if self.procDef and self.procDef.deprecated:
            if self.getSourcePosition()!="<internally built>":
                # for now, don't warn about these; they typically
                # originate when copying/adapting cores and will just
                # confuse operators
                procId = "unnamed procApp"
                if self.name:
                    procId = "procApp %s"%self.name

                base.ui.notifyWarning("%s, %s: %s"%(
                    self.getSourcePosition(),
                    procId,
                    utils.fixIndentation(self.procDef.deprecated, "")))

        super().validate()
        self._ensureParsBound()

    def completeElement(self, ctx):
        """makes up a name for the procedure if none was given.
        """
        super().completeElement(ctx)
        if self.name is base.NotGiven:
            # make up a name from self's id
            self.name = ("proc%x"%id(self)).replace("-", "")

    @functools.lru_cache(1)
    def getSetupPars(self):
        """returns the setup parameters for the proc app, where procDef
        parameters may be overridden by self's parameters.

        The result is cached through functools.lru_cache.
        """
        allSetups = []
        if self.procDef is not base.NotGiven:
            allSetups.extend(self.procDef.setups)
        allSetups.extend(self.setups)
        return unionByKey(*[s.pars for s in allSetups])

    def _ensureParsBound(self):
        """raises an error if non-defaulted pars of procDef are not filled
        by the bindings.

        Bindings made through a parameter's alias are re-keyed to the
        parameter's canonical name on the way.  StructureErrors are raised
        if both the alias and the canonical name are bound, if a parameter
        without a default is not bound, or if bindings are left over that
        do not belong to any parameter.
        """
        # bindings not yet matched to a parameter, by key
        bindNames = {b.key: b for b in self.bindings}

        for p in self.getSetupPars():
            if p.alias and p.alias in bindNames:
                binding = bindNames.pop(p.alias)
                binding.key = p.key

                if p.key in bindNames:
                    raise base.StructureError("Both canonical name and alias bound:"
                        " %s, %s"%(p.key, p.alias))
                else:
                    bindNames[p.key] = binding

            if not p.isDefaulted():
                if p.key not in bindNames:
                    raise base.StructureError("Parameter %s is not defaulted in"
                        " %s and thus must be bound."%(p.key, self.name))

            # this parameter is taken care of
            bindNames.pop(p.key, None)

        if bindNames:
            raise base.StructureError("May not bind non-existing parameter(s)"
                " %s."%(", ".join(bindNames)))

    def onElementComplete(self):
        """records the sources of all bindings by key in _boundNames.
        """
        super().onElementComplete()
        self._boundNames = {b.key: b.content_ for b in self.bindings}

    def _combineWithProcDef(self, methodName, boundNames):
        """returns the result of methodName(boundNames) on the procDef (if
        there is one), followed by the result of ProcDef's implementation
        of methodName applied to self.

        This is a helper for the implementation of the get*SetupCode
        methods; going through ProcDef explicitly is necessary since these
        methods are overridden here.
        """
        parts = []
        if self.procDef is not base.NotGiven:
            parts.append(getattr(self.procDef, methodName)(boundNames))

        parts.append(getattr(ProcDef, methodName)(self, boundNames))
        return "\n".join(parts)

    def getLateSetupCode(self, boundNames):
        """returns the late binding code of the procDef and of self.
        """
        return self._combineWithProcDef("getLateSetupCode", boundNames)

    def getParSetupCode(self, boundNames):
        """returns the parameter setting code of the procDef and of self.
        """
        return self._combineWithProcDef("getParSetupCode", boundNames)

    def getBodySetupCode(self, boundNames):
        """returns the setup body code of the procDef and of self.
        """
        return self._combineWithProcDef("getBodySetupCode", boundNames)

    def getSetupCode(self):
        """returns the complete setup code (parameter settings, then setup
        bodies).

        If the code contains a backslash, it is macro-expanded by
        self.parent.
        """
        code = "\n".join((
            self.getParSetupCode(self._boundNames),
            self.getBodySetupCode(self._boundNames)))
        if "\\" in code:
            code = self.parent.expand(code)
        return code

    def _getFunctionDefinition(self, mainSource):
        """returns mainSource in a function definition with proper
        signature including setup of late code.

        Functions that would otherwise have an empty body get a pass
        statement.
        """
        body = "\n".join([
            self.getLateSetupCode(self._boundNames),
            mainSource])
        if not body.strip():
            body = " pass"
        return "def %s(%s):\n%s"%(self.name, self.formalArgs, body)

    def getFuncCode(self):
        """returns a function definition for this proc application.

        This includes bindings of late parameters.

        Locally defined code overrides code defined in a procDef.  If the
        resulting source contains a backslash, it is macro-expanded by
        self.parent.
        """
        mainCode = ""
        if self.code is base.NotGiven:
            if self.procDef is not base.NotGiven:
                mainCode = self.procDef.getCode()
        else:
            mainCode = self.getCode()

        code = self._getFunctionDefinition(mainCode)
        if "\\" in code:
            code = self.parent.expand(code)
        return code

    def _compileForParent(self, parent):
        """helps compile.

        This locates the RD by walking up from parent until something with
        an rd attribute turns up and then hands everything over to
        rmkfuncs.makeProc.  If no RD can be found (e.g., for unrooted
        elements), rd is passed as None.
        """
        # go get the RD for parent; it's always handy in this kind
        # of code
        curEl = parent
        while not hasattr(curEl, "rd"):
            # Use getattr here: an element without a parent attribute (in
            # particular a parent of None) just means we are unrooted and
            # must not result in an AttributeError.
            ancestor = getattr(curEl, "parent", None)
            if not ancestor:
                break
            curEl = ancestor

        # rd stays None if we fell off the tree without finding an RD
        rd = getattr(curEl, "rd", None)

        return rmkfuncs.makeProc(
            self.name, self.getFuncCode(),
            self.getSetupCode(), parent,
            rd=rd,
            procDef=self,
            **self.additionalNamesForProcs)

    def breakCircles(self):
        """overridden to undo additional memoization.
        """
        ProcDef.breakCircles(self)
        utils.forgetMemoized(self)

    def compile(self, parent=None):
        """returns a callable for this procedure application.

        You can pass a different parent; if you do not give it, the
        embedding structure will be used.  The parent is what the compiled
        callable is memoized on, and it is passed on to rmkfuncs.makeProc.

        NOTE(review): macro expansion in getFuncCode and getSetupCode goes
        through self.parent regardless of what is passed in here -- confirm
        whether that is intended.
        """
        if parent is None:
            parent = self.parent
        return utils.memoizeOn(parent, self, self._compileForParent, parent)