Source code for gavo.base.structure

"""
Representation of structured data deserializable from XML.

We want all the managed attribute stuff since the main user input comes
from resource descriptors, and we want relatively strong input validation
here.  Also, lots of fancy copying and crazy cross-referencing is
going on in our resource definitions, so we want a certain amount of
rigorous structure.  Finally, a monolithic parser for that stuff
becomes *really* huge and tedious, so I want to keep the XML parsing
information in the constructed objects themselves.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from gavo import utils
from gavo.base import attrdef
from gavo.base import common
from gavo.base import parsecontext


def sortAttrs(attrSeq):
    """evaluates the before attributes on the AttributeDefs in attrSeq
    and returns a sequence satisfying them.

    If no attribute declares a ``before`` constraint, attrSeq itself is
    returned unchanged.  Otherwise a new list is returned: first the
    attributes taking part in the ordering constraints (topologically
    sorted), then all remaining attributes in the order they had in
    attrSeq.

    An attribute named ``meta_``, if present, is always put first.
    """
    beforeGraph, hasMeta = [], False
    for att in attrSeq:
        if att.before:
            beforeGraph.append((att.name_, att.before))
        if att.name_ == "meta_":
            hasMeta = True

    if not beforeGraph:
        return attrSeq

    attDict = dict((a.name_, a) for a in attrSeq)
    sortedNames = utils.topoSort(beforeGraph)

    # Hack: metadata always comes first.  Drop any occurrence of meta_
    # the topological sort may already have produced so the attribute
    # does not end up in the result twice.
    if hasMeta:
        sortedNames = ["meta_"] + [n for n in sortedNames if n != "meta_"]

    sortedAtts = [attDict[n] for n in sortedNames]

    # Append the unconstrained attributes in their original order (a plain
    # set difference would yield them in arbitrary order), each only once.
    placed = set(sortedAtts)
    remaining = []
    for att in attrSeq:
        if att not in placed:
            placed.add(att)
            remaining.append(att)

    return sortedAtts + remaining
class StructType(type):
    """A metaclass for the representation of structured data.

    Classes created with this metaclass are called structures within
    the DC software.

    Structures do quite a bit of the managed attribute nonsense to
    meaningfully catch crazy user input.

    Basically, you give a Structure class attributes (preferably with
    underscores in front) specifying the attributes the instances
    should have and how they should be handled.

    Structures must be constructed with a parent (for the root
    element, this is None).  All other arguments should be keyword
    arguments.  If given, they have to refer to existing attributes,
    and their values will directly give the values of the attribute
    (i.e., parsed values).

    Structures should always inherit from StructBase below and
    arrange for its constructor to be called, since, e.g., default
    processing happens there.

    Structures have a managedAttrs dictionary containing names and
    attrdef.AttributeDef objects for the defined attributes.
    """
    def __init__(cls, name, bases, dict):
        type.__init__(cls, name, bases, dict)
        cls._collectManagedAttrs()
        cls._insertAttrMethods()

    def _collectManagedAttrs(cls):
        """collects the attribute definitions found on cls.

        This sets three class attributes: managedAttrs (a dictionary
        mapping names, XML names and aliases to the attribute
        definitions), attrSeq (the definitions as ordered by sortAttrs),
        and completedCallbacks (an empty list).
        """
        attsByName = {}
        foundAtts = []

        for attName in dir(cls):
            # dir() may report names that cannot actually be retrieved
            if hasattr(cls, attName):
                candidate = getattr(cls, attName)
                if isinstance(candidate, attrdef.AttributeDef):
                    attsByName[candidate.name_] = candidate
                    foundAtts.append(candidate)

                    if hasattr(candidate, "xmlName_"):
                        attsByName[candidate.xmlName_] = candidate

                    for alias in candidate.aliases or ():
                        attsByName[alias] = candidate

        cls.attrSeq = sortAttrs(foundAtts)
        cls.managedAttrs = attsByName
        cls.completedCallbacks = []

    def _insertAttrMethods(cls):
        """sets the methods the managed attributes define for their
        parent as attributes of cls.
        """
        distinctAtts = set(cls.managedAttrs.values())
        for attDef in distinctAtts:
            for methName, method in attDef.iterParentMethods():
                setattr(cls, methName, method)
class DataContent(attrdef.UnicodeAttribute):
    """A magic attribute that allows character content to be added to
    a structure.

    It accepts all the arguments available for UnicodeAttribute; the
    attribute name is fixed to ``content_``.

    Since parsers may call characters with an empty string for empty
    elements, the empty string is never fed (i.e., the default is
    preserved).  This makes setting an empty string as an element
    content impossible (you could use DataContent with strip=True,
    though), but that's probably not a problem.
    """
    typeDesc_ = "string"

    def __init__(self, default="", description="Undocumented", **kwargs):
        attrdef.UnicodeAttribute.__init__(
            self,
            "content_",
            default=default,
            description=description,
            **kwargs)

    def feed(self, ctx, instance, value):
        """feeds value to instance unless it is the empty string.

        For the empty string, nothing happens and None is returned.
        """
        if value == "":
            # keep the default for empty elements
            return None
        return attrdef.UnicodeAttribute.feed(self, ctx, instance, value)

    def makeUserDoc(self):
        """returns a documentation string mentioning the default and the
        description of this attribute.
        """
        defaultLiteral = repr(self.default_)
        return "Character content of the element (defaulting to %s) -- %s" % (
            defaultLiteral, self.description_)
class StructureBase(object, metaclass=StructType):
    """is a base class for all structures.

    You must arrange for calling its constructor from classes inheriting
    this.

    The constructor receives a parent (another structure, or None)
    and keyword arguments containing values for actual attributes
    (which will be set without any intervening consultation of the
    AttributeDef).

    The attribute definitions talking about structures let you
    set parent to None when constructing default values; they will
    then insert the actual parent.

    Note that change (and hence copy) call feedObject and finishElement,
    which this class does not define itself; ParseableStructure does.
    """
    name_ = attrdef.Undefined

    _id = parsecontext.IdAttribute("id",
        description="Node identity for referencing")

    # the following is managed by setPosition/getSourcePosition
    __fName = __lineNumber = None

    def __init__(self, parent, **kwargs):
        """sets the parent, initialises all managed attributes with their
        defaults and then feeds the keyword arguments as parsed values.

        A StructureError is raised for keyword arguments not naming a
        managed attribute.  Values for attributes having a ``computed_``
        attribute are silently ignored.
        """
        self.setParent(parent)

        # set defaults
        for val in self.attrSeq:
            try:
                # don't clobber properties set up by attributes.
                if not hasattr(self, val.name_):
                    setattr(self, val.name_, val.default_)
            except AttributeError:  # default on property given
                raise utils.logOldExc(common.StructureError(
                    "%s attributes on %s have builtin defaults only."%(
                    val.name_, self.name_)))

        # set keyword arguments
        for name, val in kwargs.items():
            if name in self.managedAttrs:
                if not hasattr(self.managedAttrs[name], "computed_"):
                    self.managedAttrs[name].feedObject(self, val)
            else:
                raise common.StructureError("%s objects have no attribute %s"%(
                    self.__class__.__name__, name))

    def _nop(self, *args, **kwargs):
        pass

    def setParent(self, parent):
        """sets the parent of a Structure.

        This is a method mainly to let individual elements override
        the behaviour.
        """
        self.parent = parent
        # Up-call cooperatively, but only if some class further down the
        # MRO actually provides setParent; the only base declared here
        # is object, which does not.
        upcall = getattr(super(), "setParent", None)
        if upcall is not None:
            upcall(parent)

    def setPosition(self, fName, lineNumber):
        """should be called by parsers to record what file and what line
        the serialisation came from.
        """
        self.__fName, self.__lineNumber = fName, lineNumber

    def getSourcePosition(self):
        """returns a string representation of where the struct was parsed
        from.

        For structs on which setPosition has not been called with a file
        name, this is ``<internally built>``.
        """
        if self.__fName is None:
            return "<internally built>"
        return "%s, line %s"%(self.__fName, self.__lineNumber)

    def getAttributes(self, attDefsFrom=None):
        """returns a dict of the current attributes, suitable for making
        a shallow copy of self.

        If attDefsFrom is given, the attribute definitions are taken from
        its managedAttrs rather than from self's.

        Struct attributes will not be reparented, so there are limits to
        what you can do with such shallow copies.

        A StructureError is raised if self lacks one of the attributes.
        """
        if attDefsFrom is None:
            attrs = set(self.managedAttrs.values())
        else:
            attrs = set(attDefsFrom.managedAttrs.values())

        try:
            return {att.name_: getattr(self, att.name_) for att in attrs}
        except AttributeError as msg:
            # utils.logOldExc is what the constructor uses, too.
            raise utils.logOldExc(common.StructureError(
                "Attempt to copy from invalid source: %s"%str(msg)))

    def getCopyableAttributes(self,
            ignoreKeys=frozenset(), ctx=None, newParent=None):
        """returns a dictionary mapping attribute names to copyable children.

        ignoreKeys can be a set or dict of additional attribute names to
        ignore.  The children are orphan deep copies.
        """
        return {att.name_: att.getCopy(self, newParent, ctx)
            for att in self.attrSeq
            if att.copyable and att.name_ not in ignoreKeys}

    def change(self, **kwargs):
        """returns a copy of self with all attributes in kwargs overridden
        with the passed values.

        The special keyword arguments ``parent_`` (parent of the copy,
        defaulting to self's parent) and ``ctx`` (a parse context) are
        consumed here.  If no ctx is passed, a new ParseContext is made
        and its exit functions are run on the new instance.
        """
        parent = kwargs.pop("parent_", self.parent)
        runExits, ctx = False, kwargs.pop("ctx", None)
        if ctx is None:
            runExits, ctx = True, parsecontext.ParseContext()

        newInstance = self.__class__(parent)
        # attributes overridden in kwargs are not copied from self
        for attName, attValue in self.getCopyableAttributes(
                kwargs, ctx, newInstance).items():
            newInstance.feedObject(attName, attValue)
        for attName, attValue in kwargs.items():
            newInstance.feedObject(attName, attValue)

        newInstance.finishElement(ctx)
        if runExits:
            ctx.runExitFuncs(newInstance)
        return newInstance

    def copy(self, parent, ctx=None):
        """returns a deep copy of self, reparented to parent.

        This is a shallow wrapper around change, present for backward
        compatibility.
        """
        return self.change(parent_=parent, ctx=ctx)

    def adopt(self, struct):
        """makes self the parent of struct and returns struct.
        """
        struct.setParent(self)
        return struct

    def iterChildren(self):
        """iterates over structure children of self.

        To make this work, attributes containing structs must define
        iterChildren methods (and the others must not).
        """
        for att in self.attrSeq:
            if hasattr(att, "iterChildren"):
                yield from att.iterChildren(self)

    @classmethod
    def fromStructure(cls, newParent, oldStructure):
        """returns a new cls instance with parent newParent, constructed
        from the current values of all attributes in oldStructure's
        attrSeq.
        """
        consArgs = {att.name_: getattr(oldStructure, att.name_)
            for att in oldStructure.attrSeq}
        return cls(newParent, **consArgs)

    def breakCircles(self):
        """removes the parent attributes from all child structures
        recursively.

        The struct will probably be broken after this, but this is
        sometimes necessary to help the python garbage collector.

        In case you're asking: parent cannot be a weak reference with the
        current parse architecture, as it usually is the only reference
        to the embedding object.  Yes, we should probably change that.
        """
        for child in self.iterChildren():
            # we don't want to touch structs that aren't our children
            if hasattr(child, "parent") and child.parent is self:
                if hasattr(child, "breakCircles"):
                    child.breakCircles()
                delattr(child, "parent")
class ParseableStructure(StructureBase, common.StructCallbacks, common.Parser):
    """is a base class for Structures parseable from EventProcessors (and
    thus XML).

    This is still abstract in that you need at least a name_ attribute.
    But it knows how to be fed from a parser, plus you have feed and
    feedObject methods that look up the attribute names and call the
    methods on the respective attribute definitions.
    """
    _pristine = True

    def __init__(self, parent, **kwargs):
        StructureBase.__init__(self, parent, **kwargs)

    def finishElement(self, ctx):
        """is called when the element is complete; here, it just returns
        self.
        """
        return self

    def getAttribute(self, name):
        """Returns an attribute instance from name.

        This function will raise a StructureError if no matching attribute
        definition is found.
        """
        if name in self.managedAttrs:
            return self.managedAttrs[name]

        # no such attribute; character content gets a message of its own
        if name == "content_":
            raise common.StructureError(
                "%s elements must not have character data"
                " content."%(self.name_))
        raise common.StructureError(
            "%s elements have no %s attributes or children."%(
                self.name_, name))

    def end_(self, ctx, name, value):
        """finishes this element, feeds the result to the parent and
        returns the parent.

        If finishElement raises common.Replace, the replacement object
        (possibly under a new name) is fed to the parent instead of self,
        after registering its id with ctx if it has one.  On common.Ignore,
        nothing is fed at all.
        """
        try:
            self.finishElement(ctx)
        except common.Replace as replacement:
            if replacement.newName is not None:
                name = replacement.newName
            if replacement.newOb.id is not None:
                ctx.registerId(replacement.newOb.id, replacement.newOb)
            self.parent.feedObject(name, replacement.newOb)
        except common.Ignore:
            pass
        else:
            if self.parent:
                self.parent.feedObject(name, self)
        # del self.feedEvent (at some point we might selectively reclaim parsers)
        return self.parent

    def value_(self, ctx, name, value):
        """feeds the literal value to the attribute name and returns self.

        If feeding raises common.Replace, the replacement object is
        returned instead (and self is not marked as touched).
        """
        definition = self.getAttribute(name)
        try:
            definition.feed(ctx, self, value)
        except common.Replace as replacement:
            return replacement.newOb
        else:
            self._pristine = False
            return self

    def start_(self, ctx, name, value):
        """returns what the attribute name creates for a child element,
        or just name if the attribute definition has no create method.
        """
        definition = self.getAttribute(name)
        if not hasattr(definition, "create"):
            return name
        return definition.create(self, ctx, name)

    def feed(self, name, literal, ctx=None):
        """feeds the literal to the attribute name.

        If you do not have a proper parse context ctx, there may be
        restrictions on what literals can be fed.
        """
        definition = self.managedAttrs[name]
        definition.feed(ctx, self, literal)

    def feedObject(self, name, ob):
        """feeds the object ob to the attribute name.
        """
        definition = self.managedAttrs[name]
        definition.feedObject(self, ob)

    def iterEvents(self):
        """yields an event sequence that transfers the copyable information
        from self to something receiving the events.

        If something is not copyable, it is ignored (i.e., keeps its
        default on the target object).
        """
        for definition in self.attrSeq:
            if definition.copyable:
                if hasattr(definition, "iterEvents"):
                    yield from definition.iterEvents(self)
                else:
                    current = getattr(self, definition.name_)
                    # values still at their default need no event
                    if current != definition.default_:
                        yield ("value", definition.name_,
                            definition.unparse(current))

    def feedFrom(self, other, ctx=None, suppress=set()):
        """feeds parsed objects from another structure.

        This only works if the other structure is of the same class as
        self or of a superclass of it.

        The suppress argument is currently ignored.
        """
        from gavo.base import xmlstruct

        if ctx is None:
            ctx = parsecontext.ParseContext()

        processor = xmlstruct.EventProcessor(None, ctx)
        processor.setRoot(self)
        for event in other.iterEvents():
            processor.feed(*event)
class Structure(ParseableStructure):
    """is the base class for user-defined structures.

    It will do some basic validation and will call hooks to complete
    elements and compute computed attributes, based on
    ParseableStructure's finishElement hook.

    Also, it supports onParentComplete callbacks; this works by checking
    if any managedAttr has an onParentComplete method and calling it with
    the current value of that attribute if necessary.
    """

    def callCompletedCallbacks(self):
        """calls onParentComplete on all attribute definitions having it,
        passing the current attribute value, unless that value equals the
        default.
        """
        # managedAttrs maps names, XML names and aliases to the same
        # definition; dict.fromkeys leaves each definition once, in first
        # occurrence order, so no callback is run more than once.
        for attType in dict.fromkeys(self.managedAttrs.values()):
            if hasattr(attType, "onParentComplete"):
                attVal = getattr(self, attType.name_)
                if attVal != attType.default_:
                    attType.onParentComplete(attVal)

    def finishElement(self, ctx=None):
        """runs completeElement, validate, onElementComplete and the
        onParentComplete callbacks, in that order, and returns self.
        """
        self.completeElement(ctx)
        self.validate()
        self.onElementComplete()
        self.callCompletedCallbacks()
        return self

    def validate(self):
        """raises a StructureError if a managed attribute still is
        Undefined and runs the validate methods of the attribute
        definitions having one, then up-calls.
        """
        for val in set(self.managedAttrs.values()):
            if getattr(self, val.name_) is attrdef.Undefined:
                raise common.StructureError("You must set %s on %s elements"%(
                    val.name_, self.name_))
            if hasattr(val, "validate"):
                val.validate(self)
        super().validate()

    def onElementComplete(self):
        """up-calls; override this for work to do once the element is
        validated.
        """
        super().onElementComplete()

    def completeElement(self, ctx):
        """up-calls; override this for work to do before validation.
        """
        super().completeElement(ctx)
class RestrictionMixin(common.StructCallbacks):
    """A mixin for structure classes not allowed in untrusted RDs.
    """

    def completeElement(self, ctx):
        """raises a RestrictedElement error if ctx has a true restricted
        attribute and up-calls otherwise.
        """
        # a ctx without a restricted attribute (e.g., None) counts as
        # unrestricted
        inRestrictedMode = getattr(ctx, "restricted", False)
        if inRestrictedMode:
            raise common.RestrictedElement(self.name_)

        super().completeElement(ctx)
def makeStruct(structClass, **kwargs):
    """creates a parentless instance of structClass with ``**kwargs``.

    You can pass in a ``parent_`` kwarg to force a parent, and a ``ctx_``
    if you need a parse context.  Both are removed from kwargs before
    structClass is called.

    This is the preferred way to create struct instances in DaCHS, as it
    will cause the sequence of completers and validators run.  Use it like
    this::

        MS(rscdef.Column, name="ra", type="double precision")
    """
    parent = kwargs.pop("parent_", None)
    ctx = kwargs.pop("ctx_", None)

    instance = structClass(parent, **kwargs)
    return instance.finishElement(ctx)