"""
Code to parse structures from XML sources.
The purpose of much of the mess here is to symmetrize XML attributes
and values. Basically, we want start, value, and end events regardless of
whether a piece of data comes in an element with a certain tag name or
via a named attribute.
"""
#c Copyright 2008-2025, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import io
import pathlib
import re
from gavo import utils
from gavo.base import activetags
from gavo.base import common
from gavo.base import parsecontext
from gavo.utils.dachstypes import (cast, TYPE_CHECKING,
Dict, Iterator, Optional, Structure,
StructParseContext, StructParserValue, TextIO, Type, Union)
# ParseableStructure is only imported while type checking; this is why
# the signatures in this module refer to the alias as the string
# "_StructureArg" rather than as a name.
if TYPE_CHECKING:
    from gavo.base.structure import ParseableStructure
    # a root structure can be passed in either as an instance or as a type.
    _StructureArg = Union[ParseableStructure, Type[ParseableStructure]]

# When used with match(), this accepts strings made up of whitespace only
# (including the empty string).
ALL_WHITESPACE = re.compile(r"\s*$")
class EventProcessor(common.Parser):
    """A dispatcher routing parse events to structures.

    Construct it with the root structure of the tree to build (a type
    or an instance) and a parse context.  Then push events into the feed
    method; it forwards each of them to whatever parser is current.

    The finished root structure is available in the result attribute.
    """
    # Atoms (a single value) and structured data are told apart through
    # the next attribute.  While it is not None, an incoming value is
    # re-emitted as a "value" event named after next on the current
    # parser.  While it is None, events are handed through to the
    # current structure unchanged.
    debug = False

    def __init__(self,
            rootStruct: "_StructureArg",
            ctx: StructParseContext) -> None:
        # until the root element has been seen, we are our own parser;
        # see feedEvent.
        self.curParser: common.Parser = self
        self.next: Union[common.Parser, str, None] = None
        self.ctx = ctx
        self.result = self._instantiate(rootStruct)

    @staticmethod
    def _instantiate(rootStruct: "_StructureArg") -> Structure:
        """returns rootStruct itself if it is an instance, or the result
        of calling it with None if it is a type.
        """
        # in practice, all struct types in DaCHS are Structure-s and
        # none are just ParseableStructure-s.  That's the only
        # reason the cast is safe; this asks for refactoring...
        instance = rootStruct(None) if isinstance(rootStruct, type) else rootStruct
        return cast(Structure, instance)

    def _feedToAtom(self,
            type: common.ParserEventType,
            name: str,
            value: StructParserValue) -> None:
        """handles an event while an atomic attribute (named by self.next)
        is being filled.

        start events raise a StructureError, end events finish the atom,
        and value or parsedvalue events are passed on as a value for the
        atom.  Any other event type is ignored.
        """
        if type=='start':
            raise common.StructureError("%s elements cannot have %s children"%(
                self.next, name))

        if type=='end':
            self.next = None
            return

        if not (type=='value' or type=="parsedvalue"):
            return

        # For atoms, you can only set content_ -- they have no attributes.
        if name!="content_":
            raise common.StructureError("%s is atomic and thus has"
                " no attribute %s."%(self.next, name))

        # feed only calls us while self.next is not None, and
        # _feedToStructured only ever stores strings there; that is
        # what the cast relies on.
        atomName = cast(str, self.next)
        self.curParser.feedEvent(
            self.ctx, common.ParserEventType.value, atomName, value)

    def _feedToStructured(self,
            type: common.ParserEventType,
            name: str,
            value: StructParserValue) -> None:
        """hands an event to the current parser and records what it returns.

        A returned string is the name of an atom to fill next; anything
        else becomes the new current parser.
        """
        target = self.curParser.feedEvent(self.ctx, type, name, value)
        if not isinstance(target, str):
            self.curParser = cast(common.Parser, target)
            return
        self.next = target

    def feed(self,
            type: common.ParserEventType,
            name: str,
            value: StructParserValue = None) -> None:
        """feeds an event.

        This is the main entry point for user calls.
        """
        # Special handling for active tags: They may occur everywhere and
        # thus are not parsed by the element parsers but by us.
        # Active tags may define ACTIVE_NOEXPAND to undo that behaviour
        # (i.e., see active tag events themselves).
        if (type=="start"
                and activetags.isActive(name)
                and not hasattr(self.curParser, "ACTIVE_NOEXPAND")):
            makeActiveTag = activetags.getActiveTag(name)
            self.curParser = makeActiveTag(self.curParser)
            return

        handler = (self._feedToStructured
            if self.next is None
            else self._feedToAtom)
        handler(type, name, value)

    def feedEvent(self,
            ctx: StructParseContext,
            evType: common.ParserEventType,
            name: str,
            value: StructParserValue) -> Optional[Union[common.Parser, str]]:
        """dispatches an event to the root structure.

        Do not call this yourself unless you know what you're doing.  The
        method to feed "real" events to is feed.

        The only thing accepted here is a start event named like the
        root structure; everything else raises a StructureError.
        """
        rootName = self.result.name_
        if name!=rootName:
            raise common.StructureError("Expected root element %s, found %s"%(
                rootName, name))

        if evType!="start":
            raise common.StructureError("Bad document structure")

        self.result.idmap = ctx.idmap  # type: ignore # dynamic attribute
        ctx.setPositionOn(self.result)
        return self.result

    def setParser(self, parser: Structure) -> None:
        """sets a structure to receive parse events.

        In particular, this bypasses any checks that the event stream coming
        in is actually destined for root.  Use this for replay-type things
        (feedFrom, active tags) exclusively.
        """
        self.curParser = parser
        self.result.idmap = self.ctx.idmap  # type: ignore # dynamic attribute
def _synthesizeAttributeEvents(
        evProc: EventProcessor,
        context: StructParseContext,
        attrs: Dict[str, str]) -> None:
    """generates value events for the attributes in attrs.

    evProc receives one value event per item in attrs.  An "original"
    attribute is fed first and a "mixin" attribute is fed last; both are
    skipped when their value is empty.

    attrs is not modified.  context is currently not used here.
    """
    # attributes with a fixed position in the event sequence
    positioned = ("original", "mixin")

    # original attributes must be fed first since they will usually
    # yield a different target object
    original = attrs.get("original")
    if original:
        evProc.feed(common.ParserEventType.value, "original", original)

    for key, val in attrs.items():
        if key in positioned:
            continue
        evProc.feed(common.ParserEventType.value, key, val)

    # mixins must be fed last as they might depend on stuff set
    # in other attributes
    mixin = attrs.get("mixin")
    if mixin:
        evProc.feed(common.ParserEventType.value, "mixin", mixin)
def feedTo(
        rootStruct: "_StructureArg",
        eventSource: Iterator,
        context: StructParseContext,
        feedInto: bool = False) -> Structure:
    """feeds events from eventSource to rootStruct.

    A new event processor is used for feeding.  No context
    exit functions are run.

    eventSource must yield (type, name, payload) triples.  Consecutive
    "data" events are joined and fed as a single content_ value unless
    they are all whitespace.  A non-empty payload of a start event is
    taken to be the attribute dictionary, and value events are
    synthesized from it.

    The processed root structure is returned.

    if feedInto is true, the event creating the root structure is not
    expected (TODO: this is crap; fix it so that this is always the
    case when rootStruct is an instance).

    When an exception escapes that has neither a pos nor a true posInMsg
    attribute, the pos attribute of eventSource (None if there is none)
    is attached to it before it is re-raised.
    """
    evProc = EventProcessor(rootStruct, context)
    if feedInto:
        evProc.setParser(evProc.result)

    buf = []
    try:
        for evType, name, payload in eventSource:
            # buffer data until some other event comes in
            if evType=="data":
                buf.append(payload)
                continue

            if buf:
                res = "".join(buf)
                if not ALL_WHITESPACE.match(res):
                    evProc.feed(common.ParserEventType.value, "content_", res)
                buf = []

            # "normal" event feed
            evProc.feed(evType, name, payload)

            # start event: Synthesize value events for attributes.
            if evType=="start" and payload:
                _synthesizeAttributeEvents(evProc, context, payload)

    except Exception as ex:
        if (not getattr(ex, "posInMsg", False)
                and getattr(ex, "pos", None) is None):
            # only add pos when the message string does not already have it.
            # Event sources are only required to be iterators, so do not
            # let a missing pos attribute mask the actual error.
            ex.pos = getattr(  # type: ignore # dynamic attribute
                eventSource, "pos", None)
        raise

    return evProc.result
def parseFromStream(
        rootStruct: "_StructureArg",
        inputStream: TextIO,
        context: Optional[StructParseContext] = None) -> Structure:
    """parses a tree rooted in rootStruct from the file-like object
    inputStream.

    If rootStruct is a type, it is instantiated to create the root
    element; if it is an instance, that instance will be the root.

    When no context is passed in, a fresh ParseContext is used.  The
    context's exit functions are run on the result before the root
    element of the resulting tree is returned.
    """
    events = utils.iterparse(inputStream)
    ctx = parsecontext.ParseContext() if context is None else context
    ctx.setEventSource(events)

    root = feedTo(rootStruct, events, ctx)
    ctx.runExitFuncs(root)
    return root
def parseFromString(
        rootStruct: "_StructureArg",
        inputString: str,
        context: Optional[StructParseContext] = None) -> Structure:
    """parses a DaCHS RD tree rooted in ``rootStruct`` from a string.

    This wraps inputString into a StringIO and leaves the actual work
    to parseFromStream; the root element of the resulting tree is
    returned.  You would use this like this::

        parseFromString(rscdef.Column, "<column name='foo'/>")
    """
    stream = io.StringIO(inputString)
    return parseFromStream(rootStruct, stream, context)
def structToETree(aStruct: Structure) -> utils.ElementTree.Element:
    """returns an ElementTree for the copyable content of aStruct.

    The tree is built from what aStruct.iterEvents() yields.  Note that due
    to manipulations at parse time and non-copyable content, this will, in
    general, not reproduce the original XML trees.

    Exceptions raised while an event is processed get a dachsNote
    attribute naming the element, event type and value involved.
    """
    # stack of the elements currently open; the root is at the bottom.
    openNodes = [utils.ElementTree.Element(cast(str, aStruct.name_))]

    for evType, elName, value in aStruct.iterEvents():
        # data model horror. let me hack around it for now
        if elName=='_synthesizedRoles':
            continue

        try:
            if evType=="start":
                child = utils.ElementTree.SubElement(openNodes[-1], elName)
                openNodes.append(child)
                continue

            if evType=="end":
                openNodes.pop()
                continue

            if evType!="value":
                raise utils.Error("Invalid struct event: %s"%evType)

            # do not serialise empty or missing attributes at all
            if value is None or value is common.NotGiven:
                continue

            if elName=="content_":
                # This ends up as the text content of the current element.
                # Let's see how much we will regret blindly stringifying
                # these.
                openNodes[-1].text = str(value)
                continue

            if hasattr(value, "payload"):
                # it's an _AttBox, which ought to go soon anyway; skip
                # these, it's only hit stupid legacy STC annotation
                continue

            # value is rebound in place here so the note written by the
            # handler below reports it as converted.
            if isinstance(value, pathlib.Path):
                value = str(value)
            if hasattr(value, "id"):
                # I blindly assume that's a reference
                value = value.id

            # silently swallow everything that's not a string by now.
            # should we do better?
            if isinstance(value, str):
                openNodes[-1].set(elName, value)

        except Exception as exc:
            exc.dachsNote = (  # type: ignore # dynamic attribute
                "Badness occurred in element %s, event %s,"
                " value %s\n"%(elName, evType, value))
            raise

    return openNodes[-1]
def structToXML(aStruct: Structure) -> str:
    """returns RD XML for a structure as a native string.

    The element tree from structToETree is written as UTF-8 into a
    memory buffer, and the buffer's content is decoded again.
    """
    tree = utils.ElementTree.ElementTree(structToETree(aStruct))
    serialized = io.BytesIO()
    tree.write(serialized, "utf-8")
    return serialized.getvalue().decode("utf-8")