Source code for gavo.base.xmlstruct

"""
Code to parse structures from XML sources.

The purpose of much of the mess here is to symmetrize XML attributes
and values.  Basically, we want start, value, end events whether
or not a piece of data comes in an element with a certain tag name or
via a named attribute.
"""

#c Copyright 2008-2025, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import io
import pathlib
import re

from gavo import utils
from gavo.base import activetags
from gavo.base import common
from gavo.base import parsecontext

from gavo.utils.dachstypes import (cast, TYPE_CHECKING,
	Dict, Iterator, Optional, Structure,
	StructParseContext, StructParserValue, TextIO, Type, Union)

if TYPE_CHECKING:
	from gavo.base.structure import ParseableStructure
	_StructureArg = Union[ParseableStructure, Type[ParseableStructure]]


# Pattern for strings made up of whitespace only (the empty string
# included) when applied with .match(); feedTo uses it to decide whether
# buffered character data is worth a content_ event.
ALL_WHITESPACE = re.compile(r"\s*$")


class EventProcessor(common.Parser):
	"""A dispatcher for parse events to structures.

	It is constructed with the root structure of the result tree, either
	as a type or as an instance.

	After that, events can be fed to the feed method, which makes sure
	they are routed to the proper object.
	"""
	# Atoms (just one value) and structured data are told apart through
	# the next attribute: While it is not None, an incoming value is turned
	# into a "value" event (named by next) on the current parser.  While
	# it is None, events are handed through to the current structure.

	debug = False

	def __init__(self,
			rootStruct: "_StructureArg",
			ctx: StructParseContext) -> None:
		self.ctx = ctx
		self.next: Union[common.Parser, str, None] = None
		# we are our own parser until feedEvent has handed out the root
		self.curParser: common.Parser = self
		self.result = self._instantiate(rootStruct)

	@staticmethod
	def _instantiate(rootStruct: "_StructureArg") -> Structure:
		"""returns rootStruct itself if it is an instance, or a new,
		parentless instance if it is a type.
		"""
		# in practice, all struct types in DaCHS are Structure-s and
		# none are just ParseableStructure-s.  That's the only
		# reason the cast is safe; this asks for refactoring...
		instance = rootStruct(None) if isinstance(rootStruct, type) else rootStruct
		return cast(Structure, instance)

	def _feedToAtom(self,
			evType: common.ParserEventType,
			name: str,
			value: StructParserValue) -> None:
		"""handles an event while an atomic value (named by self.next) is
		being parsed.

		Start events and values for anything but content_ raise a
		StructureError; an end event finishes the atom.
		"""
		if evType=='start':
			raise common.StructureError("%s elements cannot have %s children"%(
				self.next, name))

		if evType=='end':
			self.next = None
			return

		if not (evType=='value' or evType=="parsedvalue"):
			# any other event type is silently ignored for atoms
			return

		# For atoms, you can only set content_ -- they have no attributes.
		if name!="content_":
			raise common.StructureError("%s is atomic and thus has"
				" no attribute %s."%(self.next, name))

		# the cast below is because I hope that logically, this case
		# will not be reached when self.next is None.
		atomName = cast(str, self.next)
		self.curParser.feedEvent(
			self.ctx, common.ParserEventType.value, atomName, value)

	def _feedToStructured(self,
			evType: common.ParserEventType,
			name: str,
			value: StructParserValue) -> None:
		"""passes an event to the current parser and evaluates what it
		returns: a string becomes the name of the atom to parse next,
		anything else becomes the new current parser.
		"""
		target = self.curParser.feedEvent(self.ctx, evType, name, value)
		if isinstance(target, str):
			self.next = target
		else:
			self.curParser = cast(common.Parser, target)

	def feed(self,
			type: common.ParserEventType,
			name: str,
			value: StructParserValue = None) -> None:
		"""feeds an event.

		This is the main entry point for user calls.
		"""
		# Special handling for active tags: They may occur everywhere and
		# thus are not parsed by the element parsers but by us.
		# Active tags may define ACTIVE_NOEXPAND to undo that behaviour
		# (i.e., see active tag events themselves).
		if (type=="start"
				and activetags.isActive(name)
				and not hasattr(self.curParser, "ACTIVE_NOEXPAND")):
			self.curParser = activetags.getActiveTag(name)(self.curParser)
			return

		handler = (self._feedToStructured
			if self.next is None
			else self._feedToAtom)
		handler(type, name, value)

	def feedEvent(self,
			ctx: StructParseContext,
			evType: common.ParserEventType,
			name: str,
			value: StructParserValue) -> Optional[Union[common.Parser, str]]:
		"""dispatches an event to the root structure.

		Do not call this yourself unless you know what you're doing.
		The method to feed "real" events to is feed.

		Anything but a start event for the root element raises a
		StructureError.
		"""
		if name!=self.result.name_:
			raise common.StructureError("Expected root element %s, found %s"%(
				self.result.name_, name))

		if not evType=="start":
			raise common.StructureError("Bad document structure")

		self.result.idmap = ctx.idmap  # type: ignore # dynamic attribute
		ctx.setPositionOn(self.result)
		return self.result

	def setParser(self, parser: Structure) -> None:
		"""sets a structure to receive parse events.

		In particular, this bypasses any checks that the event stream
		coming in is actually destined for root.  Use this for replay-type
		things (feedFrom, active tags) exclusively.
		"""
		self.curParser = parser
		self.result.idmap = self.ctx.idmap  # type: ignore # dynamic attribute
def _synthesizeAttributeEvents( evProc: EventProcessor, context: StructParseContext, attrs: Dict[str, str]) -> None: """generates value events for the attributes in attrs. """ # original attributes must be fed first since they will usually # yield a different target object original = attrs.pop("original", None) if original: evProc.feed(common.ParserEventType.value, "original", original) # mixins must be fed last as they might depend on stuff set # in other attributes mixin = attrs.pop("mixin", None) for key, val in attrs.items(): evProc.feed(common.ParserEventType.value, key, val) if mixin: evProc.feed(common.ParserEventType.value, "mixin", mixin)
def feedTo(
		rootStruct: "_StructureArg",
		eventSource: Iterator,
		context: StructParseContext,
		feedInto: bool = False) -> Structure:
	"""feeds events from eventSource to rootStruct.

	A new event processor is used for feeding.  No context
	exit functions are run.

	eventSource must yield (type, name, payload) triples.  Consecutive
	"data" events are joined and, unless they are all whitespace, fed as
	a single content_ value before the next non-data event.  For start
	events with a non-empty payload, value events for the attributes in
	the payload are synthesized.

	The processed root structure is returned.

	if feedInto is true, the event creating the root structure is not
	expected (TODO: this is crap; fix it so that this is always the case when
	rootStruct is an instance).

	Exceptions raised while feeding are re-raised; if they do not carry
	position information yet, eventSource's pos attribute (None if the
	source has none) is attached to them as pos.
	"""
	evProc = EventProcessor(rootStruct, context)
	if feedInto:
		evProc.setParser(evProc.result)
	buf = []

	try:
		for evType, name, payload in eventSource:
			# buffer data
			if evType=="data":
				buf.append(payload)
				continue

			# a non-data event ends a run of character data: flush it
			if buf:
				res = "".join(buf)
				if not ALL_WHITESPACE.match(res):
					evProc.feed(common.ParserEventType.value, "content_", res)
				buf = []

			# "normal" event feed
			evProc.feed(evType, name, payload)

			# start event: Synthesize value events for attributes.
			if evType=="start" and payload:
				_synthesizeAttributeEvents(evProc, context, payload)

	except Exception as ex:
		if (not getattr(ex, "posInMsg", False)
				and getattr(ex, "pos", None) is None):
			# only add pos when the message string does not already have it.
			# getattr, because an event source without a pos must not
			# replace the actual error with an AttributeError.
			ex.pos = getattr(eventSource, "pos", None)  # type: ignore # dynamic attribute
		raise
	return evProc.result
def parseFromStream(
		rootStruct: "_StructureArg",
		inputStream: TextIO,
		context: Optional[StructParseContext] = None) -> Structure:
	"""parses a tree rooted in rootStruct from some file-like object
	inputStream.

	It returns the root element of the resulting tree.  If rootStruct is
	a type subclass, it will be instantiated to create a root
	element, if it is an instance, this instance will be the root.

	When no context is passed in, a fresh parsecontext.ParseContext is
	used.  The context's exit functions are run on the result before it
	is returned.
	"""
	events = utils.iterparse(inputStream)
	ctx = parsecontext.ParseContext() if context is None else context
	ctx.setEventSource(events)

	root = feedTo(rootStruct, events, ctx)
	ctx.runExitFuncs(root)
	return root
def parseFromString(
		rootStruct: "_StructureArg",
		inputString: str,
		context: Optional[StructParseContext] = None) -> Structure:
	"""parses a DaCHS RD tree rooted in ``rootStruct`` from a string.

	It returns the root element of the resulting tree.  This is
	parseFromStream operating on an in-memory text stream made from
	inputString.

	You would use this like this::

		parseFromString(rscdef.Column, "<column name='foo'/>")
	"""
	stream = io.StringIO(inputString)
	return parseFromStream(rootStruct, stream, context)
def structToETree(aStruct: Structure) -> utils.ElementTree.Element:
	"""returns an ElementTree for the copyable content of aStruct.

	Note that due to manipulations at parse time and non-copyable content,
	this will, in general, not reproduce the original XML trees.

	Exceptions raised while processing an event get a dachsNote attribute
	saying which element, event and value were being handled and are then
	re-raised.
	"""
	ET = utils.ElementTree
	# the innermost open element is always at the end of this list
	openNodes = [ET.Element(cast(str, aStruct.name_))]

	for evType, elName, value in aStruct.iterEvents():
		# data model horror.  let me hack around it for now
		if elName=='_synthesizedRoles':
			continue

		try:
			if evType=="start":
				openNodes.append(ET.SubElement(openNodes[-1], elName))
				continue

			if evType=="end":
				openNodes.pop()
				continue

			if not evType=="value":
				raise utils.Error("Invalid struct event: %s"%evType)

			if value is None or value is common.NotGiven:
				# do not serialise empty or missing attributes at all
				continue

			if elName=="content_":
				# This will become an actual "attribute" in an element.
				# Let's see how much we will regret blindly stringifying
				# these.
				openNodes[-1].text = str(value)
				continue

			if hasattr(value, "payload"):
				# it's an _AttBox, which ought to go soon anyway; skip
				# these, it's only hit stupid legacy STC annotation
				continue

			if isinstance(value, pathlib.Path):
				value = str(value)
			if hasattr(value, "id"):
				# I blindly assume that's a reference
				value = value.id

			# silently swallow everything that's not a string by now.
			# should we do better?
			if isinstance(value, str):
				openNodes[-1].set(elName, value)

		except Exception as exc:
			exc.dachsNote = (  # type: ignore # dynamic attribute
				"Badness occurred in element %s, event %s,"
				" value %s\n"%(elName, evType, value))
			raise

	return openNodes[-1]
def structToXML(aStruct: Structure) -> str:
	"""returns RD XML for a structure as a native string.

	The tree from structToETree is serialised as utf-8 into a bytes
	buffer, the content of which is then decoded again.
	"""
	tree = utils.ElementTree.ElementTree(structToETree(aStruct))
	buffer = io.BytesIO()
	tree.write(buffer, "utf-8")
	return buffer.getvalue().decode("utf-8")