# Source code for gavo.dm.sil

"""
SIL, the Simple Instance Language, is an attempt to allow writing
data model instances in a simple, JSON-like language.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import functools
import re

from gavo import utils
from gavo.dm import common


# sentinels for further processing
class Atom(str):
    """a sentinel class for atomic values of roles.

    Atoms are strings that additionally know how to serialise themselves
    as SIL literals (see asSIL).
    """
    # Values matched by this pattern are written to SIL without quotes.
    # This must be a raw string: in a normal string literal, \w is an
    # invalid escape sequence that recent pythons warn about.
    noQuotesOkRE = re.compile(r"[\w_.]+$")

    def asSIL(self):
        """returns a SIL literal for this atom.

        If noQuotesOkRE matches the value, it is returned unchanged (as
        a plain str).  Otherwise, it is wrapped in double quotes, with
        any embedded double quote doubled.
        """
        if self.noQuotesOkRE.match(self):
            return str(self)
        return '"%s"' % self.replace('"', '""')

    def __repr__(self):
        # The plain string repr with an "a" in front, so atoms can be told
        # apart from ordinary strings in debug output.  (Python 3 string
        # reprs never start with a "u" prefix, so nothing needs stripping.)
        return "a" + str.__repr__(self)
class Reference(str):
    """a sentinel class for roles referencing something else.

    The string value is what is being referenced.
    """

    def asSIL(self):
        """returns this reference as a SIL literal, i.e., the value
        with an at sign in front.
        """
        return "@" + str(self)
# parse methods, used by getGrammar, by nonterminal name there def _pa_attributeDef(s, p, toks): return ("attr", toks[0], toks[2]) def _pa_valueWithFallback(s, p, toks): return ("fallback", toks[0], toks[2]) def _pa_typeAnnotation(s, p, toks): return toks[1] def _pa_collection(s, p, toks): if len(toks)==1: # no explicit type annotation; we return None as type. return ("coll", None, toks[0]) else: return ("coll", toks[0], toks[1]) def _pa_obj(s, p, toks): if len(toks)==2: # with type annotation return ("obj", toks[0], toks[1][2]) else: # no type annotation; we should later add an annotation based on # the default for the DM return ("obj", None, toks[0][2]) def _pa_objectBody(s, p, toks): return ("uobj", None, toks[1].asList()) def _pa_sequenceBody(s, p, toks): return [toks[1].asList()] def _pa_reference(s, p, toks): return Reference(toks[1]) def _pa_simpleImmediate(s, p, toks): return Atom(toks[0]) def _pa_nullLiteral(s, p, toks): return [None]
@functools.cache
def getGrammar(debug=False):
    """returns a grammar for parsing a SIL object description.

    What is returned is the grammar's obj symbol, i.e., an object
    body with an optional type annotation in front.

    Parse actions are taken from the module-level functions named
    _pa_<symbol>; each is attached to the local grammar symbol of that
    name.

    With debug true, every parser element among this function's local
    variables has debugging switched on and is named after the variable
    holding it.

    Since this is wrapped by functools.cache, repeated calls with the
    same argument return the same object.
    """
    from gavo.utils.parsetricks import (Word, Literal, alphas, alphanums,
        QuotedString, Forward, ZeroOrMore, Group, Optional, cStyleComment,
        pyparsingWhitechars, ParserElement)

    # NOTE: the names of the local variables in here matter.  Parse
    # actions are bound by name, and debug names are taken from the
    # local variable names (including loop variables).
    with pyparsingWhitechars("\t\n\r "):
        # identifiers and atomic literals
        qualifiedIdentifier = Word(alphas+"_:", alphanums+"-._:")
        plainIdentifier = Word(alphas+"_", alphanums+"-._")
        externalIdentifier = Word(alphas+"_", alphanums+"._/#-")
        plainLiteral = Word(alphanums+"_-.")
        quotedLiteral = QuotedString(quoteChar='"', escQuote='""')
        nullLiteral = Literal("__NULL__")

        reference = (Literal('@') + externalIdentifier)

        # values; complexImmediate is recursive and filled in below
        complexImmediate = Forward()
        simpleImmediate = plainLiteral | quotedLiteral
        value = (nullLiteral
            | reference
            | complexImmediate
            | simpleImmediate)
        valueWithFallback = (value + "|" - value)

        # objects and collections
        attributeDef = (plainIdentifier
            + Literal(":")
            + (valueWithFallback | value))
        typeAnnotation = (Literal('(')
            + qualifiedIdentifier
            + Literal(')'))
        objectBody = (Literal('{')
            + Group(ZeroOrMore(attributeDef))
            + Literal('}'))
        obj = Optional(typeAnnotation) + objectBody
        sequenceBody = (Literal('[')
            + Group(ZeroOrMore(valueWithFallback | value | objectBody))
            + Literal(']'))
        collection = Optional(typeAnnotation) + sequenceBody
        complexImmediate << (obj | collection)

    # C-style comments are ignored in all compound constructs
    for sym in [complexImmediate, collection, sequenceBody, objectBody,
            typeAnnotation, attributeDef]:
        sym.ignore(cStyleComment)

    # One snapshot of the local namespace serves both for binding the
    # parse actions and for naming the symbols in debug mode.
    symbols = dict(locals())

    for n, func in globals().items():
        if n.startswith("_pa_"):
            symbols[n[4:]].setParseAction(func)

    if debug:
        for name, sym in symbols.items():
            if isinstance(sym, ParserElement):
                sym.setDebug(True)
                sym.setName(name)

    return obj
def _getAttributeEvents(attrNode): """returns a list of events for an attr node in the parse tree. """ assert attrNode[0]=='attr' if isinstance(attrNode[2], (Reference, Atom)): return [('attr', attrNode[1], attrNode[2])] elif isinstance(attrNode[2], tuple): return list( _parseTreeToEvents(attrNode[2], roleName=attrNode[1])) elif attrNode[2] is None: # swallow __NULL__ attributes return [] else: assert False, "Bad object as parsed value: %s"%repr(attrNode[2]) def _getFallbackAttrEv(node): """returns a parser event for a fallback node. """ # we're copying the current (attribute) child and parse # the two arms of the fallback node; then we produce # a fallback node with the two resulting event streams. return ("fallback", _getAttributeEvents((node[0], node[1], node[2][1])), _getAttributeEvents((node[0], node[1], node[2][2]))) def _iterAttrs(node, seqType=None, roleName=None): """generates parse events for nodes with attribute children. (see _parseTreeToEvents). roleName and seqType are ignored here (the attribute names are part of the events). They are just part of the signature because _iterObjs needs them. """ for child in node[2]: if isinstance(child[2], tuple) and child[2][0]=="fallback": yield _getFallbackAttrEv(child) else: yield from iter(_getAttributeEvents(child)) def _getObjectEvents(itemNode, seqType, roleName): """returns a list parser events for an item node in the parse tree. This isn't a iterator because the lists are needed as is for fallbacks. """ if isinstance(itemNode, (Reference, Atom)): return [('item', itemNode, None)] elif itemNode[0]=="fallback": return [("fallback", _getObjectEvents(itemNode[1], seqType, roleName), _getObjectEvents(itemNode[2], seqType, roleName))] else: # complex node -- recurse into _parseTreeToEvents return list(_parseTreeToEvents( itemNode, seqType=seqType, roleName=roleName)) def _iterObjs(node, seqType, roleName): """generates parse events for sequences. (see _parseTreeToEvents). 
""" for child in node[2]: yield from iter(_getObjectEvents(child, seqType, roleName)) _PARSER_EVENT_MAPPING = { # -> (iterparse ev name, type source, child parser) 'obj': ('obj', 'fromNode', _iterAttrs), 'uobj': ('obj', 'seqType', _iterAttrs), 'coll': ('coll', 'fromNode', _iterObjs), } def _parseTreeToEvents(node, seqType=None, roleName=None): """helps iterparse by translating parse tree nodes into the events iterparse yields. This works by invoking sub-parsers as per _PARSER_EVENT_MAPPING, which maps parse tree node types to tuples of the parser event type, an indicator of where to get the the type from (either from the node or, for collection, from the embedding sequence), an the child parser. """ opener, typeSource, childParser = _PARSER_EVENT_MAPPING[node[0]] if typeSource=='fromNode': nodeType = node[1] elif typeSource=='seqType': nodeType = seqType else: assert False yield (opener, roleName, nodeType) for child in childParser(node, nodeType, roleName): yield child yield ('pop', None, None)
def iterparse(silLiteral):
    """yields parse events for a SIL literal in a string.

    The parse events are triples of one of the forms:

    * ('attr', roleName, value) -- add an attribute to the current
      annotation; value is an Atom or a Reference.
    * ('obj', roleName, type) -- create a new object annotation of type
      (roleName is None for the root object; type can be None).
    * ('coll', roleName, type) -- create a new collection annotation
      (type can be None).
    * ('item', val, None) -- add an atomic value to the current collection.
    * ('pop', None, None) -- finish current annotation and add it to its
      container.
    * ('fallback', events, alternativeEvents) -- two lists of events as
      described here, coming from the two sides of a fallback ("|")
      construct.

    These events are generated from the tree that the grammar produces by
    logic in _parseTreeToEvents.
    """
    parsed = getGrammar().parseString(silLiteral, parseAll=True)
    return _parseTreeToEvents(parsed[0])
class AnnotationBuilder:
    """A class to build the table annotation.

    It is constructed with an annotationFactory, which is a callable
    taking the root annotation, an attribute name and an attribute value,
    producing what is then added to the enclosing annotation.  There, the
    value will be either an Atom or a Reference.

    Internally, the AnnotationBuilder works with an object stack, onto
    which built annotations are pushed.  They are taken off as
    annotations get built.

    After construction, use the feed method to feed the parser events
    (as generated by iterparse) into the builder.  The first event must
    be an obj event; it creates the root annotation.

    When self.result becomes non-None, the Builder considers its job done
    and will AttributeError on further feed attempts.
    """
    def __init__(self, annotationFactory):
        self.annotationFactory = annotationFactory
        self.obStack, self.result = [], None
        self.root = None

    @classmethod
    def fromSILLiteral(cls, silLiteral, annotationFactory):
        """builds annotation using annotationFactory from a SIL literal.

        This adds a few sanity checks; fetch the annotation result
        (which you may not actually care about a lot) from the result's
        result attribute.

        This raises a utils.StructureError if the literal yields no
        result or if the resulting root annotation has no type.
        """
        builder = cls(annotationFactory)
        for ev in iterparse(silLiteral):
            builder.feed(*ev)

        if builder.result is None:
            raise utils.StructureError("Data model annotation yielded no result.")
        if builder.result.type is None:
            raise utils.StructureError("Root of Data Model annotation must"
                " have a type.")

        return builder

    def feed(self, evType, arg1, arg2):
        """processes one parser event.

        The first event fed in builds the root annotation; all later
        events are dispatched to the _handle_<evType> method.
        """
        if self.root is None:
            self._buildRoot(evType, arg1, arg2)
        else:
            getattr(self, "_handle_"+evType)(arg1, arg2)

    def _handle_obj(self, arg1, arg2):
        # an object: push it on the stack
        self.obStack.append(common.ObjectAnnotation(arg1, arg2, self.root))

    def _handle_coll(self, arg1, arg2):
        # a collection: push it on the stack
        self.obStack.append(common.CollectionAnnotation(arg1, arg2, self.root))

    def _handle_pop(self, arg1, arg2):
        # done with a certain kind of annotation: pop it and add it to
        # its parent (which is the new stack top).
        newRole = self.obStack.pop()
        if self.obStack:
            self.obStack[-1].add(newRole)
        else:
            # we've just popped the total result.  Make sure
            # any further operations fail.
            del self.obStack
            self.result = newRole

    def _handle_attr(self, arg1, arg2):
        # an attribute: add it to the top of the stack (the attribute
        # name is part of the annotation).
        self.obStack[-1].add(
            self.annotationFactory(self.root, arg1, arg2))

    def _handle_fallback(self, arg1, arg2):
        # a fallback: try building arg1, and if that fails, try arg2.
        # This leaves all actual action to the (recursive) delegation.
        curStackTop = len(self.obStack)
        try:
            for ev in arg1:
                self.feed(*ev)
        except Exception:
            # Only ordinary errors trigger the fallback; KeyboardInterrupt,
            # SystemExit and friends must not be swallowed here.
            #
            # unwind the stack if necessary to remove any junk left by
            # the abortive attempt
            self.obStack = self.obStack[:curStackTop]
            for ev in arg2:
                self.feed(*ev)

    def _handle_item(self, arg1, arg2):
        # an atomic item: add it to the collection on top of the stack,
        # using the collection's name as the role name.
        collection = self.obStack[-1]
        assert isinstance(collection, common.CollectionAnnotation)
        collection.add(
            self.annotationFactory(self.root, collection.name, arg1))

    def _buildRoot(self, evType, arg1, arg2):
        # the first event: it must open an object, which becomes the
        # root annotation (it has no root of its own, hence the None).
        assert evType=='obj'
        self.root = common.ObjectAnnotation(arg1, arg2, None)
        self.obStack.append(self.root)
def getAnnotation(silLiteral, annotationFactory):
    """returns an annotation object parsed from silLiteral.

    This is a shallow wrapper around AnnotationBuilder.fromSILLiteral,
    which you should probably directly use in new code; what is returned
    here is the result attribute of the builder it produces.
    """
    return AnnotationBuilder.fromSILLiteral(
        silLiteral, annotationFactory).result
if __name__=="__main__":
    # Manual smoke test: parse a small SIL literal with parser debugging
    # switched on and print the resulting parse tree.
    #
    # getGrammar is wrapped by functools.cache, the wrappers of which
    # have no enableDebuggingOutput method; debugging is requested
    # through getGrammar's debug argument instead.
    g = getGrammar(debug=True)
    res = g.parseString(
        """ (:testclass) { seq: [a "b c d" @e]}""",
        parseAll=True)[0]
    print(res)