Source code for gavo.base.meta

"""
Code dealing with meta information.

In DaCHS, tree-structured metadata can be attached to tables, services,
resources as a whole, and some other structures.  This module provides
a python mixin to keep and manipulate such metadata.

We deal with VO-style RMI metadata but also allow custom metadata.  Custom
metadata keys should usually start with _.

See develNotes for some discussion of why this is so messy and an explanation
of why in particular addMeta and helpers are a minefield of selections.

The rough plan to do this looks like this:

Metadata is kept in containers that mix in MetaMixin.  Meta information is
accessed through keys of the form <atom>{.<atom>}  The first atom is the
primary key.  An atom consists exclusively of ascii letters and the
underscore.

There's a meta mixin having the methods addMeta and getMeta that care
about finding metadata including handover.  For compound metadata, they
will split the key and hand over to the parent if the don't have a meta
item for the main key.

To make this complete for DaCHS, this needs to be completemented with magic,
"typed" meta classes overriding certain types of behaviour (see the
forKeys class decorator for how this works); base.__init__ arranges
for base.typedmeta to be imported.  To access these magic types,
use META_CLASSES_FOR_KEYS (but don't change that directory except
through @meta.forKeys).
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import os
import re
import textwrap

from gavo import utils
from gavo.base import attrdef
from gavo.base import common
from gavo.base import config
from gavo.utils import stanxml


# This dictionary maps meta atoms that were found to be somehow
# misleading or confusing to the ones that are now used.  This
# is so RDs aren't broken.
DEPRECATED_ATOMS = {
	"_associatedDatalinkSvc": "_associatedDatalinkService",
}


[docs]class MetaError(utils.Error): """A base class for metadata-related errors. MetaErrors have a carrier attribute that should point to the MetaMixin item queried. Metadata propagation makes this a bit tricky, but we should at least try; for setMeta and addMeta, the top-level entry functions manipulate the carrier attributes for this purpose. To yield useful error messages, leave carrier at its default None only when you really have no idea what the meta will end up on. """ def __init__(self, msg, carrier=None, hint=None, key=None): self.carrier = carrier self.key = key utils.Error.__init__(self, msg, hint=hint) def __str__(self): prefix = "" if self.carrier: prefix = "On %s: "%repr(self.carrier) return prefix+utils.Error.__str__(self)
[docs]class MetaSyntaxError(MetaError): """is raised when a meta key is invalid. """
[docs]class NoMetaKey(MetaError): """is raised when a meta key does not exist (and raiseOnFail is True). """
[docs]class MetaCardError(MetaError): """is raised when a meta value somehow has the wrong cardinality (e.g., on attempting to stringify a sequence meta). """ def __init__(self, msg, carrier=None, hint=None, key=None): self.origMsg = msg if key is None: MetaError.__init__(self, msg, hint=hint) else: MetaError.__init__(self, "%s (key %s)"%(msg, key), hint=hint)
[docs]class MetaValueError(MetaError): """is raised when a meta value is inapproriate for the key given. """
############################################# # Magic meta handling start META_CLASSES_FOR_KEYS = {}
[docs]def forKeys(*keys): def _(cls): for k in keys: assert k not in META_CLASSES_FOR_KEYS, "Redefining "+k META_CLASSES_FOR_KEYS[k] = cls return cls return _
[docs]def doMetaOverride(container, metaKey, metaValue, extraArgs={}): """creates the representation of metaKey/metaValue in container. This is there for magic meta types, mostly from typedmeta. It inspects META_CLASSES_FOR_KEYS, which in turn is fed by the forKeys class decorator. There's some hard-coded junk in here that doesn't quite fit anywhere else yet (but that should be rationalised if more hackery of this kind becomes necessary). If metaKey does not need any special action, this returns None. This gets called from one central point in MetaMixin.addMeta, and essentially all magic involved should be concentrated here. """ if metaKey in META_CLASSES_FOR_KEYS and not isinstance(metaValue, MetaValue): try: container.addMeta(metaKey, META_CLASSES_FOR_KEYS[metaKey](metaValue, **extraArgs)) return True except TypeError: raise utils.logOldExc(MetaError( "Invalid arguments for %s meta items: %s"%(metaKey, utils.safe_str(extraArgs)), None)) # let's see if there's some way to rationalise this kind of thing # later. if metaKey=="creator": return _doCreatorMetaOverride(container, metaValue) elif metaKey=="copyright" or metaKey=="_copyright": # Backwards compatibility 2019; it's cheap enough so I don't # bother deprecating the two keys for now. container.addMeta("rights", metaValue, **extraArgs)
# fallthrough: let addMeta do its standard thing. def _doCreatorMetaOverride(container, value): """handles the adding of the creator meta. value is empty or a parsed meta value, this does nothing (which will cause addMeta to do its default operation). If value is a non-empty string, it will be split along semicolons to produce individual creator metas with names . """ if not value or isinstance(value, MetaValue): return None for authName in (s.strip() for s in value.split(";")): container.addMeta("creator", MetaValue().addMeta("name", authName)) return True
[docs]class IncludesChildren(str): """a formatted result that already includes all meta children. This is returned from some of the special meta types' HTML formatters to keep the HTMLMetaBuilder from adding meta items that are already included in their HTML. """
# Magic meta handling end #############################################
[docs]def metaRstToHtml(inputString): return utils.rstxToHTML(inputString)
_metaPat = re.compile(r"([a-zA-Z_][\w-]*)(?:\.([a-zA-Z_][\w-]*))*$") _primaryPat = re.compile(r"([a-zA-Z_][\w-]*)(\.|$)")
[docs]def getPrimary(metaKey): key = _primaryPat.match(metaKey) if not key or not key.group(1): raise MetaSyntaxError("Invalid meta key: %s"%metaKey, None) return key.group(1)
[docs]def parseKey(metaKey): if not _metaPat.match(metaKey): raise MetaSyntaxError("Invalid meta key: %s"%metaKey, None) parsed = metaKey.split(".") for atom in parsed: if atom in DEPRECATED_ATOMS: utils.sendUIEvent("Warning", "Deprecated meta atom %s used in %s;" " please change the atom to %s."%( atom, metaKey, DEPRECATED_ATOMS[atom])) return [DEPRECATED_ATOMS.get(a, a) for a in parsed] return parsed
[docs]def parseMetaStream(metaContainer, metaStream, clearItems=False): """parser meta key/value pairs from metaStream and adds them to metaContainer. If clearItems is true, for each key found in the metaStream there's first a delMeta for that key executed. This is for re-parsing meta streams. The stream format is: - continuation lines with backslashes, where any sequence of backslash, (cr?) lf (blank or tab)* is replaced by nothing. - comments are lines like (ws*)# anything - empty lines are no-ops - all other lines are (ws*)<key>(ws*):(ws*)value(ws*) - if a key starts with !, any meta info for the key is cleared before setting - a value may be prefixed with rst:, literal:, or raw: to set the format of the meta literal (default is plain). """ if metaStream is None: return # handle continuation lines metaStream = re.sub("\\\\\r?\n[\t ]*", "", metaStream) keysSeen = set() for line in metaStream.split("\n"): line = line.strip() if line.startswith("#") or not line: continue try: key, value = line.split(":", 1) except ValueError: raise MetaSyntaxError("%s is no valid line for a meta stream"% repr(line), None, hint="In general, meta streams contain lines like 'meta.key:" " meta value; see also the documentation.") key = key.strip() if key.startswith("!"): key = key[1:] metaContainer.delMeta(key) if key not in keysSeen and clearItems: metaContainer.delMeta(key) keysSeen.add(key) value, format = value.strip(), "plain" mat = re.match("(rst|literal|raw):", value) if mat: value = value[mat.end():].strip() format = mat.group(1) metaContainer.addMeta(key, value, format=format)
[docs]class MetaParser(common.Parser): """A structure parser that kicks in when meta information is parsed from XML. This parser can also handle the notation with an attribute-less meta tag and lf-separated colon-pairs as content. """ # These are constructed a lot, so let's keep __init__ as clean as possible, # shall we? def __init__(self, container, nextParser): self.container, self.nextParser = container, nextParser self.attrs = {} self.children = [] # containing key, metaValue pairs self.next = None
[docs] def addMeta(self, key, content="", **kwargs): # this parse can be a temporary meta parent for children; we # record them here an play them back when we have created # the meta value itself self.children.append((key, content, kwargs))
[docs] def setMeta(self, key, content="", **kwargs): # see addMeta comment on why we need to do magic here. self.children.append(("!"+key, content, kwargs))
[docs] def start_(self, ctx, name, value): if name=="meta": return MetaParser(self, self) else: self.next = name return self
[docs] def value_(self, ctx, name, value): if self.next is None: self.attrs[str(name)] = value else: self.attrs[str(self.next)] = value return self
def _doAddMeta(self): content = self.attrs.pop("content_", "") if not self.attrs: # content only, parse this as a meta stream parseMetaStream(self.container, content) else: try: content = utils.fixIndentation(content, "", 1).rstrip() except common.Error as ex: raise utils.logOldExc(common.StructureError("Bad text in meta value" " (%s)"%ex)) if not "name" in self.attrs: raise common.StructureError("meta elements must have a" " name attribute") metaKey = self.attrs.pop("name") if metaKey.startswith("!"): self.container.setMeta(metaKey[1:], content, **self.attrs) else: self.container.addMeta(metaKey, content, **self.attrs) # meta elements can have children; add these, properly fudging # their keys for key, content, kwargs in self.children: if key.startswith("!"): fullKey = "%s.%s"%(metaKey, key[1:]) self.container.setMeta(fullKey, content, **kwargs) else: fullKey = "%s.%s"%(metaKey, key) self.container.addMeta(fullKey, content, **kwargs)
[docs] def end_(self, ctx, name, value): if name=="meta": try: self._doAddMeta() except TypeError as msg: raise utils.StructureError("While constructing meta: %s"%msg) return self.nextParser else: self.next = None return self
[docs]class MetaAttribute(attrdef.AttributeDef): """An attribute magically inserting meta values to Structures mixing in MetaMixin. We don't want to keep metadata itself in structures for performance reasons, so we define a parser of our own in here. """ typedesc = "Metadata" def __init__(self, description="Metadata"): attrdef.AttributeDef.__init__(self, "meta_", attrdef.Computed, description) self.xmlName_ = "meta" @property def default_(self): return {}
[docs] def feedObject(self, instance, value): self.meta_ = value
[docs] def getCopy(self, parent, newParent, ctx): """creates a deep copy of the current meta dictionary and returns it. This is used when a MetaMixin's attribute is set to copyable and a meta carrier is copied. As there's currently no way to make the _metaAttr copyable, this isn't called by itself. If you must, you can manually call this (_metaAttr.getCopy), but that'd really be an indication the interface needs changes. Note that the copying semantics is a bit funky: Copied values remain, but on write, sequences are replaced rather than added to. """ oldDict = parent.meta_ newMeta = {} for key, mi in oldDict.items(): newMeta[key] = mi.copy() return newMeta
[docs] def create(self, parent, ctx, name): return MetaParser(parent, parent)
[docs] def makeUserDoc(self): return ("**meta** -- a piece of meta information, giving at least a name" " and some content. See Metadata_ on what is permitted here.")
[docs] def iterEvents(self, instance): def doIter(metaDict): for key, item in metaDict.items(): for value in item: yield ("start", "meta", None) yield ("value", "name", key) if value.getContent(): yield ("value", "content_", value.getContent()) if value.meta_: for ev in doIter(value.meta_): yield ev yield ("end", "meta", None) for ev in doIter(instance.meta_): yield ev
[docs]class MetaMixin(common.StructCallbacks): """is a mixin for entities carrying meta information. The meta mixin provides the following methods: - setMetaParent(m) -- sets the name of the meta container enclosing the current one. m has to have the Meta mixin as well. - getMeta(key, propagate=True, raiseOnFail=False, default=None) -- returns meta information for key or default. - addMeta(key, metaItem, moreAttrs) -- adds a piece of meta information here. Key may be a compound, metaItem may be a text, in which case it will be turned into a proper MetaValue taking key and moreAttrs into account. - setMeta(key, metaItem) -- like addMeta, only previous value(s) are overwritten - delMeta(key) -- removes a meta value(s) for key. When querying meta information, by default all parents are queried as well (propagate=True). Metadata is not copied when the embedding object is copied. That, frankly, has not been a good design decision, and there should probably be a way to pass copypable=True to the mixin's attribute definition. """ _metaAttr = MetaAttribute() def __init__(self): """is a constructor for standalone use. You do *not* want to call this when mixing into a Structure. """ self.meta_ = {} def __hasMetaParent(self): try: self.__metaParent # assert existence return True except AttributeError: return False
[docs] def isEmpty(self): return len(self.meta_)==0 and getattr(self, "content", "")==""
[docs] def setMetaParent(self, parent): if parent is not None: self.__metaParent = parent
[docs] def setParent(self, parent): if hasattr(parent, "_getMeta"): self.setMetaParent(parent) super().setParent(parent)
[docs] def getMetaParent(self): if self.__hasMetaParent(): return self.__metaParent else: return None
def _getMeta(self, atoms, propagate, acceptSequence=False): try: return self._getFromAtom(atoms[0])._getMeta(atoms[1:], acceptSequence=acceptSequence) except NoMetaKey: pass # See if parent has the key if propagate: if self.__hasMetaParent(): return self.__metaParent._getMeta(atoms, propagate, acceptSequence=acceptSequence) else: return CONFIG_META._getMeta(atoms, propagate=False, acceptSequence=acceptSequence) raise NoMetaKey("No meta item %s"%".".join(atoms), carrier=self)
[docs] def getMeta(self, key, propagate=True, raiseOnFail=False, default=None, acceptSequence=False): try: try: return self._getMeta( parseKey(key), propagate, acceptSequence=acceptSequence) except NoMetaKey as ex: if raiseOnFail: ex.key = key raise except MetaCardError as ex: raise utils.logOldExc( MetaCardError(ex.origMsg, hint=ex.hint, key=key)) except MetaError as ex: ex.carrier = self raise return default
[docs] def hasMeta(self, atom, propagate=False): """returns true if this meta carrier has an atom metadatum. This will *not* parse atom, so you cannot (yet) pass in hierarchical meta names. With propagate, it will also ask the meta parents. In user code, it is generally preferable to just call getMeta with raiseOnFail. This is mainly a shortcut for internal code. """ if atom in self.meta_: return True if propagate: parent = self.getMetaParent() if parent: return parent.hasMeta(atom, True) return False
def _iterMeta(self, atoms): mine, others = atoms[0], atoms[1:] for mv in self.meta_.get(mine, []): if others: for child in mv._iterMeta(others): yield child else: yield mv
[docs] def iterMeta(self, key, propagate=False): """yields all MetaValues for key. This will traverse down all branches necessary to yield, in sequence, all MetaValues reachable by key. If propagation is enabled, the first meta carrier that has at least one item exhausts the iteration. (this currently doesn't return an iterator but a sequence; that's an implementation detail, though. You should only assume whatever comes back is iterable) """ val = list(self._iterMeta(parseKey(key))) if not val and propagate: if self.__hasMetaParent(): val = self.__metaParent.iterMeta(key, propagate=True) else: val = CONFIG_META.iterMeta(key, propagate=False) return val
[docs] def getAllMetaPairs(self): """iterates over all meta items this container has. Each item consists of key, MetaValue. Multiple MetaValues per key may be given. This will not iterate up, i.e., in general, getMeta will succeed for more keys than what's given here. """ class Accum(object): def __init__(self): self.items = [] self.keys = [] def startKey(self, key): self.keys.append(key) def enterValue(self, value): self.items.append((".".join(self.keys), value)) def endKey(self, key): self.keys.pop() accum = Accum() self.traverse(accum) return accum.items
[docs] def buildRepr(self, key, builder, propagate=True, raiseOnFail=True): value = self.getMeta(key, raiseOnFail=raiseOnFail, propagate=propagate) if value: builder.startKey(key) value.traverse(builder) builder.endKey(key) return builder.getResult()
def _hasAtom(self, atom): return atom in self.meta_ def _getFromAtom(self, atom): if atom in self.meta_: return self.meta_[atom] raise NoMetaKey("No meta child %s"%atom, carrier=self)
[docs] def getMetaKeys(self): return list(self.meta_.keys())
# XXX TRANS remove; this is too common a name keys = getMetaKeys def _setForAtom(self, atom, metaItem): self.meta_[atom] = metaItem def _addMeta(self, atoms, metaValue): primary = atoms[0] if primary in self.meta_: self.meta_[primary]._addMeta(atoms[1:], metaValue) else: self.meta_[primary] = MetaItem.fromAtoms(atoms[1:], metaValue)
[docs] def addMeta(self, key, metaValue, **moreAttrs): """adds metaItem to self under key. moreAttrs can be additional keyword arguments; these are used by the XML constructor to define formats or to pass extra items to special meta types. For convenience, this returns the meta container. """ try: if doMetaOverride(self, key, metaValue, moreAttrs): return self._addMeta(parseKey(key), ensureMetaValue(metaValue, moreAttrs)) except MetaError as ex: ex.carrier = self raise return self
def _delMeta(self, atoms): if atoms[0] not in self.meta_: return if len(atoms)==1: del self.meta_[atoms[0]] else: child = self.meta_[atoms[0]] child._delMeta(atoms[1:]) if child.isEmpty(): del self.meta_[atoms[0]]
[docs] def delMeta(self, key): """removes a meta item from this meta container. This will not propagate, i.e., getMeta(key) might still return something unless you give propagate=False. It is not an error do delete an non-existing meta key. """ self._delMeta(parseKey(key))
[docs] def setMeta(self, key, value, **moreAttrs): """replaces any previous meta content of key (on this container) with value. """ self.delMeta(key) self.addMeta(key, value, **moreAttrs)
[docs] def traverse(self, builder): for key, item in self.meta_.items(): builder.startKey(key) item.traverse(builder) builder.endKey(key)
[docs] def copyMetaFrom(self, other): """sets a copy of other's meta items on self. """ for key in other.getMetaKeys(): orig = other.getMeta(key) if orig is not None: # (orig None can happen for computed metadata) self.meta_[key] = orig.copy()
[docs] def makeOriginal(self, key): """marks the meta item key, if existing, as original. This is for when a meta container has copied metadata. DaCHS' default behaviour is that a subsequent addMeta will clear the copied content. Call this method for the key in question to enable adding to copied metadata. """ item = self.getMeta(key) if item is not None and hasattr(item, "copied"): del item.copied
[docs]class ComputedMetaMixin(MetaMixin): """A MetaMixin for classes that want to implement defaults for unresolvable meta items. If getMeta would return a NoMetaKey, this mixin's getMeta will check the presence of a _meta_<key> method (replacing dots with two underscores) and, if it exists, returns whatever it returns. Otherwise, the exception will be propagated. The _meta_<key> methods should return MetaItems; if something else is returned, it is wrapped in a MetaValue. On copying such metadata, the copy will retain the value on the original if it has one. This does not work for computed metadata that would be inherited. """ def _getFromAtom(self, atom): try: return MetaMixin._getFromAtom(self, atom) except NoMetaKey: methName = "_meta_"+atom if hasattr(self, methName): res = getattr(self, methName)() if res is None: raise return ensureMetaItem(res) raise
[docs] def getMetaKeys(self): computedKeys = [] for name in dir(self): if name.startswith("_meta_"): computedKeys.append(name[6:]) return MetaMixin.getMetaKeys(self)+computedKeys
def _iterMeta(self, atoms): if len(atoms)>1: for mv in MetaMixin._iterMeta(self, atoms): yield mv else: res = list(MetaMixin._iterMeta(self, atoms)) if not res: try: res = self._getFromAtom(atoms[0]) except NoMetaKey: res = [] for val in res: yield val
[docs]class MetaItem(object): """is a collection of homogeneous MetaValues. All MetaValues within a MetaItem have the same key. A MetaItem contains a list of children MetaValues; it is usually constructed with just one MetaValue, though. Use the alternative constructor formSequence if you already have a sequence of MetaValues. Or, better, use the ensureMetaItem utility function. The last added MetaValue is the "active" one that will be changed on _addMeta calls. """ def __init__(self, val): self.children = [] self.addChild(val)
[docs] @classmethod def fromSequence(cls, seq): res = cls(seq[0]) for item in seq[1:]: res.addChild(item) return res
[docs] @classmethod def fromAtoms(cls, atoms, metaValue): if len(atoms)==0: # This will become my child. return cls(metaValue) elif len(atoms)==1: # Create a MetaValue with the target as child mv = MetaValue() mv._setForAtom(atoms[0], cls(ensureMetaValue(metaValue))) return cls(mv) else: # Create a MetaValue with an ancestor of the target as child mv = MetaValue() mv._setForAtom(atoms[0], cls.fromAtoms(atoms[1:], ensureMetaValue(metaValue))) return cls(mv)
def __str__(self): try: res = self.getContent(targetFormat="text") return res except MetaError: return ", ".join(m.getContent(targetFormat="text") for m in self.children) def __iter__(self): return iter(self.children) def __len__(self): return len(self.children) def __getitem__(self, index): return self.children[index]
[docs] def isEmpty(self): return len(self)==0
[docs] def addContent(self, item): self.children[-1].addContent(item)
[docs] def addChild(self, metaValue=None, key=None): # XXX should we force metaValue to be "compatible" with what's # already in children? if hasattr(self, "copied"): self.children = [] delattr(self, "copied") if metaValue is None: metaValue = MetaValue(None) if isinstance(metaValue, str): metaValue = MetaValue(metaValue) assert isinstance(metaValue, MetaValue) self.children.append(metaValue)
def _getMeta(self, atoms, acceptSequence=False): if atoms: if len(self.children)!=1 and not acceptSequence: raise MetaCardError("Meta sequence in branch for getMeta") else: return self.children[0]._getMeta( atoms, acceptSequence=acceptSequence) return self def _addMeta(self, atoms, metaValue): # See above for this mess -- I'm afraid it has to be that complex # if we want to be able to build compound sequences using text labels. # Case 1: Direct child of MetaMixin, sequence addition if not atoms: self.addChild(metaValue) else: self.children[-1]._addMeta(atoms, metaValue)
[docs] def getMeta(self, key, *args, **kwargs): if len(self.children)==1: return self.children[0].getMeta(key, *args, **kwargs) else: raise MetaCardError("No getMeta for meta value sequences", carrier=self, key=key)
[docs] def getContent(self, targetFormat="text", macroPackage=None, acceptSequence=False): if len(self.children)==1 or acceptSequence: return self.children[0].getContent(targetFormat, macroPackage) raise MetaCardError("getContent not allowed for sequence meta items", carrier=self)
def _delMeta(self, atoms): newChildren = [] for c in self.children: c._delMeta(atoms) if not c.isEmpty(): newChildren.append(c) self.children = newChildren
[docs] def traverse(self, builder): for mv in self.children: if mv.content: builder.enterValue(mv) mv.traverse(builder)
[docs] def copy(self): """returns a deep copy of self. """ newOb = self.__class__("") newOb.children = [mv.copy() for mv in self.children] newOb.copied = True return newOb
[docs] def serializeToXMLStan(self): return str(self)
class _NoHyphenWrapper(textwrap.TextWrapper): # we don't want whitespace after hyphens in plain meta strings (looks # funny in HTML), so fix wordsep_re wordsep_re = re.compile( r'(\s+|' # any whitespace r'(?<=[\w\!\"\'\&\.\,\?])-{2,}(?=\w))') # em-dash
[docs]class MetaValue(MetaMixin): """is a piece of meta information about a resource. The content is always a string. The text content may be in different formats, notably - literal - rst (restructured text) - plain (the default) - raw (for embedded HTML, mainly -- only use this if you know the item will only be embedded into HTML templates). """ knownFormats = set(["literal", "rst", "plain", "raw"]) paragraphPat = re.compile("\n\\s*\n") consecutiveWSPat = re.compile("\s\s+") plainWrapper = _NoHyphenWrapper(break_long_words=False, replace_whitespace=True) def __init__(self, content="", format="plain"): self.initArgs = content, format MetaMixin.__init__(self) if format not in self.knownFormats: raise common.StructureError( "Unknown meta format '%s'; allowed are %s."%( format, ", ".join(self.knownFormats))) self.content = content self.format = format self._preprocessContent() def __str__(self): return self.getContent() def _preprocessContent(self): if self.format=="plain" and self.content is not None: self.content = "\n\n".join(self.plainWrapper.fill( self.consecutiveWSPat.sub(" ", para)) for para in self.paragraphPat.split(self.content)) def _getContentAsText(self, content): return content def _getContentAsHTML(self, content, block=False): if block: encTag = "p" else: encTag = "span" if self.format=="literal": return '<%s class="literalmeta">%s</%s>'%(encTag, stanxml.escapePCDATA(content), encTag) elif self.format=="plain": return "\n".join('<%s class="plainmeta">%s</%s>'%( encTag, stanxml.escapePCDATA(p), encTag) for p in content.split("\n\n")) elif self.format=="rst": # XXX TODO: figure out a way to have them block=False return metaRstToHtml(content) elif self.format=="raw": return content
[docs] def getExpandedContent(self, macroPackage): if hasattr(macroPackage, "expand") and "\\" in self.content: return macroPackage.expand(self.content) return self.content
[docs] def getContent(self, targetFormat="text", macroPackage=None): content = self.getExpandedContent(macroPackage) if targetFormat=="text": return self._getContentAsText(content) elif targetFormat=="html": return self._getContentAsHTML(content) elif targetFormat=="blockhtml": return self._getContentAsHTML(content, block=True) else: raise MetaError("Invalid meta target format: %s"%targetFormat)
def _getMeta(self, atoms, propagate=False, acceptSequence=False): return self._getFromAtom(atoms[0])._getMeta(atoms[1:], acceptSequence=acceptSequence) def _addMeta(self, atoms, metaValue): # Cases continued from MetaItem._addMeta # Case 2: Part of a compound, metaValue is to become direct child if len(atoms)==1: primary = atoms[0] # Case 2.1: Requested child exists if self._hasAtom(primary): self._getFromAtom(primary).addChild(metaValue, primary) # Case 2.2: Requested child does not exist else: self._setForAtom(primary, MetaItem(metaValue)) # Case 3: metaItem will become an indirect child else: primary = atoms[0] # Case 3.1: metaValue will become a child of an existing child of ours if self._hasAtom(primary): self._getFromAtom(primary)._addMeta(atoms[1:], metaValue) # Case 3.2: metaItem is on a branch that needs yet to be created. else: self._setForAtom(primary, MetaItem.fromAtoms(atoms[1:], metaValue))
[docs] def copy(self): """returns a deep copy of self. """ newOb = self.__class__(*self.initArgs) newOb.format, newOb.content = self.format, self.content newOb.copyMetaFrom(self) return newOb
[docs]def printMetaTree(metaContainer, curKey=""): #for debugging md = metaContainer.meta_ for childName in md: childKey = curKey+"."+childName for child in md[childName]: print(childKey, child.getContent("text")) printMetaTree(child, childKey)
[docs]def ensureMetaValue(val, moreAttrs={}): """makes a MetaValue out of val and a dict moreAttrs unless val already is a MetaValue. """ if isinstance(val, MetaValue): return val return MetaValue(val, **moreAttrs)
[docs]def ensureMetaItem(thing, moreAttrs={}): """ensures that thing is a MetaItem. If it is not, thing is turned into a sequence of MetaValues, which is then packed into a MetaItem. Essentially, if thing is not a MetaValue, it is made into one with moreAttrs. If thing is a list, this recipe is used for all its items. """ if isinstance(thing, MetaItem): return thing if isinstance(thing, list): return MetaItem.fromSequence( [ensureMetaValue(item, moreAttrs) for item in thing]) return MetaItem(ensureMetaValue(thing, moreAttrs))
[docs]def getMetaText(ob, key, default=None, **kwargs): """returns the meta item key form ob in text form if present, default otherwise. You can pass getMeta keyword arguments (except default). Additionally, there's acceptSequence; if set to true, this will return the first item of a sequence-valued meta item rather than raising an error. ob will be used as a macro package if it has an expand method; to use something else as the macro package, pass a macroPackage keyword argument. """ macroPackage = ob if "macroPackage" in kwargs: macroPackage = kwargs.pop("macroPackage") acceptSequence = kwargs.get("acceptSequence", False) m = ob.getMeta(key, default=None, **kwargs) if m is None: return default try: return m.getContent( macroPackage=macroPackage, acceptSequence=acceptSequence) except MetaCardError as ex: raise MetaCardError(ex.origMsg, carrier=ex.carrier, hint=ex.hint, key=key)
[docs]class MetaBuilder(object): """A base class for meta builders. Builders are passed to a MetaItem's traverse method or to MetaMixin's buildRepr method to build representations of the meta information. You can override startKey, endKey, and enterValue. If you are not doing anything fancy, you can get by by just overriding enterValue and inspecting curAtoms[-1] (which contains the last meta key). You will want to override getResult. """ def __init__(self): self.curAtoms = []
[docs] def startKey(self, key): self.curAtoms.append(key)
[docs] def endKey(self, key): self.curAtoms.pop()
[docs] def enterValue(self, value): pass
[docs] def getResult(self): pass
[docs]class TextBuilder(MetaBuilder): """is a MetaBuilder that recovers a tuple sequence of the meta items in text representation. """ def __init__(self): self.metaItems = [] super(TextBuilder, self).__init__()
[docs] def enterValue(self, value): self.metaItems.append((".".join(self.curAtoms), value.getContent()))
[docs] def getResult(self): return self.metaItems
[docs]def stanFactory(tag, **kwargs): """returns a factory for ModelBasedBuilder built from a stan-like "tag". """ def factory(args, localattrs=None): if localattrs: localattrs.update(kwargs) attrs = localattrs else: attrs = kwargs if isinstance(tag, type): # this presumably is a stanxml.Element. Instantiate first el = tag() elif hasattr(tag, "clone"): # this ought to be something t.w.template.Tag-like el = tag.clone() else: # assume it's a factory function el = tag() return el(**attrs)[args] return factory
# Within this abomination of code, the following is particularly nasty. # It *must* go.
[docs]class ModelBasedBuilder(object): """is a meta builder that can create stan-like structures from meta information It is constructed with with a tuple-tree of keys and DOM constructors; these must work like stan elements, which is, e.g., also true for our registrymodel elements. Each node in the tree can be one of: - a meta key and a callable, - this, and a sequence of child nodes - this, and a dictionary mapping argument names for the callable to meta keys of the node; the arguments extracted in this way are passed in a single dictionary localattrs. The callable can also be None, which causes the corresponding items to be inlined into the parent (this is for flattening nested meta structures). The meta key can also be None, which causes the factory to be called exactly once (this is for nesting flat meta structures). """ def __init__(self, constructors, format="text"): self.constructors, self.format = constructors, format def _buildNode(self, processContent, metaItem, children=(), macroPackage=None): if not metaItem: return [] result = [] for child in metaItem.children: content = [] c = child.getContent(self.format, macroPackage=macroPackage) if c: content.append(c) childContent = self._build(children, child, macroPackage) if childContent: content.append(childContent) if content: result.append(processContent(content, child)) return result def _getItemsForConstructor(self, metaContainer, macroPackage, key, factory, children=(), attrs={}): if factory: def processContent(childContent, metaItem): moreAttrs = {} for argName, metaKey in attrs.items(): val = metaItem.getMeta(metaKey) if val: moreAttrs[argName] = val.getContent("text", macroPackage=macroPackage) return [factory(childContent, localattrs=moreAttrs)] else: def processContent(childContent, metaItem): #noflake: conditional def return childContent if key is None: return [factory(self._build(children, metaContainer, macroPackage))] else: return self._buildNode(processContent, metaContainer.getMeta(key, raiseOnFail=False), children, macroPackage=macroPackage) def _build(self, constructors, metaContainer, macroPackage): result = [] for item in constructors: if isinstance(item, str): result.append(item) else: try: result.extend(self._getItemsForConstructor(metaContainer, macroPackage, *item)) except utils.Error: raise except: raise utils.logOldExc(utils.ReportableError( "Invalid constructor func in %s, meta container active %s"%( repr(item), repr(metaContainer)))) return result
[docs] def build(self, metaContainer, macroPackage=None): if macroPackage is None: macroPackage = metaContainer return self._build(self.constructors, metaContainer, macroPackage)
# Global meta, items get added CONFIG_META = MetaMixin()
[docs]def makeFallbackMeta(reload=False): """fills CONFIG_META with items from $configDir/defaultmeta.txt. This is called from the module __init__ once typed meta from base is in (which means that any typed meta defined in other places won't become magic in fallback; regrettably, we can't hold off loading fallback meta much longer). """ srcPath = os.path.join(config.get("configDir"), "defaultmeta.txt") if not os.path.exists(srcPath): # this will be made a bit later in dachs init return with open(srcPath, "rb") as f: content = f.read().decode("utf-8", "ignore") parseMetaStream(CONFIG_META, content, clearItems=reload)