Source code for gavo.rscdef.rmkdef

"""
Definition of rowmakers.

rowmakers are objects that take a dictionary of some kind and emit
a row suitable for inclusion into a table.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import bisect
import fnmatch
import re
import sys
import traceback

from gavo import base
from gavo import utils
from gavo.rscdef import common
from gavo.rscdef import column
from gavo.rscdef import procdef
from gavo.rscdef import rmkfuncs
from gavo.rscdef import rowtriggers


__docformat__ = "restructuredtext en"


class Error(base.Error):
    """An exception type local to the rowmaker definitions.

    It adds nothing to base.Error.
    """
class MappedExpression(base.Structure):
    """a base class for map and var.

    You must give a destDict class attribute to make these work; it is
    the name of the dictionary (within the generated code) that the
    computed value is written to.

    Concrete classes must also override getAutoMapper, which furnishes
    the right-hand side when only a source (rather than an expression body)
    is given.
    """
    destDict = None
    restrictedMode = False

    _dest = column.ColumnNameAttribute("key",
        default=base.Undefined,
        description="Name of the column the value is to end up in.",
        copyable=True,
        strip=True,
        aliases=["dest", "name"])

    _src = base.UnicodeAttribute("source",
        default=None,
        description="Source key name to convert to column value (either a grammar"
        " key or a var).",
        copyable=True,
        strip=True,
        aliases=["src"])

    _nullExcs = base.UnicodeAttribute("nullExcs",
        default=base.NotGiven,
        description="Exceptions that should be caught and"
        " cause the value to be NULL, separated by commas.")

    _expr = base.DataContent(
        description="A python expression giving the value for key.",
        copyable=True,
        strip=True)

    _nullExpr = base.UnicodeAttribute("nullExpr",
        default=base.NotGiven,
        description="A python expression for a value that is mapped to"
        " NULL (None). Equality is checked after building the value, so"
        " this expression has to be of the column type. Use map with"
        " the parseWithNull function to catch null values before type"
        " conversion.")

    def completeElement(self, ctx):
        """fills in defaults and enforces the restricted-mode policy.

        In restricted mode (ctx.restricted is true), anything that would
        put user-provided python code into the rowmaker raises a
        base.RestrictedElement.  Outside of that, a missing source defaults
        to key, and macros in the element content are expanded by the parent.
        """
        self.restrictedMode = getattr(ctx, "restricted", False)
        # nullValue is not declared as an attribute on this class, so it is
        # only looked at if some subclass or mixin actually provides it;
        # plain attribute access would make source-only maps fail here.
        # NOTE(review): nullExcs is also pasted verbatim into generated code
        # (see getCode) but is not rejected here -- confirm this is intended.
        if self.restrictedMode and (
                self.content_
                or self.nullExpr
                or getattr(self, "nullValue", None)):
            raise base.RestrictedElement("map", hint="In restricted mode, only"
                " maps with a source attribute are allowed; nullExpr or nullValue"
                " are out, too, since they can be used to inject raw code.")

        if not self.content_ and not self.source:
            self.source = self.key

        # a backslash signals there might be macros to expand
        if self.content_ and "\\" in self.content_:
            self.content_ = self.parent.expand(self.content_)

    def validate(self):
        """checks that code content is a parseable python expression and
        that exactly one of source and content is given.

        This raises a base.StructureError if both or none of source and
        content are there; whatever utils.ensureExpression raises for
        malformed nullExpr, content, or nullExcs is passed on.
        """
        super().validate()

        if (self.content_ and self.source) or not (self.content_ or self.source):
            raise base.StructureError("Map must have exactly one of source attribute"
                " or element content")

        if self.nullExpr is not base.NotGiven:
            utils.ensureExpression(self.nullExpr)

        if self.content_:
            utils.ensureExpression(
                common.replaceProcDefAt(self.content_), self.name_)

        if self.nullExcs is not base.NotGiven:
            utils.ensureExpression(self.nullExcs, "%s.nullExcs"%(self.name_))

    def getAutoMapper(self, columns=None):
        """returns an expression literal for turning what is self.source to
        a value suitable for self.key.

        columns is the column set the rowmaker is being compiled for; getCode
        always passes it.

        This must be defined for concrete classes derived from this; the
        implementation here always raises a NotImplementedError.
        """
        raise NotImplementedError("No automatic mappers defined here")

    def getCode(self, columns):
        """returns python source code for this map.

        The code assigns to destDict[key], either from the element content
        or, lacking that, from what getAutoMapper(columns) returns.  If
        nullExpr is given, a second line replaces values equal to it with
        None.  If nullExcs is given, all that is wrapped into a try block
        setting the destination to None when one of these exceptions is
        raised.
        """
        if isinstance(self.key, utils.QuotedName):
            destIndex = '"%s"'%(self.key.name.replace('"', '\\"'))
        else:
            destIndex = '"%s"'%self.key
        target = "%s[%s]"%(self.destDict, destIndex)

        if self.content_:
            value = self.content_
        else:
            value = self.getAutoMapper(columns)
        code = "%s = %s"%(target, value)

        if self.nullExpr is not base.NotGiven:
            code += "\nif %s==%s: %s = None"%(target, self.nullExpr, target)

        if self.nullExcs is not base.NotGiven:
            # the try body needs every line of the code so far indented
            code = "try:\n%s\nexcept (%s): %s = None"%(
                re.sub("(?m)^", " ", code),
                self.nullExcs,
                target)

        return code
class MapRule(MappedExpression):
    """A mapping rule.

    To specify the source of a mapping, you can either

    - grab a value from what's emitted by the grammar or defined using var
      via the source attribute.  The value given for source is converted to a
      python value and stored.
    - or give a python expression in the body.  In that case, no further
      type conversion will be attempted.

    If neither source or a body is given, map uses the key attribute as its
    source attribute.

    The map rule generates a key/value pair in the result record.
    """
    name_ = "map"
    destDict = "result"

    def getAutoMapper(self, columns):
        """returns an expression to automatically map self.source to
        a column in the destination table.

        The column definition for self.key is taken from columns; its type
        selects the conversion code.  If base.sqltypeToPythonCode knows no
        conversion for the type, a base.LiteralParseError is raised.
        """
        colDef = columns.getColumnByName(self.key)
        try:
            converter = base.sqltypeToPythonCode(colDef.type)
            # source becomes part of a double-quoted python literal
            quotedSource = self.source.replace("\\", r"\\").replace('"', '\\"')
            return converter%('vars["{}"]'.format(quotedSource))
        except base.ConversionError:
            noMapper = base.LiteralParseError("map", colDef.type,
                hint="Auto-mapping to %s is impossible since"
                " no default map for %s is known"%(self.key, colDef.type))
            raise base.ui.logOldExc(noMapper)
class VarDef(MappedExpression):
    """A definition of a rowmaker variable.

    It consists of a name and a python expression, including function
    calls.  The variables are entered into the input row coming from
    the grammar.

    var elements are evaluated before apply elements, in the sequence
    they are in the RD.  You can refer to keys defined by vars already
    evaluated in the usual @key manner.
    """
    name_ = "var"
    destDict = "vars"

    def getAutoMapper(self, columns):
        """returns var[self.source].

        Having source with var doesn't make a lot of sense, but it's a nifty
        way to introduce None-s for missing keys if one wants.  And it should
        do *something*.

        columns is ignored here.
        """
        # source becomes part of a double-quoted python literal
        quotedSource = self.source.replace("\\", r"\\")
        quotedSource = quotedSource.replace('"', '\\"')
        return 'vars["{}"]'.format(quotedSource)
class ApplyDef(procdef.ProcApp):
    """A code fragment to manipulate the result row (and possibly more).

    Apply elements allow embedding python code in rowmakers.

    The current input fields from the grammar (including the rowmaker's vars)
    are available in the vars dictionary and can be changed there.  You can
    also add new keys.

    You can add new keys for shipping out in the result dictionary.

    The active rowmaker is available as parent.  It is also used to
    expand macros.

    The table that the rowmaker feeds to can be accessed as targetTable.
    You probably only want to change meta information here (e.g., warnings
    or infos).

    As always in procApps, you can get the embedding RD as rd; this is
    useful to, e.g., resolve references using rd.getByRD, and specify
    resdir-relative file names using rd.getAbsPath.
    """
    name_ = "apply"
    requiredType = "apply"
    formalArgs = "vars, result, targetTable, _self"

    def getFuncCode(self):
        """returns the ProcApp function code after passing it through
        common.replaceProcDefAt.
        """
        rawCode = procdef.ProcApp.getFuncCode(self)
        return common.replaceProcDefAt(rawCode)
class RowmakerMacroMixin(base.StandardMacroMixin):
    """A collection of macros available to rowmakers.

    NOTE: All macros should return only one single physical python line,
    or they will mess up the calculation of what constructs caused errors.
    """

    def macro_standardPubDID(self):
        r"""returns the "standard publisher DID" for the current product.

        The publisher dataset identifier (PubDID) is important in protocols like
        SSAP and obscore.  If you use this macro, the PubDID will be your
        authority, the path component ~, and the current value of
        @prodtblAccref.  It thus will only work where products#define (or a
        replacement) is in action.  If it isn't, a normal function call
        getStandardPubDID(\\inputRelativePath) would be an obvious alternative.

        You *can* of course define your PubDIDs in a different way.
        """
        return ('getStandardPubDID(vars["prodtblAccref"])')

    def macro_dlMetaURI(self, dlId):
        """returns a link to the datalink document for the current product.

        This assumes you're assigning standard pubDIDs (see also
        standardPubDID, which is used by this).

        dlId is the XML id of the datalink service, which is supposed to
        be in the sameRD as the rowmaker.
        """
        return ('"%%s?ID=%%s"%%('
            'rd_.getById(%s).getURL("dlmeta", absolute=True),'%repr(dlId)+
            'urllib.parse.quote(getStandardPubDID(vars["prodtblAccref"])))')

    def macro_fullPath(self):
        """returns an expression expanding to the full path of the current
        input file.
        """
        return 'vars["parser_"].sourceToken'

    def macro_inputRelativePath(self, liberalChars="True"):
        """see grammars.common.GrammarMacroMixin
        """
        return ('getInputsRelativePath('
            'vars["parser_"].sourceToken, liberalChars=%s)'
            )%base.parseBooleanLiteral(liberalChars)

    def macro_rowsProcessed(self):
        """returns an expression giving the number of records already
        delivered by the grammar.
        """
        return 'vars["parser_"].recNo'

    def macro_rowsMade(self):
        """returns an expression giving the number of records already
        returned by this row maker.

        This number excludes failed and skipped rows.
        """
        return '_self.rowsMade'

    def macro_property(self, propName):
        """returns an expression giving the value of the property propName
        on the current DD.
        """
        return 'curDD_.getProperty("%s")'%propName

    def macro_sourceDate(self):
        """returns an expression giving the timestamp of the current source.

        This is a timestamp of the modification date; use dateTimeToJdn or
        dateTimeToMJD to turn this into JD or MJD (which is usually preferred
        in database tables).  See also the sourceCDate macro.
        """
        return ('datetime.datetime.utcfromtimestamp('
            'os.path.getmtime(vars["parser_"].sourceToken))')

    def macro_sourceCDate(self):
        """returns an expression giving the timestamp for the create
        date of the current source.

        Use dateTimeToJdn or dateTimeToMJD to turn this into JD or MJD (which
        is usually preferred in database tables).  See also the sourceDate
        macro.
        """
        return ('datetime.datetime.utcfromtimestamp('
            'os.path.getctime(vars["parser_"].sourceToken))')

    def macro_srcstem(self):
        """returns python code for the stem of the source file currently parsed
        in a rowmaker.

        Example: if you're currently parsing /tmp/foo.bar.gz, the stem is foo.
        """
        return ('getFileStem(vars["parser_"].sourceToken)')

    def macro_lastSourceElements(self, numElements):
        """returns an expression calling rmkfuncs.lastSourceElements on
        the current input path.
        """
        return 'lastSourceElements(vars["parser_"].sourceToken, %d)'%(
            int(numElements))

    def macro_rootlessPath(self):
        """returns an expression giving the current source's path with
        the resource descriptor's root removed.
        """
        return 'utils.getRelativePath(vars["parser_"].sourceToken, rd_.resdir)'

    def macro_inputSize(self):
        """returns an expression giving the size of the current source.
        """
        return 'os.path.getsize(vars["parser_"].sourceToken)'

    def macro_docField(self, name):
        """returns an expression giving the value of the column name in the
        document row.
        """
        # name is spliced in as a python string literal, and the parser is
        # reached through vars["parser_"] as in the other macros of this
        # class.
        # NOTE(review): this presumes the grammar's row iterator has a
        # getParameters() method returning a mapping -- confirm.
        return 'vars["parser_"].getParameters()[%s]'%repr(name)

    def macro_qName(self):
        """returns the qName of the table we are currently parsing into.
        """
        return "tableDef_.getQName()"
class RowmakerDef(base.Structure, RowmakerMacroMixin):
    """A definition of the mapping between grammar input and finished rows
    ready for shipout.

    Rowmakers consist of variables, procedures and mappings.  They
    result in a python callable doing the mapping.

    In python code within rowmaker elements, you can use a large
    number of functions.  See `Functions available for row makers`_
    in the reference documentation.

    RowmakerDefs double as macro packages for the expansion of various
    macros.  The standard macros will need to be quoted, the rowmaker macros
    above yield python expressions.

    Within map and var bodies as well as late apply pars and apply bodies,
    you can refer to the grammar input as vars["name"] or, shorter @name.

    To add output keys, use map or, in apply bodies, add keys to the
    ``result`` dictionary.
    """
    name_ = "rowmaker"

    _maps = base.StructListAttribute("maps",
        childFactory=MapRule,
        description="Mapping rules.",
        copyable=True)

    _vars = base.StructListAttribute("vars",
        childFactory=VarDef,
        description="Definitions of intermediate variables.",
        copyable=True)

    _apps = base.StructListAttribute("apps",
        childFactory=ApplyDef,
        description="Procedure applications.",
        copyable=True)

    _rd = common.RDAttribute()

    _idmaps = base.StringListAttribute("idmaps",
        description="List of"
        ' column names that are just "mapped through" (like map with key'
        " only); you can use shell patterns to select multiple columns at once.",
        copyable=True)

    _simplemaps = base.IdMapAttribute("simplemaps",
        description=
        "Abbreviated notation for <map source>; each pair is destination:source",
        copyable=True)

    _ignoreOn = base.StructAttribute("ignoreOn",
        default=None,
        childFactory=rowtriggers.IgnoreOn,
        description="Conditions on the"
        " input record coming from the grammar to cause the input"
        " record to be dropped by the rowmaker, i.e., for this specific"
        " table. If you need to drop a row for all tables being fed,"
        " use a trigger on the grammar.",
        copyable=True)

    _original = base.OriginalAttribute()

    @classmethod
    def makeIdentityFromTable(cls, table, **kwargs):
        """returns a rowmaker that just maps input names to column names.

        kwargs are passed on to the constructor; an id is generated
        if none is passed in.
        """
        if "id" not in kwargs:
            kwargs["id"] = "autogenerated rowmaker for table %s"%table.id
        return base.makeStruct(cls, idmaps=[c.key for c in table], **kwargs)

    @classmethod
    def makeTransparentFromTable(cls, table, **kwargs):
        """returns a rowmaker that maps input names to column names without
        touching them.

        This is for crazy cases in which the source actually provides
        pre-parsed data that any treatment would actually ruin.

        kwargs are passed on to the constructor; an id is generated
        if none is passed in.
        """
        if "id" not in kwargs:
            kwargs["id"] = "autogenerated rowmaker for table %s"%table.id
        return base.makeStruct(cls, maps=[
                base.makeStruct(MapRule, key=c.name,
                    content_="vars[%s]"%repr(c.name))
                for c in table],
            **kwargs)

    def completeElement(self, ctx):
        """turns simplemaps into proper map elements.

        A source starting with @ yields a map with that @ removed that
        additionally turns KeyErrors into NULLs.
        """
        if self.simplemaps:
            for dest, src in self.simplemaps.items():
                nullExcs = base.NotGiven
                if src.startswith("@"):
                    src = src[1:]
                    nullExcs = "KeyError,"
                self.feedObject("maps", base.makeStruct(MapRule,
                    key=dest, source=src, nullExcs=nullExcs))
        super().completeElement(ctx)

    def _getSourceFromColset(self, columns):
        """returns the source code for a mapper to a column set.

        Actually, this returns a pair of the source code and a dictionary
        mapping (1-based) line numbers in that source to a description of what
        the code starting at that line does; that dictionary is used to
        produce error messages.

        The code is, in sequence: the ignoreOn check (if defined), the vars,
        the calls of the apps, and the maps.
        """
        lineMap, source = {}, []

        def appendToSource(srcLine, line, lineMarker):
            # line is the number of the last line emitted so far; we return
            # the number of the last line of srcLine, which may span several
            # physical lines.
            source.append(srcLine)
            line += 1
            lineMap[line] = lineMarker
            return line+srcLine.count("\n")

        line = 0
        if self.ignoreOn:
            line = appendToSource("if checkTrigger(vars):\n"
                " raise IgnoreThisRow(vars)",
                line, "Checking ignore")
        for v in self.vars:
            line = appendToSource(v.getCode(columns), line,
                "assigning "+str(v.key))
        for a in self.apps:
            line = appendToSource(
                "%s(vars, result, targetTable, _self)"%a.name,
                line, "executing "+a.name)
        for m in self.maps:
            line = appendToSource(m.getCode(columns), line,
                "building "+str(m.key))

        return "\n".join(source), lineMap

    def _getSource(self, tableDef):
        """returns the source code for a mapper to tableDef's columns
        (together with the line map; see _getSourceFromColset).
        """
        return self._getSourceFromColset(tableDef.columns)

    def _getGlobals(self, tableDef):
        """returns a dictionary of the names the generated code needs
        for tableDef.

        These are the compiled apps under their names, checkTrigger if
        ignoreOn is defined, and tableDef_, rd_, and curDD_.
        """
        namespace = {}
        for a in self.apps:
            namespace[a.name] = a.compile()
        if self.ignoreOn:
            namespace["checkTrigger"] = self.ignoreOn
        namespace["tableDef_"] = tableDef
        namespace["rd_"] = tableDef.rd
        namespace["curDD_"] = tableDef.parent
        return namespace

    def _resolveIdmaps(self, columns):
        """adds mappings for self's idmap within column set.

        Keys that already have a map are left alone.  A pattern matching
        no column at all raises a base.NotFoundError.  Afterwards, idmaps is
        empty.
        """
        existingMaps = set(m.key for m in self.maps)
        baseNames = [c.key for c in columns]

        for colName in self.idmaps:
            matching = fnmatch.filter(baseNames, colName)
            if not matching:
                raise base.NotFoundError(colName, "columns matching", "unknown")
            for dest in matching:
                if dest not in existingMaps:
                    self.maps.append(MapRule(self, key=dest).finishElement(None))
                    # remember the key so a later, overlapping pattern does
                    # not produce a second map for the same column.
                    existingMaps.add(dest)

        self.idmaps = []

    def _checkTable(self, columns, id):
        """raises a LiteralParseError if we try to map to non-existing
        columns.

        id is only used in the error message.
        """
        for mapRule in self.maps:
            try:
                columns.getColumnByName(mapRule.key)
            except KeyError:
                raise base.ui.logOldExc(base.LiteralParseError(self.name_,
                    mapRule.key,
                    "Cannot map to '%s' since it does not exist in %s"%(
                        mapRule.key, id)))

    def _buildForTable(self, tableDef):
        """returns a RowmakerDef with everything expanded and checked for
        tableDef.

        This may raise LiteralParseErrors if self's output is incompatible
        with tableDef.
        """
        res = self.copyShallowly()
        try:
            res._resolveIdmaps(tableDef.columns)
            res._checkTable(tableDef.columns, tableDef.id)
        except base.NotFoundError as ex:
            ex.within = "table %s's columns"%tableDef.id
            raise
        return res

    def _realCompileForTableDef(self, tableDef):
        """helps compileForTableDef.

        This does the actual work of building a Rowmaker for tableDef.
        """
        rmk = self._buildForTable(tableDef)
        source, lineMap = rmk._getSource(tableDef)
        namespace = rmk._getGlobals(tableDef)

        return Rowmaker(common.replaceProcDefAt(source),
            self.id or "<rowmaker without id>",
            namespace,
            tableDef.getDefaults(),
            lineMap)

    def compileForTableDef(self, tableDef):
        """returns a function receiving a dictionary of raw values and
        returning a row ready for adding to a tableDef'd table.

        To do this, we first make a rowmaker instance with idmaps resolved
        and then check if the rowmaker result and the table structure
        are compatible.

        The result is obtained through utils.memoizeOn on tableDef.
        """
        return utils.memoizeOn(tableDef, self, self._realCompileForTableDef,
            tableDef)

    def copyShallowly(self):
        """returns a new instance of self's class having copies of self's
        lists of maps, vars, idmaps, and apps, and self's ignoreOn.

        The child elements themselves are shared with self.
        """
        return base.makeStruct(self.__class__,
            maps=self.maps[:],
            vars=self.vars[:],
            idmaps=self.idmaps[:],
            apps=self.apps[:],
            ignoreOn=self.ignoreOn)
class ParmakerDef(RowmakerDef):
    """A RowmakerDef working on a table's params rather than on its
    columns.
    """
    name_ = "parmaker"

    def _buildForTable(self, tableDef):
        """returns a copy of self with idmaps resolved against and maps
        checked against tableDef's params.
        """
        expanded = self.copyShallowly()
        try:
            expanded._resolveIdmaps(tableDef.params)
            expanded._checkTable(tableDef.params, tableDef.id)
        except base.NotFoundError as exc:
            # say where we were looking for whatever was not found
            exc.within = "table %s's params"%tableDef.id
            raise
        return expanded

    def _getSource(self, tableDef):
        """returns the source code for a mapper to tableDef's params.
        """
        return self._getSourceFromColset(tableDef.params)
# A module-level RowmakerDef with the single idmaps pattern "*".  idmaps
# are resolved against the target's column names with fnmatch, so this
# pattern selects all of them.
identityRowmaker = base.makeStruct(RowmakerDef, idmaps="*")
class Rowmaker(object):
    """A callable that arranges for the mapping of key/value pairs to
    other key/value pairs.

    Within DaCHS, Rowmakers generate database rows (and parameter
    dictionaries) from the output of grammars.

    They are constructed with the source of the mapping function, a dictionary
    of globals the function should see, a dictionary of defaults, giving keys
    to be inserted into the incoming rowdict before the mapping function is
    called, and a map of line numbers to names handled in that line.

    It is called with a dictionary of locals for the functions (i.e.,
    usually the result of a grammar iterRows).
    """
    def __init__(self, source, name, globals, defaults, lineMap):
        try:
            self.code = compile(source, "generated mapper code", "exec")
        except SyntaxError as synErr:
            raise base.ui.logOldExc(
                base.BadCode(source, "rowmaker", synErr))

        self.source = source
        self.name = name
        # the caller's dictionary is extended in place with everything
        # from rmkfuncs
        globals.update(rmkfuncs.__dict__)
        self.globals = globals
        self.defaults = defaults
        self.keySet = set(self.defaults)
        # a sorted sequence of (line number, description) for bisecting
        self.lineMap = sorted(lineMap.items())
        self.rowsMade = 0

    def _guessExSourceName(self, tb):
        """returns an educated guess as to which mapping should have
        caused that traceback in tb.

        This is done by inspecting the second-topmost stackframe.  It
        must hold the generated line that, possibly indirectly, caused
        the exception.  This line should be in the lineMap generated by
        RowmakerDef._getSource.
        """
        if not tb.tb_next:
            # toplevel failure, internal
            return "in toplevel (internal failure)"

        excLine = tb.tb_next.tb_lineno
        base.ui.notifyDebug(
            "Here's the traceback:\n%s"%"".join(traceback.format_tb(tb)))

        insertionPoint = bisect.bisect_left(self.lineMap, (excLine, ""))
        destInd = min(len(self.lineMap)-1, insertionPoint)
        # If we're between lineMap entries, the one before the guessed one
        # is the one we want
        if self.lineMap[destInd][0]>excLine and destInd:
            destInd -= 1
        return self.lineMap[destInd][1]

    def _guessError(self, ex, rowdict, tb):
        """tries to shoehorn a ValidationError out of ex.

        This never returns; it always raises a base.ValidationError naming
        the construct that _guessExSourceName blames for tb.
        """
        base.ui.notifyDebug("Rowmaker failed. Exception below. Failing source"
            " is:\n%s"%self.source)
        destName = self._guessExSourceName(tb)

        if isinstance(ex, KeyError):
            msg = "Key %s not found in a mapping."%str(ex)
            hint = ("This probably means that your grammar did not yield the"
                " field asked for. Alternatively, bugs in procs might also"
                " cause this.")
        else:
            msg = str(ex)
            hint = ("This is a failure in more-or-less user-provided code."
                " If you run again with the global --debug flag, the source of"
                " the failing code should be in the logs/dcInfos (but make"
                " sure it's the source the error is reported for; with procs,"
                " this might not be the case).")

        fullMessage = "While %s in %s: %s"%(destName, self.name, msg)
        # the last word of the description is the name of the key or proc
        raise base.ui.logOldExc(base.ValidationError(
            fullMessage, destName.split()[-1], rowdict, hint=hint))

    def __call__(self, vars, table):
        """returns the row the compiled mapper code builds from vars.

        Keys that have defaults and are missing from vars are added to
        vars (in place) before the code is run.  table is visible to the
        mapper code as targetTable.

        ExecutiveActions and ValidationErrors are passed on, all other
        exceptions are handed to _guessError.
        """
        try:
            namespace = {
                "vars": vars,
                "result": {},
                "_self": self,
                "targetTable": table,
            }

            for key in self.keySet.difference(vars):
                vars[key] = self.defaults[key]

            exec(self.code, self.globals, namespace)

            self.rowsMade += 1
            return namespace["result"]
        except (base.ExecutiveAction, base.ValidationError):
            # ExecutiveActions are just passed on, and for ValidationErrors,
            # hopefully downstream knows better than we
            raise
        except Exception as ex:
            self._guessError(ex, namespace["vars"], ex.__traceback__)