Source code for gavo.base.structure

"""
Representation of structured data deserializable from XML.

We want all the managed attribute stuff since the main user input comes
from resource descriptors, and we want relatively strong input validation
here.  Also, lots of fancy copying and crazy cross-referencing is
going on in our resource definitions, so we want a certain amount of
rigorous structure.  Finally, a monolithic parser for that stuff
becomes *really* huge and tedious, so I want to keep the XML parsing
information in the constructed objects themselves.
"""

#c Copyright 2008-2025, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from gavo import utils
from gavo.base import attrdef
from gavo.base import common
from gavo.base import parsecontext

from gavo.utils.dachstypes import (cast, Any, Callable, Dict, Iterator,
	List, Optional, Set, StructParseContext, StructParserValue,
	Tuple, Type, Union)


def sortAttrs(
        attrSeq: List[attrdef.AttributeDef]
        ) -> List[attrdef.AttributeDef]:
    """evaluates the ``before`` attributes on the AttributeDefs in attrSeq
    and returns a sequence satisfying them.

    If no attribute in attrSeq declares a ``before`` constraint, attrSeq
    itself is returned untouched.  Otherwise, a new list is returned.  It
    starts with the attributes taking part in the constraints, in the
    order computed by utils.topoSort (with ``meta_`` forced to the front
    if attrSeq contains an attribute of that name).  All remaining
    attributes follow in the order they have in attrSeq, each of them
    exactly once.

    A ``before`` naming an attribute that is not in attrSeq results in a
    KeyError.
    """
    beforeGraph = [(att.name_, att.before)
        for att in attrSeq if att.before]
    if not beforeGraph:
        # no constraints: nothing to do
        return attrSeq

    attDict = {att.name_: att for att in attrSeq}
    sortedNames = list(utils.topoSort(beforeGraph))

    # Hack: metadata always comes first.  We filter the name out of
    # the sorted part so meta_ is not listed twice when it already
    # takes part in the before graph.
    if "meta_" in attDict:
        sortedNames = ["meta_"] + [n for n in sortedNames if n != "meta_"]

    result = [attDict[name] for name in sortedNames]

    # Append the unconstrained attributes in their original order;
    # the set only serves to avoid duplicates, not to determine the
    # sequence (which would make the result vary between runs).
    placed = set(result)
    for att in attrSeq:
        if att not in placed:
            placed.add(att)
            result.append(att)
    return result
def buildstructure(cls: Type["StructureBase"]) -> Type["StructureBase"]:
    """A class decorator for DaCHS structures.

    The plan is for this to take over the work the StructType metaclass
    currently does, since a decorator lets us fiddle in mypy annotations
    (which mypy cannot do with metaclasses).

    At this point, all it does is put a STRUCT_SENTINEL class attribute
    (with the value None) on cls and return cls; functionality will be
    added as type annotations progress.
    """
    # dynamic attribute creation; going through setattr keeps type
    # checkers from complaining about the unknown attribute.
    setattr(cls, "STRUCT_SENTINEL", None)
    return cls
class StructType(type):
    """A metaclass for the representation of structured data.

    Classes created through this metaclass are called structures within
    the DC software.

    Structures do quite a bit of the managed attribute nonsense to
    meaningfully catch crazy user input.

    Basically, you give a Structure class attributes (preferably with
    underscores in front) specifying the attributes the instances
    should have and how they should be handled.

    Structures must be constructed with a parent (for the root
    element, this is None).  All other arguments should be keyword
    arguments.  If given, they have to refer to existing attributes,
    and their values will directly give the values of the attribute
    (i.e., parsed values).

    Structures should always inherit from StructureBase below and
    arrange for its constructor to be called, since, e.g., default
    processing happens there.

    Structures have a managedAttrs dictionary containing names and
    attrdef.AttributeDef objects for the defined attributes.

    TODO: We should probably move all this into the buildstructure
    class decorator once we are done with type annotations.  None of
    what is here couldn't be done just as well in a class decorator
    (which didn't exist when this was originally written).
    """

    def __init__(cls,
            name: str,
            bases: Tuple[type, ...],
            dict: Dict
            ) -> None:
        type.__init__(cls, name, bases, dict)
        cls._collectManagedAttrs()
        cls._insertAttrMethods()

    def _collectManagedAttrs(cls) -> None:
        """collects a dictionary of managed attributes in managedAttrs.

        As a side effect, this also sets attrSeq (the attribute
        definitions as ordered by sortAttrs) and an empty
        completedCallbacks list on cls.
        """
        byName: Dict[str, attrdef.AttributeDef] = {}
        callbacks: List[Callable[[], None]] = []
        found: List[attrdef.AttributeDef] = []

        for attName in dir(cls):
            # dir may list names that cannot actually be retrieved
            if not hasattr(cls, attName):
                continue

            candidate = getattr(cls, attName)
            if not isinstance(candidate, attrdef.AttributeDef):
                continue

            byName[candidate.name_] = candidate
            found.append(candidate)
            # aliases map to the same definition as the primary name
            for alias in candidate.aliases or ():
                byName[alias] = candidate

        cls.attrSeq = sortAttrs(found)
        cls.managedAttrs = byName
        cls.completedCallbacks = callbacks

    def _insertAttrMethods(cls) -> None:
        """adds methods defined by cls's managedAttrs for the parent to
        cls.
        """
        # managedAttrs has one entry per alias; the set makes sure each
        # definition is only consulted once.
        for attDef in set(cls.managedAttrs.values()):
            for methodName, method in attDef.iterParentMethods():
                setattr(cls, methodName, method)
class DataContent(attrdef.UnicodeAttribute):
    """A magic attribute that allows character content to be added to
    a structure.

    It is always stored under the name ``content_``.  Apart from that,
    you can configure it with all the arguments available for
    UnicodeAttribute.

    Since parsers may call characters with an empty string for empty
    elements, the empty string will not be fed (i.e., the default will
    be preserved).  This makes setting an empty string as an element
    content impossible (you could use DataContent with strip=True,
    though), but that's probably not a problem.
    """
    typeDesc_ = "string"

    def __init__(self,
            default: Union[utils.NotGivenType, str] = "",
            description: str = "Undocumented",
            **kwargs: Any) -> None:
        attrdef.UnicodeAttribute.__init__(
            self,
            "content_",
            default=default,
            description=description,
            **kwargs)

    def feed(self,
            ctx: StructParseContext,
            instance: "Structure",
            value: str) -> None:
        """feeds value to instance unless it is the empty string.
        """
        if value != "":
            attrdef.UnicodeAttribute.feed(self, ctx, instance, value)

    def makeUserDoc(self) -> str:
        """returns a one-line description of the character content for
        the reference documentation.
        """
        defaultRepr = repr(self.default_)
        return "Character content of the element (defaulting to %s) -- %s" % (
            defaultRepr, self.description_)
@buildstructure
class StructureBase(common.StructCallbacks, metaclass=StructType):
    """A base class for all structures (i.e., RD elements).

    You must arrange for calling its constructor from classes
    inheriting this.

    The constructor receives a parent (another structure, or None) and
    keyword arguments containing values for actual attributes (which
    will be set without any intervening consultation of the
    AttributeDef).

    The attribute definitions talking about structures let you set
    parent to None when constructing default values; they will then
    insert the actual parent.
    """
    name_: Union[str, Type[utils.Undefined]] = attrdef.Undefined

    _id = parsecontext.IdAttribute("id",
        description="Node identity for referencing")

    # the following is managed by setPosition/getSourcePosition
    __fName: Optional[str] = None
    __lineNumber: Optional[Union[int, str]] = None

    def __init__(self,
            parent: Optional["ParseableStructure"],
            **kwargs: Any) -> None:
        self.parent: Optional["ParseableStructure"] = None
        if parent is not None:
            self.setParent(parent)

        # set defaults
        for attDef in self.attrSeq:
            try:
                # don't clobber properties set up by attributes.
                if hasattr(self, attDef.name_):
                    continue
                setattr(self, attDef.name_, attDef.default_)
            except AttributeError:
                # default on property given
                raise utils.logOldExc(common.StructureError(
                    f"{attDef.name_} attributes on {self.name_} have builtin defaults only"))

        # set keyword arguments
        for name, val in kwargs.items():
            if name not in self.managedAttrs:
                raise common.StructureError(
                    "{} objects have no attribute {}".format(
                        self.__class__.__name__, name))

            attDef = self.managedAttrs[name]
            # attribute definitions having computed_ are silently
            # skipped here
            if not hasattr(attDef, "computed_"):
                attDef.feedObject(self, val)

    def _nop(self, *args, **kwargs):
        """accepts any arguments and does nothing.
        """
        pass

    def setParent(self, parent: "ParseableStructure") -> None:
        """sets the parent of a Structure.

        This is a method mainly to let individual elements override
        the behaviour.
        """
        self.parent = parent
        super().setParent(parent)

    def setPosition(self,
            fName: Optional[str],
            lineNumber: Union[int, str]) -> None:
        """should be called by parsers to tell the struct what file
        and what line its serialisation came from.
        """
        self.__fName = fName
        self.__lineNumber = lineNumber

    def getSourcePosition(self) -> str:
        """returns a string representation of where the struct was
        parsed from.

        Structs that never had setPosition called with a file name
        report "<internally built>".
        """
        if self.__fName is None:
            return "<internally built>"
        return "{}, line {}".format(self.__fName, self.__lineNumber)

    def getAttributes(self,
            attDefsFrom: Optional[StructType] = None
            ) -> Dict[str, Any]:
        """returns a dict of the current attributes, suitable for
        making a shallow copy of self.

        Struct attributes will not be reparented, so there are limits
        to what you can do with such shallow copies.

        If attDefsFrom is given, the attribute definitions are taken
        from its managedAttrs rather than from self's; a StructureError
        is raised if self lacks one of the attributes so defined.
        """
        defSource = self if attDefsFrom is None else attDefsFrom
        attDefs = set(defSource.managedAttrs.values())

        try:
            return {attDef.name_: getattr(self, attDef.name_)
                for attDef in attDefs}
        except AttributeError as msg:
            raise utils.logOldExc(common.StructureError(
                "Attempt to copy from invalid source: %s"%str(msg)))

    def getCopyableAttributes(self,
            ignoreKeys: Set[str] = set(),
            ctx: Optional[StructParseContext] = None,
            newParent: Optional["StructureBase"] = None
            ) -> Dict[str, Any]:
        """returns a dictionary mapping attribute names to copyable
        children.

        ignoreKeys can be a set or dict of additional attribute names
        to ignore.  The children are orphan deep copies.
        """
        return {attDef.name_: attDef.getCopy(self, newParent, ctx)
            for attDef in self.attrSeq
            if attDef.copyable and attDef.name_ not in ignoreKeys}

    def adopt(self, struct: "ParseableStructure") -> "ParseableStructure":
        """reparents a structure to self.

        This returns the structure for convenience.

        Note that a previously set parent is (for now) simply
        overwritten; structures don't actually support unparenting (and
        I have found no reason for them to do so).
        """
        if struct.parent:
            # TODO: we probably should warn this, or clone the struct,
            # or whatever.  Simply reparenting the struct feels wrong.
            # But it's worked so far, so let's ignore this for now.
            pass

        struct.setParent(cast("ParseableStructure", self))
        return struct

    def iterChildren(self) -> Iterator["Structure"]:
        """iterates over structure children of self.

        To make this work, attributes containing structs must define
        iterChildren methods (and the others must not).
        """
        for attDef in self.attrSeq:
            if hasattr(attDef, "iterChildren"):
                yield from attDef.iterChildren(self)

    @classmethod
    def fromStructure(cls,
            newParent: "Structure",
            oldStructure: "Structure"
            ) -> "StructureBase":
        """returns a new cls instance with the parent newParent,
        constructed from the current values of all attributes in
        oldStructure's attrSeq.
        """
        consArgs = {attDef.name_: getattr(oldStructure, attDef.name_)
            for attDef in oldStructure.attrSeq}
        return cls(newParent, **consArgs)

    def breakCircles(self) -> None:
        """removes the parent attributes from all child structures
        recursively.

        The struct will probably be broken after this, but this is
        sometimes necessary to help the python garbage collector.

        In case you're asking: parent cannot be a weak reference with
        the current parse architecture, as it usually is the only
        reference to the embedding object.  Yes, we should probably
        change that.
        """
        for child in self.iterChildren():
            # we don't want to touch structs that aren't our children
            if not (hasattr(child, "parent") and child.parent is self):
                continue

            # go depth-first, so grandchildren lose their parents
            # before the child loses us.
            if hasattr(child, "breakCircles"):
                child.breakCircles()
            delattr(child, "parent")
class ParseableStructure(StructureBase, common.Parser):
    """A base class for Structures parseable from EventProcessors (and
    thus XML).

    This is still abstract in that you need at least a name_ attribute.
    But it knows how to be fed from a parser, plus you have feed and
    feedObject methods that look up the attribute names and call the
    methods on the respective attribute definitions.
    """
    # set to False by value_ as soon as a value has been fed
    _pristine = True

    def finishElement(self, ctx: StructParseContext):
        """is called by end_ when the element is closed.

        This default implementation does nothing and returns self.
        """
        return self

    def getAttribute(self, name: str) -> Any:
        """Returns an attribute instance from name.

        This function will raise a StructureError if no matching
        attribute definition is found.
        """
        if name in self.managedAttrs:
            return self.managedAttrs[name]

        if name=="content_":
            raise common.StructureError(
                f"{self.name_} elements must not have character data content.")

        raise common.StructureError(
            f"{self.name_} elements have no {name} attributes or children.")

    def end_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue) -> Optional[common.Parser]:
        """handles the end event for this element.

        This runs finishElement and then feeds the result to the
        parent (if there is one) under name.  finishElement may raise
        common.Replace to have another object (possibly under another
        name) fed to the parent instead, or common.Ignore to have
        nothing fed at all.

        It returns the parent, which takes over parsing again.
        """
        try:
            self.finishElement(ctx)
        except common.Replace as ex:
            if ex.newName is not None:
                name = ex.newName
            if ex.newOb.id is not None:
                ctx.registerId(ex.newOb.id, ex.newOb)
            if self.parent is not None:
                cast("ParseableStructure", self.parent).feedObject(
                    name, ex.newOb)
        except common.Ignore:
            pass
        else:
            if self.parent:
                cast("ParseableStructure", self.parent).feedObject(
                    name, self)
        # del self.feedEvent (at some point we might selectively reclaim
        # parsers)
        return self.parent

    def value_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue) -> common.Parser:
        """handles a value event by feeding value to the attribute
        name.

        This returns the parser to continue with, which is self unless
        the attribute definition raises common.Replace; in that case,
        the replacement object is returned.
        """
        attDef = self.getAttribute(name)
        try:
            attDef.feed(ctx, self, value)
        except common.Replace as ex:
            return ex.newOb
        self._pristine = False
        return cast(common.Parser, self)

    def start_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue
            ) -> Optional[Union[common.Parser, str]]:
        """handles a start event for the child name.

        If the attribute definition for name has a create method, its
        result is returned; otherwise, name itself is returned.
        """
        attDef = self.getAttribute(name)
        if hasattr(attDef, "create"):
            return attDef.create(self, ctx, name)
        else:
            return name

    def feed(self,
            name: str,
            literal: str,
            ctx: Optional[StructParseContext] = None) -> None:
        """feeds the literal to the attribute name.

        If you do not have a proper parse context ctx, there may be
        restrictions on what literals can be fed.
        """
        self.managedAttrs[name].feed(ctx, self, literal)

    def feedObject(self, name: str, ob: Any) -> None:
        """feeds the object ob to the attribute name.
        """
        self.managedAttrs[name].feedObject(self, ob)

    def iterEvents(self) -> Iterator[common.ParserEvent]:
        """yields an event sequence that transfers the copyable
        information from self to something receiving the events.

        If something is not copyable, it is ignored (i.e., keeps its
        default on the target object).
        """
        for att in self.attrSeq:
            if not att.copyable:
                continue

            if hasattr(att, "iterEvents"):
                # the attribute knows how to serialise itself
                yield from att.iterEvents(self)
            else:
                val = getattr(self, att.name_)
                # only transfer what differs from the default
                if val!=att.default_:
                    yield common.ParserEvent(
                        "value", att.name_, att.unparse(val))

    def change(self, **kwargs: Any) -> "StructureBase":
        """returns a copy of self with all attributes in kwargs
        overridden with the passed values.

        The special keyword arguments ``parent_`` (the parent of the
        copy; defaults to self's parent) and ``ctx`` (a parse context)
        are not interpreted as attributes.  If no ctx is passed, a
        fresh ParseContext is created, and its exit functions are run
        on the new instance.
        """
        parent = kwargs.pop("parent_", self.parent)
        runExits, ctx = False, kwargs.pop("ctx", None)
        if ctx is None:
            runExits, ctx = True, parsecontext.ParseContext()

        newInstance = self.__class__(parent)
        # first transfer what we have (less what is being overridden)...
        for attName, attValue in self.getCopyableAttributes(
                set(kwargs), ctx, newInstance).items():
            newInstance.feedObject(attName, attValue)
        # ...and then what was passed in.
        for attName, attValue in kwargs.items():
            newInstance.feedObject(attName, attValue)

        newInstance.finishElement(ctx)
        if runExits:
            ctx.runExitFuncs(newInstance)
        return newInstance

    def copy(self,
            parent: "StructureBase",
            ctx: Optional[StructParseContext] = None) -> "StructureBase":
        """returns a deep copy of self, reparented to parent.

        This is a shallow wrapper around change, present for backward
        compatibility.
        """
        return self.change(parent_=parent, ctx=ctx)

    def feedFrom(self,
            other: "Structure",
            ctx: Optional[StructParseContext] = None,
            suppress: Set = set()) -> None:
        """feeds parsed objects from another structure.

        This only works if the other structure is of the same class as
        self or of one of its superclasses.

        If no ctx is passed, a fresh ParseContext is used.  suppress
        is accepted but not evaluated by this implementation.
        """
        from gavo.base import xmlstruct

        if ctx is None:
            ctx = parsecontext.ParseContext()
        evProc = xmlstruct.EventProcessor(self, ctx)
        evProc.setParser(cast(Structure, self))
        for ev in other.iterEvents():
            evProc.feed(*ev)
# We have this so finishElement does not have to deal with None as
# a parse context.  The instance below should only ever be passed to
# completeElement methods, and these should not do crazy things with
# it.  Still, as a precaution, we make sure no ids get registered.
class _NullParseContextType(parsecontext.ParseContext):
    """for base.structure-internal use only.
    """

    def registerId(self,
            elId: str,
            value: "Structure",
            silentOverwrite: bool = False) -> None:
        """discards the registration (we're fake, so we don't store
        anything).
        """
        return None


_NullParseContext = _NullParseContextType()
@buildstructure
class Structure(ParseableStructure):
    """the base class for all RD elements.

    It will do some basic validation and will call hooks to complete
    elements and compute computed attributes, based on
    ParseableStructure's finishElement hook.

    Also, it supports onParentComplete callbacks; this works by
    checking if any managedAttr has a onParentComplete method and
    calling it with the current value of that attribute if necessary.
    """

    def callCompletedCallbacks(self) -> None:
        """calls onParentComplete on all attribute definitions having
        such a method, provided the attribute's value on self differs
        from its default.

        Each attribute definition is processed once, even if it is
        reachable under several names (i.e., has aliases).
        """
        # managedAttrs has one entry per alias; dict.fromkeys drops the
        # repetitions while keeping the order of first appearance, so
        # the callback does not fire several times for one attribute.
        for attType in dict.fromkeys(self.managedAttrs.values()):
            if hasattr(attType, "onParentComplete"):
                attVal = getattr(self, attType.name_)
                if attVal!=attType.default_:
                    attType.onParentComplete(attVal)

    def finishElement(self,
            ctx: StructParseContext = _NullParseContext
            ) -> "Structure":
        """runs the completion sequence on self and returns self.

        In sequence, this calls completeElement (with ctx), validate,
        onElementComplete, and callCompletedCallbacks.
        """
        self.completeElement(ctx)
        self.validate()
        self.onElementComplete()
        self.callCompletedCallbacks()
        return self

    def validate(self) -> None:
        """raises a StructureError if an attribute still is Undefined
        and runs the validate methods of the attribute definitions
        that have one.
        """
        for val in set(self.managedAttrs.values()):
            if getattr(self, val.name_) is attrdef.Undefined:
                raise common.StructureError(
                    f"You must set {val.name_} on {self.name_} elements")
            if hasattr(val, "validate"):
                val.validate(self)
        super().validate()

    def onElementComplete(self) -> None:
        """a hook called from finishElement after validation; here, it
        only calls up the inheritance chain.
        """
        super().onElementComplete()

    def completeElement(self, ctx: StructParseContext) -> None:
        """a hook called from finishElement before validation; here,
        it only calls up the inheritance chain.
        """
        super().completeElement(ctx)
class RestrictionMixin(common.StructCallbacks):
    """A mixin for structure classes not allowed in untrusted RDs.
    """

    def completeElement(self, ctx: StructParseContext) -> None:
        """raises a RestrictedElement error if ctx has a true
        ``restricted`` attribute; otherwise, completion continues up
        the inheritance chain.
        """
        inRestrictedMode = getattr(ctx, "restricted", False)
        if inRestrictedMode:
            # self.name_ could be undefined below, but then that's our
            # least problem.
            raise common.RestrictedElement(self.name_) # type: ignore
        super().completeElement(ctx)
def makeStruct(structClass: StructType, **kwargs: Any) -> Structure:
    """creates a parentless instance of structClass with ``**kwargs``.

    You can pass in a ``parent_`` kwarg to force a parent, and a
    ``ctx_`` if you need a parse context; neither is passed on to the
    constructor as an attribute.  Without ``ctx_``, finishElement is
    called with None.

    This is the preferred way to create struct instances in DaCHS, as
    it will cause the sequence of completers and validators to be run.
    Use it like this::

        makeStruct(rscdef.Column, name="ra", type="double precision")
    """
    parseContext = kwargs.pop("ctx_", None)
    parentStruct = kwargs.pop("parent_", None)
    newStruct = structClass(parentStruct, **kwargs)
    return newStruct.finishElement(parseContext)