"""
Active tags are used to prepare and insert computed material into RD trees.
And, ok, we are dealing with elements here rather than tags, but I liked
the name "active tags" much better, and there's too much talk of elements
in this source as it is.
The main tricky part with active tags is when they're nested. In
short, active tags are expanded even when within active tags. So,
if you write::
<STREAM id="foo">
<LOOP>
</LOOP>
</STREAM>
foo contains not a loop element but whatever that spit out. In particular,
macros within the loop are expanded not within some FEED element but
within the RD.
There is a non-expanding version of STREAM, too, but that has to take
special precautions.
"""
#c Copyright 2008-2025, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import csv
import re
from io import StringIO
from gavo import base # (to pass into codeItems' namespaces only)
from gavo import utils
from gavo.base import attrdef
from gavo.base import common
from gavo.base import complexattrs
from gavo.base import macros
from gavo.base import parsecontext
from gavo.base import structure
from gavo.utils.dachstypes import (cast, TYPE_CHECKING,
Any, Callable, Dict, Iterator, List, Optional, Self,
Structure, StructParseContext, StructParserValue, Type, Union)
if TYPE_CHECKING:
from gavo.base import xmlstruct
# the following is a sentinel for values that have been expanded
# by an active tag already. When active tags are nested, only the
# innermost active tag must expand macros so one can be sure that
# double-escaped macros actually end up when the events are finally
# replayed at the top level. _EXPANDED_VALUE must compare equal to
# "value" since it is used as such in event triples.
class _ExValueType(object):
    """The type of the sentinel marking values an active tag has already
    expanded.

    The instance compares equal to the string "value" (and hashes like
    it), so it can stand in for the "value" event type in recorded events.
    Code that needs to tell expanded from unexpanded values uses an
    identity check against _EXPANDED_VALUE instead.
    """
    def __str__(self) -> str:
        return "value"

    def __repr__(self) -> str:
        return "'value/expanded'"

    def __eq__(self, other: object) -> bool:
        return other=="value"

    def __ne__(self, other: object) -> bool:
        return not other=="value"

    def __hash__(self) -> int:
        # Defining __eq__ alone sets __hash__ to None; keep the sentinel
        # hashable and consistent with the string it compares equal to.
        return hash("value")


_EXPANDED_VALUE = _ExValueType()
class ActiveTag:
    """A mixin marking a class as an active tag.

    It is normally combined with structure.Structure or classes derived
    from it.  The class also works as a sentinel: the resolver built at the
    end of this module is handed ActiveTag together with the module's
    globals.
    """
    # element name of the active tag; concrete active tags override this
    name_: Union[str, Type[utils.Undefined]] = utils.Undefined
class GhostMixin(common.StructCallbacks):
    """A mixin to make a Structure ghostly.

    Most active tags are "ghostly", i.e., they do not (directly) show up
    in their parents.  To achieve that, the wrap-up of the new element
    ends with an Ignore exception; this tells the Structure's ``end_``
    method not to feed the element to the parent.
    """

    def onElementComplete(self) -> None:
        # let the other callbacks run first, then opt out of being adopted
        super().onElementComplete()
        raise common.Ignore(self)
class _PreparedEventSource:
    """An event source for xmlstruct.

    It is constructed with a list of events as recorded by classes
    inheriting from RecordingBase.  The first three items of each recorded
    event are handed out on iteration; the last item is kept in the pos
    attribute while that event is current.
    """
    def __init__(self, events: List) -> None:
        self.events_ = events
        self.curEvent = -1
        self.pos = None

    def __iter__(self) -> "_PreparedEventSource":
        # every iteration starts over on a fresh source over the same events
        return _PreparedEventSource(self.events_)

    def __next__(self) -> common.ParserEvent:
        self.curEvent += 1
        try:
            recorded = self.events_[self.curEvent]
        except IndexError:
            raise StopIteration()
        self.pos = recorded[-1]
        return recorded[:3]
@structure.buildstructure
class Defaults(structure.Structure):
    """Defaults for macros.

    In STREAMs and NXSTREAMs, DEFAULTS let you specify values filled into
    macros when a FEED doesn't give them.  Macro names are attribute names
    (or element names, if you insist), defaults are their values.
    """
    name_ = "DEFAULTS"

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # maps macro names to their default values
        self.defaults: Dict[str, StructParserValue] = {}
        structure.Structure.__init__(self, *args, **kwargs)

    def start_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue) -> Self:
        """keeps parsing in this element whatever child is opened.
        """
        return self

    def value_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue) -> Self:
        """stores an attribute as a default or remembers element content
        until the element is closed.
        """
        if name=="content_":
            # content of a child element; end_ assigns it to the right name
            self.storedContent = value
            return self
        self.defaults[name] = value
        return self

    def end_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue) -> Optional[common.Parser]:
        """closes either a child element (storing its content as a default)
        or DEFAULTS itself (handing self to the parent).
        """
        if name!="DEFAULTS":
            self.defaults[name] = self.storedContent
            del self.storedContent
            return self

        self.finishElement(ctx)
        if self.parent is not None:
            self.parent.feedObject("DEFAULTS", self)
        return self.parent
@structure.buildstructure
class RecordingBase(structure.Structure):
    """An "abstract base" for active tags doing event recording.

    Events that do not concern the recorder's own attributes are stored
    as ParserEvents in the events_ attribute; tagStack_ holds the names of
    the recorded elements currently open.
    """
    name_ = "invalid-not-overridden"

    _doc = attrdef.UnicodeAttribute("doc", description="A description of"
        " this stream (should be restructured text).", strip=False)
    _defaults = complexattrs.StructAttribute("DEFAULTS",
        childFactory=Defaults, description="A mapping giving"
        " defaults for macros expanded in this stream. Macros"
        " not defaulted will fail when not given in a FEED's attributes.",
        default=None)

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self.events_: List[common.ParserEvent] = []
        self.tagStack_: List[str] = []
        structure.Structure.__init__(self, *args, **kwargs)

    def feedEvent(self,
            ctx: StructParseContext,
            type: common.ParserEventType,
            name: str,
            value: StructParserValue) -> Optional[Union[common.Parser, str]]:
        """records already-expanded values as such and passes everything
        else on to the normal Structure event dispatch.
        """
        # keep _EXPANDED_VALUE rather than feed the value to protect it from
        # further expansion by subordinate structures (except if we, the
        # active tag, are the final recipient, in which case we gobble the
        # thing ourselves).
        if type is _EXPANDED_VALUE and name not in self.managedAttrs:
            self.events_.append(common.ParserEvent(
                _EXPANDED_VALUE, name, value, ctx.pos))
            return self
        return structure.Structure.feedEvent(self, ctx, type, name, value)

    def start_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue) -> Optional[Union[common.Parser, str]]:
        """opens one of our own structured attributes or records the start
        of a foreign element.
        """
        if name in self.managedAttrs and not self.tagStack_:
            return structure.Structure.start_(self, ctx, name, value)

        self.events_.append(common.ParserEvent("start", name, value, ctx.pos))
        # only recorded elements are tracked; this is what tells our own
        # attributes from equally named material inside the recording
        self.tagStack_.append(name)
        return self

    def end_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue) -> Optional[common.Parser]:
        """closes one of our own attributes or records the end of a foreign
        element.
        """
        if name in self.managedAttrs and not self.tagStack_:
            structure.Structure.end_(self, ctx, name, value)
        else:
            self.events_.append(common.ParserEvent("end", name, value, ctx.pos))
            self.tagStack_.pop()
        return self

    def value_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue) -> structure.Structure:
        """sets one of our own attributes or records a foreign value.
        """
        if name in self.managedAttrs and not self.tagStack_:
            # our attribute
            structure.Structure.value_(self, ctx, name, value)
        else:
            self.events_.append(
                common.ParserEvent("value", name, value, ctx.pos))
        return self

    def getEventSource(self) -> _PreparedEventSource:
        """returns an object suitable as event source in xmlstruct.
        """
        return _PreparedEventSource(self.events_)

    def unexpandMacros(self) -> None:
        """undoes the marking of expanded values as expanded.

        This is when, as with mixins, duplicate expansion of macros during
        replay is desired.
        """
        for ind, ev in enumerate(self.events_):
            if ev[0]==_EXPANDED_VALUE:
                self.events_[ind] = common.ParserEvent("value", *ev[1:])

    def dump(self) -> None:
        """prints the recorded events to stdout, one per line, indented by
        element nesting depth.
        """
        indent = ""
        for evType, name, val, _ in self.events_:
            print(f"{indent}{evType}\t{name}\t{val}")
            if evType=="start":
                # two characters per level, matching what "end" takes away
                indent += "  "
            elif evType=="end":
                indent = indent[:-2]

    # This lets us feedFrom these
    iterEvents = getEventSource
@structure.buildstructure
class EventStream(RecordingBase, GhostMixin, ActiveTag):
    """An active tag that records events as they come in.

    Their only direct effect is to leave a trace in the parser's id map.
    The resulting event stream can be played back later.
    """
    name_ = "STREAM"

    def end_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue) -> Optional[common.Parser]:
        """records ends of embedded elements; on the end of the STREAM
        element itself, hands parsing back to the parent and detaches from
        it.
        """
        if self.tagStack_:
            # still inside recorded material
            return RecordingBase.end_(self, ctx, name, value)

        # end of STREAM element: keep self out of the parse tree
        formerParent, self.parent = self.parent, None
        return formerParent
@structure.buildstructure
class RawEventStream(EventStream):
    """An event stream that records events, not expanding active tags.

    Normal event streams expand embedded active tags in place.  This is
    frequently what you want, but it means that you cannot, e.g., fill
    in loop variables through stream macros.

    With non-expanded streams, you can do that::

        <NXSTREAM id="cols">
            <LOOP listItems="\\stuff">
                <events>
                    <column name="\\\\item"/>
                </events>
            </LOOP>
        </NXSTREAM>
        <table id="foo">
            <FEED source="cols" stuff="x y"/>
        </table>

    Note that the normal innermost-only rule for macro expansions
    within active tags does not apply for NXSTREAMS.  Macros expanded
    by a replayed NXSTREAM will be re-expanded by the next active
    tag that sees them (this is to allow embedded active tags to use
    macros; you need to double-escape macros for them, of course).
    """
    name_ = "NXSTREAM"

    # Hack to signal xmlstruct.EventProcessor not to expand active tags here
    ACTIVE_NOEXPAND = None
@structure.buildstructure
class EmbeddedStream(RecordingBase):
    """An event stream as a child of another element.
    """
    # Lower case since it's really a "normal" element that's
    # added into the parse tree.
    name_ = "events"

    _passivate = attrdef.ActionAttribute("passivate",
        methodName="_makePassive", description="If set to True, do not expand"
        " active elements immediately in the body of these events"
        " (as in an NXSTREAM)")

    def _makePassive(self, ctx: StructParseContext) -> None:
        """sets the ACTIVE_NOEXPAND marker on this instance when passivate
        is (case-insensitively) "true".
        """
        wantsPassive = self.passivate.lower()=="true"
        if wantsPassive:
            self.ACTIVE_NOEXPAND = True

    def end_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue) -> Optional[common.Parser]:
        """records ends of embedded elements; the end of the element itself
        is handled like that of any other structure.
        """
        if self.tagStack_:
            return RecordingBase.end_(self, ctx, name, value)
        # end of my element, do standard structure thing.
        return structure.Structure.end_(self, ctx, name, value)
@structure.buildstructure
class Prune(ActiveTag, structure.Structure):
    """An active tag that lets you selectively delete children of the
    current object.

    You give it regular expression-valued attributes; on the replay of
    the stream, matching items and their children will not be replayed.

    If you give more than one attribute, the result will be a conjunction
    of the specified conditions.

    This only works if the items to be matched are true XML attributes
    (i.e., not written as children).

    For instance, the following will filter out all elements with a name
    of VERB from the stream::

        <PRUNE name="VERB"/>
    """
    name_ = "PRUNE"

    def __init__(self, parent: structure.Structure, **kwargs: Any) -> None:
        # attribute name -> regular expression source
        self.conds: Dict[str, StructParserValue] = {}
        structure.Structure.__init__(self, parent)

    def value_(self,
            ctx: StructParseContext,
            name: str,
            value: Union[Dict[str, str], str, None]) -> Self:
        """stores any attribute given as a condition on that attribute.
        """
        self.conds[name] = str(value)
        return self

    def end_(self,
            ctx: StructParseContext,
            name: str,
            value: Union[Dict[str, str], str, None]) -> Optional[common.Parser]:
        """builds the matches callable and feeds self to the parent.
        """
        assert name==self.name_
        self.matches = self._getMatcher()
        if self.parent is not None:
            self.parent.feedObject(self.name_, self)
        return self.parent

    def _getMatcher(self) -> Callable[[Dict[str, str]], bool]:
        """returns a callable that takes a dictionary and matches the
        entries against the conditions given.

        The callable returns True only if every condition's regular
        expression is found in the corresponding dictionary value.
        """
        compiled = [
            (attName, re.compile(str(regEx)))
            for attName, regEx in self.conds.items()]

        def match(aDict: Dict[str, str]) -> bool:
            for attName, expr in compiled:
                val = aDict.get(attName)
                # attributes not given (or None) never match
                if val is None or not expr.search(str(val)):
                    return False
            return True

        return match
@structure.buildstructure
class Edit(EmbeddedStream):
    """an event stream targeted at editing other structures.

    When replaying a stream in the presence of EDITs, the elements
    are continually checked against ref.  If an element matches, the
    children of edit will be played back into it.
    """
    name_ = "EDIT"

    _ref = attrdef.UnicodeAttribute("ref", description="Destination of"
        " the edits, in the form elementName[<name or id>]",
        default=utils.Undefined)

    # group 1: element name, group 2: value of its name or id
    refPat = re.compile(
        r"([A-Za-z_][A-Za-z0-9_]*)\[([A-Za-z_][A-Za-z0-9_]*)\]")

    def onElementComplete(self) -> None:
        """splits ref into triggerEl and triggerId.

        A LiteralParseError is raised when ref does not have the
        elementName[nameOrId] form.
        """
        parsedRef = self.refPat.match(self.ref)
        if parsedRef is None:
            raise common.LiteralParseError("ref", self.ref,
                hint="edit references have the form <element name>[<value of"
                " name or id attribute>]")
        self.triggerEl, self.triggerId = parsedRef.groups()
@structure.buildstructure
class ReplayBase(ActiveTag, structure.Structure, macros.StandardMacroMixin):
    """An "abstract base" for active tags replaying streams.
    """
    name_: Union[str, Type[utils.Undefined]] = utils.Undefined
    # subclasses set this to False to replay values without macro expansion
    _expandMacros = True

    _source = parsecontext.ReferenceAttribute("source",
        description="id of a stream to replay", default=None)
    _events = complexattrs.StructAttribute("events",
        childFactory=EmbeddedStream, default=None,
        description="Alternatively to source, an XML fragment to be replayed")
    _edits = complexattrs.StructListAttribute("edits",
        childFactory=Edit, description="Changes to be performed on the"
        " events played back.")
    _reexpand = attrdef.BooleanAttribute("reexpand", False,
        description="Force re-expansion of macros; usually, when replaying,"
        " each string is only expanded once, mainly to avoid overly long"
        " backslash-fences. Set this to true to force further expansion.")
    _prunes = complexattrs.StructListAttribute("prunes",
        childFactory=Prune, description="Conditions for removing"
        " items from the playback stream.")

    def _ensureEditsDict(self) -> None:
        """builds, on first call, editsDict mapping (element name,
        name-or-id) pairs to the EDIT elements given.
        """
        if not hasattr(self, "editsDict"):
            self.editsDict = {}
            for edit in self.edits:
                self.editsDict[edit.triggerEl, edit.triggerId] = edit

    def _isPruneable(self, val: Dict[str, str]) -> bool:
        """returns True if any of our PRUNE conditions matches val.
        """
        for p in self.prunes:
            if p.matches(val):
                return True
        return False

    def _replayTo(self,
            events: List[common.ParserEvent],
            evTarget: "xmlstruct.EventProcessor",
            ctx: StructParseContext) -> None:
        """pushes stored events into an event processor.

        The public interface is replay (that receives a structure rather
        than an event processor).

        While replaying, macros in values are expanded (unless disabled or
        already done by another active tag), EDITs are played into elements
        they match, and elements matched by a PRUNE are dropped together
        with their children.  Exceptions raised while feeding get a pos
        attribute naming both the replay and the original position.
        """
        # one set per open element, collecting its id and name values
        idStack: List[Union[str,set]] = []
        # one entry per open element below (and including) a pruned one
        pruneStack: List[None] = []

        # see RawEventStream's docstring for why we do not want to suppress
        # further expansion with NXSTREAMs
        typeOfExpandedValues: Union[_ExValueType, str] = _EXPANDED_VALUE
        if isinstance(self.source, RawEventStream):
            typeOfExpandedValues = "value"

        with ctx.replaying():
            for evType, name, val, pos in events:
                # _EXPANDED_VALUE compares equal to "value", hence the extra
                # identity check to skip what has been expanded before.
                if (self._expandMacros
                        and evType=="value"
                        and (self.reexpand or evType is not _EXPANDED_VALUE)
                        and "\\" in val):
                    try:
                        val = self.expand(val)
                    except macros.MacroError as ex:
                        ex.hint = ("This probably means that you should have set a %s"
                            " attribute in the FEED tag. For details see the"
                            " documentation of the STREAM with id %s."%(
                                ex.macroName,
                                getattr(self.source, "id", "<embedded>")))
                        raise
                    evType = typeOfExpandedValues

                # the following mess implements the logic for EDIT.
                if evType=="start":
                    idStack.append(set())
                elif evType=="value":
                    # Parenthesised on purpose: without an open element
                    # there is nothing a name or id could belong to (the
                    # unparenthesised form ran into an IndexError for
                    # top-level name values).
                    if idStack and (name=="id" or name=="name"):
                        cast(set, idStack[-1]).add(val)
                elif evType=="end":
                    ids = idStack.pop()
                    for foundId in ids:
                        if (name, foundId) in self.editsDict:
                            # play the edit into the element before it
                            # is closed
                            self._replayTo(self.editsDict[name, foundId].events_,
                                evTarget,
                                ctx)

                # The following mess implements the logic for PRUNE
                if evType=="start":
                    if pruneStack:
                        pruneStack.append(None)
                    else:
                        if self.prunes and self._isPruneable(val):
                            pruneStack.append(None)

                try:
                    if not pruneStack:
                        evTarget.feed(evType, name, val)
                except Exception as msg:
                    msg.pos = "%s (replaying, real error position %s)"%( # type: ignore
                        ctx.pos, pos)
                    raise

                if pruneStack and evType=="end":
                    pruneStack.pop()

        # ReferenceAttribute and similar may change the element fed into;
        # make sure the right object is returned up-tree
        # (the cast reflects that we always have a structure we're feeding
        # into here).
        self.parent = cast(Structure, evTarget.curParser)

    def replay(self,
            events: List[common.ParserEvent],
            destination: structure.ParseableStructure,
            ctx: StructParseContext) -> None:
        """pushes the stored events into the destination structure.

        While doing this, local macros are expanded unless we already
        receive the events from an active tag (e.g., nested streams
        and such).

        For the duration of the replay, ctx.replayTarget is destination;
        afterwards, a previous (true) value is restored, otherwise the
        attribute is removed again.
        """
        # XXX TODO: Circular import here.  Think again and resolve.
        from gavo.base.xmlstruct import EventProcessor

        evTarget = EventProcessor(destination, ctx)
        evTarget.setParser(cast(Structure, destination))
        self._ensureEditsDict()

        try:
            previousTarget = getattr(ctx, "replayTarget", None)
            ctx.replayTarget = destination
            self._replayTo(events, evTarget, ctx)
        finally:
            if previousTarget:
                ctx.replayTarget = previousTarget
            else:
                delattr(ctx, "replayTarget")
@structure.buildstructure
class DelayedReplayBase(ReplayBase, GhostMixin):
    """A base class for active tags wanting to replay streams from
    where the context is invisible.

    These define a _replayer attribute that, when called, replays
    the stored events *within the context at its end* and to the
    parent.

    This is what you want for the FEED and LOOP since they always work
    on the embedding element and, by virtue of being ghosts, cannot
    be copied.  If the element embedding an event stream can be
    copied, this will almost certainly not do what you want.
    """
    def _setupReplay(self, ctx: StructParseContext) -> None:
        """defines self._replayer from whichever of source and events was
        given.

        A StructureError is raised unless exactly one of them is set.
        """
        given = [cand for cand in (self.source, self.events) if cand]
        if len(given)!=1:
            raise common.StructureError("Need exactly one of source and events"
                " on %s elements"%self.name_)
        stream = given[0].events_

        def replayer() -> None:
            # without a parent, there is nothing to replay into
            if self.parent is not None:
                self.replay(stream, self.parent, ctx)

        self._replayer = replayer

    def end_(self,
            ctx: StructParseContext,
            name: str,
            value: StructParserValue) -> Optional[common.Parser]:
        """prepares the replayer, then finishes the element the usual way.
        """
        self._setupReplay(ctx)
        return structure.Structure.end_(self, ctx, name, value)
@structure.buildstructure
class ReplayedEventsWithFreeAttributesBase(DelayedReplayBase):
    """An active tag that takes arbitrary attributes as macro definitions.
    """
    def __init__(self, parent, *args: Any, **kwargs: Any) -> None:
        DelayedReplayBase.__init__(self, parent, *args, **kwargs)
        # managedAttrs in general is a class attribute.  Here, we want
        # to add values for the macros, and these are instance-local.
        self.managedAttrs = self.managedAttrs.copy()

    def completeElement(self,
            ctx: StructParseContext) -> None:
        """fills in macros from the source's DEFAULTS where the element
        did not define them itself.
        """
        # define any missing macros that still are in defaults.
        if self.source and self.source.DEFAULTS is not None:
            for macName, default in self.source.DEFAULTS.defaults.items():
                methodName = "macro_"+macName
                if not hasattr(self, methodName):
                    setattr(self, methodName, lambda v=default: v)
        super().completeElement(ctx)

    def getAttribute(self, name: str) -> Any:
        """returns the attribute definition for name, making up a macro
        definition for names that are not declared attributes.

        One-character names are rejected with a StructureError.
        """
        try:
            return DelayedReplayBase.getAttribute(self, name)
        except common.StructureError:  # no "real" attribute, it's a macro def
            if len(name)==1:
                raise common.StructureError("DaCHS does not support one-character"
                    f' macro names. Hence, you cannot use "{name}" as a {self.name_}'
                    " attribute.")

            def m() -> str:
                # the macro returns whatever the attribute holds when called
                return getattr(self, name)

            setattr(self, "macro_"+name.strip(), m)
            newAttr = attrdef.UnicodeAttribute(name)
            self.managedAttrs[name] = newAttr
            return self.managedAttrs[name]
@structure.buildstructure
class ReplayedEvents(ReplayedEventsWithFreeAttributesBase):
    """An active tag that takes an event stream and replays the events,
    possibly filling variables.

    This element supports arbitrary attributes with unicode values.  These
    values are available as macros for replayed values.
    """
    name_ = "FEED"

    def completeElement(self,
            ctx: StructParseContext) -> None:
        """replays the stream once the element (including macro defaults)
        is complete.
        """
        super().completeElement(ctx)
        self._replayer()
@structure.buildstructure
class NonExpandedReplayedEvents(ReplayedEvents):
    """A ReplayedEventStream that does not expand active tag macros.

    You only want this when embedding a stream into another stream
    that could want to expand the embedded macros.
    """
    name_ = "LFEED"
    # makes the replay skip macro expansion
    _expandMacros = False
class GeneratorAttribute(attrdef.UnicodeAttribute):
    """An attribute containing a generator working on the parse context.
    """
    def feed(self,
            ctx: StructParseContext,
            instance: structure.Structure,
            literal: str) -> None:
        """stores literal and compiles it into instance.iterRowsFromCode.

        The literal becomes the body of a function makeRows(context).
        In restricted contexts, a RestrictedElement exception is raised
        instead.
        """
        if ctx.restricted:
            raise common.RestrictedElement("codeItems")

        attrdef.UnicodeAttribute.feed(self, ctx, instance, literal)

        body = utils.fixIndentation(
            getattr(instance, self.name_),
            " ", governingLine=1)
        funcSource = "def makeRows(context):\n"+body+"\n"
        # names the generator body can use in addition to context
        codeGlobals = {
            "definitionContext": ctx,
            "utils": utils,
            "base": base}

        instance.iterRowsFromCode = utils.compileFunction( # type: ignore
            funcSource, "makeRows", useGlobals=codeGlobals,
            uniqueName=f"<{instance.name_} at {instance.getSourcePosition()}>")
@structure.buildstructure
class Loop(ReplayedEventsWithFreeAttributesBase):
    """An active tag that replays a feed several times, each time with
    different values.
    """
    name_ = "LOOP"

    _csvItems = attrdef.UnicodeAttribute("csvItems", default=None,
        description="The items to loop over, in CSV-with-labels format.",
        strip=True)
    _listItems = attrdef.UnicodeAttribute("listItems", default=None,
        description="The items to loop over, as space-separated single"
        " items. Each item will show up once, as 'item' macro.",
        strip=True)
    _codeItems = GeneratorAttribute("codeItems", default=None,
        description="A python generator body that yields dictionaries"
        " that are then used as loop items. You can access the parse context"
        " as the context variable in these code snippets. context.replayTarget"
        " is the struct the loop is feeding to. You also get the utils and base"
        " namespaces, though using them it *a bit* dangerous (we don't guarantee"
        " their stability).", strip=False)

    def maybeExpand(self, val: str) -> str:
        """returns val with macros expanded by the closest ancestor that
        has an expand method.

        val is returned unchanged if it contains no backslash or if no
        ancestor can expand.
        """
        if "\\" in val:
            el = self.parent
            while el:
                if hasattr(el, "expand"):
                    return el.expand(val)
                el = el.parent
        return val

    def _makeRowIteratorFromListItems(self) -> Optional[Iterator]:
        """returns an iterator over {"item": ...} rows for listItems, or
        None if listItems is not set.
        """
        if self.listItems is None:
            return None

        def rowIterator() -> Iterator[Dict[str, str]]:
            for item in self.maybeExpand(self.listItems).split():
                yield {"item": item}

        return rowIterator()

    def _makeRowIteratorFromCSV(self) -> Optional[csv.DictReader]:
        """returns a DictReader over the (macro-expanded) csvItems, or None
        if csvItems is not set.
        """
        if self.csvItems is None:
            return None
        src = self.maybeExpand(self.csvItems).strip()
        return csv.DictReader(StringIO(src), skipinitialspace=True)

    def _makeRowIteratorFromCode(self,
            ctx: StructParseContext) -> Optional[Iterator]:
        """returns the iterator produced by the compiled codeItems, or None
        if codeItems is not set.
        """
        if self.codeItems is None:
            return None
        return self.iterRowsFromCode(ctx) # type: ignore

    def _getRowIterator(self, ctx: StructParseContext) -> Iterator:
        """returns the one row iterator defined by the element's attributes.

        A StructureError is raised unless exactly one of listItems,
        csvItems, and codeItems yields an iterator.
        """
        rowIterators = [ri for ri in [
            self._makeRowIteratorFromListItems(),
            self._makeRowIteratorFromCSV(),
            self._makeRowIteratorFromCode(ctx)] if ri]
        if len(rowIterators)!=1:
            raise common.StructureError("Must give exactly one data source in"
                " LOOP")
        return rowIterators[0] # type: ignore

    def completeElement(self, ctx: StructParseContext) -> None:
        """replays the stream once per row, with the row's items defined
        as macros.

        A StructureError is raised if a row has an item under the None key.
        """
        super().completeElement(ctx)
        for row in self._getRowIterator(ctx):
            for name, value in row.items():
                # csv.DictReader collects surplus fields as a list under
                # the None key; this must be diagnosed before value is
                # treated as a string, or an AttributeError on the list
                # would hide the actual problem.
                if name is None:
                    raise utils.StructureError(
                        "Too many CSV items (extra data: %s)"%value)
                if value:
                    value = value.strip()
                setattr(self, "macro_"+name.strip(), lambda v=value: v)
            self._replayer()
class Debug(ActiveTag, GhostMixin):
    """Enter a debugger when parsing to here.

    This is probably only interesting for DaCHS developers.
    """
    name_ = "DEBUG"
    # this is no Structure, so the attribute bookkeeping is given (empty)
    # by hand
    managedAttrs: Dict[str, Any] = {}
    attrSeq: List = []

    def __init__(self, parent, *args, **kwargs):
        self.parent = parent

    def feedEvent(self,
            ctx: StructParseContext,
            type: common.ParserEventType,
            name: str,
            value: StructParserValue):
        """drops into pdb on an end event and then hands parsing back to
        the parent.
        """
        if type=="end":
            for helpLine in (
                    "DEBUG elemeent: the parent element is in self.parent",
                    " Also, use ctx.getById(...) to get elements by id"):
                print(helpLine)
            import pdb
            debugger = pdb.Pdb(nosigint=True)
            debugger.set_trace()
            return self.parent
# Resolver for active tags by element name.  It is handed ActiveTag and a
# snapshot of this module's globals taken right here, so presumably it
# only sees classes defined above this line (verify against
# utils.buildClassResolver).  The key function yields a class's name_, or
# None when name_ is missing or false.
getActiveTag = utils.buildClassResolver(
ActiveTag,
list(globals().values()),
key=lambda obj: getattr(obj, "name_", None) or None)
def registerActiveTag(activeTag: structure.Structure) -> None:
    """declares activeTag as an active tag.

    The tag is entered into getActiveTag's registry under its name_.

    This is intended for active tags that might be defined one day
    outside of this module; at least as of DaCHS 2.0, there's no such
    thing.
    """
    registry = getActiveTag.registry # type: ignore
    registry[activeTag.name_] = activeTag
def isActive(name: str) -> bool:
    """returns True if name is the element name of a registered active tag.
    """
    knownTags = getActiveTag.registry #type: ignore
    return name in knownTags