Source code for gavo.base.parsecontext

"""
ParseContexts for parsing into structures.

A Context is a scratchpad for struct parsing.  It always provides an idmap, but
you're free to insert additional attributes.

Based on this, we provide some attribute definitions.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib

from gavo import utils
from gavo.base import attrdef
from gavo.base import caches
from gavo.base import common


[docs]def assertType(id, ob, forceType): """raises a StructureError if forceType is not None and ob is not of type forceType, returns ob otherwise. """ if forceType: if not isinstance(ob, forceType): raise common.StructureError("Reference to '%s' yielded object of type" " %s, but expected a(n) %s instance."%(id, ob.__class__.__name__, forceType.__name__)) return ob
[docs]def resolveCrossId(id, forceType=None, **kwargs): """resolves ``id``, where id is of the form ``rdId#id``. ``forceType``, if non-None must be a DaCHS struct type (e.g., rscdef.Table); a ``StructureError`` will be raised if the reference resolves to something else than an instance of that type. ``id`` can also be a simple rd id. ``kwargs`` lets you pass additional keyword arguments to the ``getRD`` calls that may be triggered by this. """ try: rdId, rest = id.split("#") except ValueError: if "#" in id: raise common.LiteralParseError("id", id, hint="There must be at most" " one hash sign ('#') in cross ids, separating the rd identifier" " from the rd-internal id") rdId, rest = id, None try: srcRD = caches.getRD(rdId, **kwargs) except common.RDNotFound as ex: ex.hint = ( "I was trying to resolve the reference %s; note that DaCHS only" " uses RDs residing below inputsDir and ignores all others." " If there's an RD that DaCHS insists isn't there, that's" " probably the reason."%id) raise if rest is None: return assertType(id, srcRD, forceType) else: return resolveId(srcRD, rest, forceType=forceType)
[docs]def resolveNameBased(container, id, forceType=None): """Tries to find a thing with name id within container. If container defines a method getElementForName, it will be called; it must either return some element with this name or raise a NotFoundError. If no such method exists, the function iterates over container until it finds an element with el.name==id. If no such element exists, it again raises a NotFoundError. The function raises a NotFoundError when no such thing exists. """ if hasattr(container, "getElementForName"): return container.getElementForName(id) ob = None try: for ob in container: if hasattr(ob, "name") and ob.name==id: return assertType(id, ob, forceType) except TypeError: if ob is None: raise utils.logOldExc(common.NotFoundError(id, "Element with name", "container %s"%repr(container), hint="The container, %s, is not iterable"%repr(container))) else: raise utils.logOldExc(common.NotFoundError(id, "Element with name", "container %s"%repr(container), hint="Element %s is of type %s and thus unsuitable" " for name path"%(ob.name, type(ob)))) raise common.NotFoundError(id, "Element with name", "container %s"%container.id)
[docs]def resolveComplexId(ctx, id, forceType=None): """resolves a dotted id. See resolveId. """ try: pId, name = id.split(".") except ValueError: raise utils.logOldExc(common.LiteralParseError("id", id, hint="A complex reference (parent.name) is expected here")) container = ctx.getById(pId) return resolveNameBased(container, name, forceType)
def _resolveOnNamepath(ctx, id, instance): if hasattr(instance, "resolveName"): return instance.resolveName(ctx, id) if (instance and instance.parent and hasattr(instance.parent, "resolveName")): return instance.parent.resolveName(ctx, id) raise common.NotFoundError(id, "Element with id or name", "name path")
[docs]def resolveId(ctx, id, instance=None, forceType=None): """tries to resolve id in context. ctx is some object having a getById method; this could be an RD or a parse context. The rules for id are as follows: (#) if id has a # in it, split it and take the first part to be an RD id, the second and id built according to the rest of this spec. (#) if id has a dot in it, split at the first dot to get a pair of id and name. Iterate over the element with id, and look for something with a "name" attribute valued name. If this fails, raise a NotFoundError. (#) if instance is not None and has a resolveName method or has a parent, and that parent has a resolveName method, pass id to it. If it does not raise a NotFoundError, return the result. This is for parents with a rscdef.NamePathAttribute. (#) ask the ParseContext ctx's getById method to resolve id, not catching the NotFoundError this will raise if the id is not known. """ if "#" in id: return resolveCrossId(id, forceType) if ctx is None: raise common.StructureError( "Cannot intra-reference (%s) when parsing without a context"%id) if "." in id: return resolveComplexId(ctx, id, forceType) srcOb = None if instance: try: srcOb = _resolveOnNamepath(ctx, id, instance) except common.NotFoundError: # no such named element, try element with id pass if srcOb is None and ctx is not None: srcOb = ctx.getById(id, forceType) return assertType(id, srcOb, forceType)
[docs]class IdAttribute(attrdef.UnicodeAttribute): """is an attribute that registers its parent in the context's id map in addition to setting its id attribute. """
[docs] def feed(self, ctx, parent, literal): attrdef.UnicodeAttribute.feed(self, ctx, parent, literal) if ctx is not None: ctx.registerId(parent.id, parent, ctx.replayLevel>0) parent.qualifiedId = ctx.getQualifiedId(literal)
[docs] def getCopy(self, parent, newParent, ctx): return None # ids may not be copied
[docs] def makeUserDoc(self): return None # don't mention it in docs -- all structures have it
[docs]class OriginalAttribute(attrdef.AtomicAttribute): """is an attribute that resolves an item copies over the managed attributes from the referenced item. The references may be anything resolveId can cope with. You can pass a forceType argument to make sure only references to specific types are allowable. In general, this will be the class itself of a base class. If you don't do this, you'll probably get weird AttributeErrors for certain inputs. To work reliably, these attributes have to be known to the XML parser so it makes sure they are processed first. This currently works by name, and "original" is reserved for this purpose. Other names will raise an AssertionError right now. As a safety mechanism, OriginalAttribute checks if it is replacing a "pristine" object, i.e. one that has not had events fed to it. """ computed_ = True typeDesc_ = "id reference" def __init__(self, name="original", description="An id of an element" " to base the current one on. This provides a simple inheritance" " method. The general rules for advanced referencing in RDs apply.", forceType=None, **kwargs): assert name=='original' attrdef.AtomicAttribute.__init__(self, name, None, description, **kwargs) self.forceType = forceType
[docs] def feedObject(self, instance, original, ctx=None): if not instance._pristine: raise common.StructureError("Original must be applied before modifying" " the destination structure.", hint="You should normally use" " original only as attribute. If you insist on having it as" " an element, it must be the first one and all other structure" " members must be set through elements, too") instance._originalObject = original instance.feedFrom(original, ctx)
[docs] def feed(self, ctx, instance, literal): self.feedObject(instance, resolveId(ctx, literal, instance, self.forceType), ctx)
class _ReferenceParser(common.Parser): """A helper class for the ReferenceAttribute. """ def __init__(self, refAttr, parent, baseName): self.refAttr, self.parent = refAttr, parent self.child = common.NotGiven self.baseName = baseName def _ensureChild(self, ctx): """creates an instance of the new, immediate child to be filled and leaves it in self.child. """ if self.child is common.NotGiven: self.child = self.refAttr._makeChild(self.baseName, self.parent) ctx.setPositionOn(self.child) def start_(self, ctx, name, value): # start event: we have an immediate child. Create it and feed this # event to the newly created child. self._ensureChild(ctx) return self.child.feedEvent(ctx, "start", name, value) def end_(self, ctx, name, value): if self.child is common.NotGiven: # empty element; make a child self._ensureChild(ctx) if self.child is not None: # immediate child was given: self.child.finishElement(ctx) self.parent.feedObject(name, self.child) return self.parent def value_(self, ctx, name, value): # value event: If it's a content_, it's a reference, else it's an # attribute on a child of ours. if name=="content_": if self.child: raise common.StructureError("Content received on ReferenceParser" " although a child is already there.", hint="You should" " not see this. Complain fiercely.") self.refAttr.feed(ctx, self.parent, value) self.child = None return self else: self._ensureChild(ctx) return self.child.feedEvent(ctx, "value", name, value)
[docs]class ReferenceAttribute(attrdef.AtomicAttribute): """An attribute keeping a reference to some other structure This is a bit messy since the value referred to keeps its original parent, so self.attr.parent!=self for these attributes. This is ok for many applications, but it will certainly not work for, e.g. tables (roughly, it's always trouble when an attribute value's implementation refers to self.parent; this is particularly true for structures having an RDAttribute). So, before adding a reference attribute, think first whether it wouldn't be wiser to have the real thing and rather use active tags. """ typeDesc_ = "id reference" def __init__(self, name="ref", default=attrdef.Undefined, description="Uncodumented", forceType=None, **kwargs): attrdef.AtomicAttribute.__init__(self, name, default, description, **kwargs) self.forceType = forceType def _getForceType(self, instance): """returns self.forceType unless it is Recursive, in which case instance's type is returned. """ if self.forceType is attrdef.Recursive: return instance.__class__ else: return self.forceType
[docs] def unparse(self, value): if value is None: # ref attribute was empty return None if hasattr(value, "qualifiedId"): return value.qualifiedId elif isinstance(value, str): return value else: # See HACK notice in feed setattr(value, "unparse-approved-anonymous", True) return value
# Since ReferenceAttributes can now contain immediate elements, # just returning an id (as happens by default) may not be enough # for serialization -- the immediate object is nowhere else. # We could fix that using something like this, at the expense # of unrolling all the elements. We don't do much DC structure # serialization, and thus I believe it's just not worth it. # def iterEvents(self, instance): # # This needs a special iterEvents to actually return embedded # # structures if necessary # val = getattr(instance, self.name_) # if val==self.default_: # return # # if hasattr(val, "_RefAttrImmediate"): # yield ("start", self.name_, None) # for ev in val.iterEvents(): # yield ev # yield ("end", self.name_, None) # else: # yield ("value", self.name_, self.unparse(val))
[docs] def feed(self, ctx, instance, literal): if literal is None: # ref attribute empty during a copy return # do nothing, since nothing was ref'd in original # HACK: when copying around structures, it's possible that anonymous # structures can be fed in here. We *really* don't want to make # up ids for them. Thus, we allow them out in unparse and in here # again. if hasattr(literal, "unparse-approved-anonymous"): self.feedObject(instance, literal) else: self.feedObject(instance, resolveId(ctx, literal, instance, self._getForceType(instance)))
def _makeChild(self, name, parent): """returns a new element of the appropriate type. This method raises a StructureError if that type is not known. Within ReferenceAttribute, the type is given by forceType. """ if self.forceType is None: raise common.StructureError("Only references allowed for %s, but" " an immediate object was found"%self.name_, hint="This means that" " you tried to replace a reference to an element with" " the element itself. This is only allowed if the reference" " forces a type, which is not the case here.") child = self._getForceType(parent)(parent) # leave a sentinel in the child that will later let us # iterEvents not the id but the struct itself. child._RefAttrImmediate = True return child
[docs] def create(self, structure, ctx, name): # we don't know at this point whether or not the next event will be # an open (-> create a new instance of self.forceType) or a # value (-> resolve). Thus, create an intermediate parser that # does the right thing. return _ReferenceParser(self, structure, name)
[docs]class ReferenceListAttribute(ReferenceAttribute): """A list of references. These can come as distinct elements -- <ref>a</ref><ref>b</ref> -- or as a comma-joined string with ignored whitespace -- ref="a, //services#b, x.y", or in a mixture between the two. """ typeDesc_ = "list of id references (comma separated or in distinct elements)" def __init__(self, name, **kwargs): if kwargs.get("default") is not None: raise common.StructureError("ReferenceListAttributes cannot have" " defaults") kwargs["default"] = attrdef.Computed ReferenceAttribute.__init__(self, name, **kwargs) @property def default_(self): return []
[docs] def feedObject(self, instance, value): if isinstance(value, list): for item in value: self.feedObject(instance, item) else: getattr(instance, self.name_).append(value) self.doCallbacks(instance, value)
[docs] def feed(self, ctx, instance, literal): # split literal up if there's commas if literal is None: # see ReferenceAttribute.feed return if "," in literal: for s in literal.split(","): ReferenceAttribute.feed(self, ctx, instance, s.strip()) else: ReferenceAttribute.feed(self, ctx, instance, literal)
[docs] def unparse(self, value): # Hack to avoid to have to figure out globally ok ids; see # comments in ReferenceAttribute class BlessedList(list): pass setattr(BlessedList, "unparse-approved-anonymous", True) return BlessedList(value)
[docs]class ParseContext(object): """A scratchpad for any kind of data parsers want to pass to feed methods. These objects are available to the feed methods as their first objects. If restricted is True, embedded code must raise an error. You should set an eventSource using the setter provided. This is the iterparse instance the events are coming from (or something else that has a pos attribute returning the current position). You can register exit functions to do some "global" cleanup. Parsers should call runExitFuncs right before they return the results; this arranges for these functions to be called. The signature of an exit function is exitfunc(rootStruct, parseContext) -> whatever. Then there's injected info. This is used to read "dynamic" metadata for RDs from the database up front and then fill the respective data where appropriate. Use inject to fill this (for RDs, do things like that in the rscdef.rdinj module) and getInjected to get data out again. """ def __init__(self, restricted=False, forRD=None): self.idmap = {} self.restricted = restricted self.forRD = forRD self.eventSource = None self.injectedData = {} self.exitFuncs = [] # if non-0, we're replaying events self.replayLevel = 0
[docs] def setEventSource(self, evSource): self.eventSource = evSource
[docs] def addExitFunc(self, callable): self.exitFuncs.append(callable)
[docs] def inject(self, key, data): """sets injected data for key. This will silently overwrite any pre-existing data. """ self.injectedData[key] = data
[docs] def getInjected(self, key, default=common.NotGiven): """get injected data for key. This will raise a KeyError if no data for key has been injected (or default, if given). For keys available here while parsing RDs, see rscdef.rdinj, look for ctx.inject. """ try: return self.injectedData[key] except KeyError: if default is not common.NotGiven: return default raise
[docs] @contextlib.contextmanager def replaying(self): """is called by active tags to indicate they're replaying events. The main effect right now is to suppress diagnostics for overwritten ids. But let's see what else we might want to use it for. The API to see if we're replaying: replayLevel>0. """ self.replayLevel += 1 try: yield finally: self.replayLevel -= 1
@property def pos(self): """returns a token stringifying into a position guess. """ if self.eventSource is None: return "(while parsing sourceless)" else: return self.eventSource.pos
[docs] def setPositionOn(self, struct): """calls a struct's setPosition method to tell it where it came from. """ if isinstance(self.pos, str): struct.setPosition(self.pos, -1) else: struct.setPosition(self.pos.fName, self.pos.line)
[docs] def getQualifiedId(self, id): """returns an id including the current RD's id, if known, otherwise id itself. """ if self.forRD: return "%s#%s"%(self.forRD, id) return id
[docs] def registerId(self, elId, value, silentOverwrite=False): """enters a value in the id map. We allow overriding in id. That should not happen while parsing an XML document because of their uniqueness requirement, but might come in handy for programmatic manipulations. We'll still emit an Info in that case (and may need to find a way to suppress it). """ if elId in self.idmap: if not silentOverwrite: utils.sendUIEvent("Info", "Element with id %s overwritten."%elId) self.idmap[elId] = value
[docs] def getById(self, id, forceType=None): """returns the object last registered for id. You probably want to use resolveId; getById does no namePath or resource descriptor resolution. """ if id not in self.idmap: raise common.NotFoundError(id, "Element with id", "parse context", hint="Elements referenced must occur lexically (i.e., within the" " input file) before the reference. If this actually gives" " you trouble, contact the authors. Usually, though, this" " error just means you mistyped a name.") res = self.idmap[id] return assertType(id, res, forceType)
[docs] def resolveId(self, id, instance=None, forceType=None): """returns the object referred to by the complex id. See the resolveId function. """ return resolveId(self, id, instance, forceType)
[docs] def runExitFuncs(self, root): for func in self.exitFuncs: func(root, self)
[docs]def getTableDefForTable(connection, tableName): """returns a TableDef object for a SQL table name. connection needs to be TableConnection or something with higher privileges. This really has little to do with resolving identifiers, but this module already has getRDs and similar, so it seemed the least unnatural place. """ if not "." in tableName: tableName = "public."+tableName res = list(connection.queryToDicts( "select sourcerd, tablename from dc.tablemeta where" " lower(tableName)=%(tableName)s", {"tableName": tableName.lower()})) if len(res)>1: raise common.ReportableError("More than one entry for table %s in" " dc.tablemeta!"%tableName, hint="This is a severe internal error and really should not happen." " Please report this bug.") elif not res: raise common.NotFoundError(tableName, "table", "published tables") row = res[0] return caches.getRD(row["sourcerd"] ).getById(row["tablename"].split(".")[-1])