Source code for gavo.utils.plainxml

"""
Some XML hacks.

StartEndHandler simplifies the creation of SAX parsers, intended for
client code or non-DC XML parsing.

iterparse is an elementtree-inspired thin expat layer; both VOTable
and base.structure parsing build on it.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import collections
import weakref
import xml.sax
from xml.parsers import expat
from xml.sax.handler import ContentHandler

from gavo.utils import excs
from gavo.utils import misctricks
from gavo.utils import texttricks

class ErrorPosition(object):
    """A wrapper for an error position.

    Construct it with file name, line number, and column.  Use None
    for missing or unknown values.

    A falsy line number is shown as '?'; a column is only shown as '?'
    when it is None (so column 0 is preserved).
    """
    fName = None

    def __init__(self, fName, line, column):
        self.fName = fName
        self.line = line if line else '?'
        self.col = '?' if column is None else column

    def __str__(self):
        # Without a file name, only the (line, column) pair is rendered.
        if not self.fName:
            return "(%s, %s)"%(self.line, self.col)
        return "%s, (%s, %s)"%(self.fName, self.line, self.col)
class iterparse(object):
    """iterates over start, data, and end events in source.

    Events are triples (type, name, payload):

    * ("start", name, attrs) for an opening tag,
    * ("end", name, None) for a closing tag,
    * ("data", None, text) for character data.

    To keep things simple downstream, we swallow all namespace prefixes,
    if present.

    iterparse is constructed with a source (anything with a read(size)
    method) and optionally a custom error class.  This error class needs
    to have the message as the first argument.  Since expat error messages
    usually contain line number and column in them, no extra pos attribute
    is supported.

    Since the parser typically is far ahead of the events seen, we
    do our own bookkeeping by storing the parser position with each
    event.  The *end* of the construct that caused an event can
    be retrieved using pos.

    Once the input is exhausted (or close() has been called), the iterator
    keeps raising StopIteration on further next() calls, unless events
    are pushed back with pushBack.
    """
    chunkSize = 2**20
    "The number of bytes handed to expat from iterparse at one go."

    # Set by close(); a class-level default keeps this working for
    # subclasses that do not call our __init__.
    _closed = False

    def __init__(self, source, parseErrorClass=excs.StructureError):
        self.source = source
        self.parseErrorClass = parseErrorClass

        # Work out a human-readable name of the input for error positions.
        if hasattr(source, "name"):
            self.inputName = source.name
        elif hasattr(source, "getvalue"):
            self.inputName = texttricks.makeEllipsis("IO:'"
                +texttricks.safe_str(source.getvalue()))+"'"
        else:
            self.inputName = texttricks.makeSourceEllipsis(source)

        self.parser = expat.ParserCreate()
        self.parser.buffer_text = True
        self.lastLine, self.lastColumn = 1, 0
        # Queue of (event, position) pairs produced by the expat callbacks
        # and consumed by __next__.
        self.evBuf = collections.deque()
        self.parser.StartElementHandler = self._startElement
        self.parser.EndElementHandler = self._endElement
        self.parser.CharacterDataHandler = self._characters

    def __iter__(self):
        return self

    def _startElement(self, name, attrs):
        self.evBuf.append(
            (("start", name.split(":")[-1], attrs),
            (self.parser.CurrentLineNumber, self.parser.CurrentColumnNumber)))

    def _endElement(self, name):
        self.evBuf.append((("end", name.split(":")[-1], None),
            (self.parser.CurrentLineNumber, self.parser.CurrentColumnNumber)))

    def _characters(self, data):
        # No position is recorded for character data.
        self.evBuf.append((("data", None, data), None))

    def pushBack(self, type, name, payload):
        """puts an event at the front of the queue so that it is
        the next one returned.

        Pushed-back events carry no position and hence do not change pos.
        """
        self.evBuf.appendleft(((type, name, payload), None))

    def __next__(self):
        """returns the next event triple.

        Raises StopIteration at the end of the input and an instance of
        parseErrorClass when expat reports an error.
        """
        while not self.evBuf:
            if self._closed:
                # The parser has already been finalised; feeding it again
                # would make expat raise rather than let us signal the
                # end of the iteration.
                break
            try:
                nextChunk = self.source.read(self.chunkSize)
                if nextChunk:
                    self.parser.Parse(nextChunk)
                else:
                    self.close()
                    break
            except expat.ExpatError as ex:
                srcDesc = getattr(self.source, "name", "(internal source)")
                newEx = self.parseErrorClass(srcDesc+" "+str(ex))
                newEx.posInMsg = True  # see base.xmlstruct
                newEx.inFile = srcDesc
                raise misctricks.logOldExc(newEx)

        if not self.evBuf:
            raise StopIteration("End of Input")
        event, pos = self.evBuf.popleft()
        if pos is not None:
            self.lastLine, self.lastColumn = pos
        return event

    def close(self):
        """tells expat the document is complete and detaches our handlers.

        Calling close more than once is harmless; only the first call
        talks to the parser.  Errors expat raises for the final parse
        propagate, but the handlers are released in any case.
        """
        if self._closed:
            return
        self._closed = True
        try:
            self.parser.Parse("", True)
        finally:
            # Drop the bound methods so that parser and self do not keep
            # referencing each other.
            self.parser.StartElementHandler =\
            self.parser.EndElementHandler = \
            self.parser.CharacterDataHandler = None

    @property
    def pos(self):
        """An ErrorPosition for the last positioned event returned."""
        return ErrorPosition(self.inputName, self.lastLine, self.lastColumn)

    def getParseError(self, msg):
        """returns an instance of parseErrorClass for msg, prefixed with
        the current position.
        """
        res = self.parseErrorClass("At %s: %s"%(self.pos, msg))
        res.posInMsg = True  # see base.xmlstruct
        return res
class StartEndHandler(ContentHandler):
    """This class provides startElement, endElement and characters
    methods that translate events into method calls.

    When an opening tag is seen, we look for a _start_<element name>
    method and, if present, call it with the name and the attributes.
    When a closing tag is seen, we try to call _end_<element name> with
    name, attributes and contents.  If the _end_xxx method returns a
    string, this value will be added to the content of the enclosing
    element.

    If no specific method exists, _defaultStart(name, attrs) and
    _defaultEnd(name, attrs, contents) are called if they are defined.

    Rather than overriding __init__, you probably want to override
    the _initialize() method to create the data structures you want
    to fill from XML.

    StartEndHandlers clean element names from namespace prefixes, and
    they ignore them in every other way.  If you need namespaces, use
    a different interface.
    """
    def __init__(self):
        ContentHandler.__init__(self)
        # Handler methods are looked up through a weak proxy of self.
        self.realHandler = weakref.proxy(self)
        # (name, attrs) pairs of the currently open elements.
        self.elementStack = []
        # One list of text fragments per open element, plus a bottom list
        # that receives whatever the root's end handler returns.
        self.contentsStack = [[]]
        self._initialize()

    def _initialize(self):
        """Hook for subclasses; called at the end of __init__."""
        pass

    def processingInstruction(self, target, data):
        """adds the data of a processing instruction to the content of
        the current element.
        """
        self.contentsStack[-1].append(data)

    def cleanupName(self, name):
        """returns name without a namespace prefix and with dashes
        replaced by underscores (so it can be part of a method name).
        """
        return name.split(":")[-1].replace("-", "_")

    def startElementNS(self, namePair, qName, attrs):
        """forwards a namespace-aware start event to startElement.

        Attributes without a namespace keep their plain name, the others
        are passed on as {namespace}name.
        """
        newAttrs = {}
        for ns, name in list(attrs.keys()):
            if ns is None:
                newAttrs[name] = attrs[(ns, name)]
            else:
                newAttrs["{%s}%s"%(ns, name)] = attrs[(ns, name)]
        self.startElement(namePair[1], newAttrs)

    def startElement(self, name, attrs):
        """opens a new element and dispatches to _start_<name> or, failing
        that, _defaultStart.
        """
        self.contentsStack.append([])
        name = self.cleanupName(name)
        self.elementStack.append((name, attrs))
        if hasattr(self.realHandler, "_start_%s"%name):
            getattr(self.realHandler, "_start_%s"%name)(name, attrs)
        elif hasattr(self, "_defaultStart"):
            self._defaultStart(name, attrs)

    def endElementNS(self, namePair, qName):
        """forwards a namespace-aware end event to endElement."""
        self.endElement(namePair[1])

    def endElement(self, name, suppress=False):
        """closes the current element and dispatches to _end_<name> or,
        failing that, _defaultEnd.

        A string returned by the handler is appended to the content of the
        enclosing element unless suppress is true.
        """
        contents = "".join(self.contentsStack.pop())
        name = self.cleanupName(name)
        _, attrs = self.elementStack.pop()
        res = None
        if hasattr(self.realHandler, "_end_%s"%name):
            res = getattr(self.realHandler,
                "_end_%s"%name)(name, attrs, contents)
        elif hasattr(self, "_defaultEnd"):
            res = self._defaultEnd(name, attrs, contents)
        if isinstance(res, str) and not suppress:
            self.contentsStack[-1].append(res)

    def characters(self, chars):
        """collects character data for the current element."""
        self.contentsStack[-1].append(chars)

    def getResult(self):
        """returns the first item collected at the bottom of the contents
        stack.

        Raises an IndexError if nothing has been collected there.
        """
        return self.contentsStack[0][0]

    def getParentTag(self, depth=1):
        """Returns the name of the parent element.

        This only works as written here in end handlers.  In start
        handlers, you have to pass depth=2 (since their tag already is on
        the stack).

        None is returned if there is no element that far up, e.g., for
        the root element.
        """
        # Checking against depth makes the root element behave the same
        # in start handlers (depth=2) as in end handlers: no parent, None.
        if self.elementStack and depth<=len(self.elementStack):
            return self.elementStack[-depth][0]
        return None

    def parse(self, stream):
        """parses XML from stream and returns self."""
        xml.sax.parse(stream, self)
        return self

    def parseString(self, string):
        """parses XML from string and returns self."""
        xml.sax.parseString(string, self)
        return self

    # xml.sax is smart enough to do the right thing when it gets passed bytes.
    parseBytes = parseString

    def getAttrsAsDict(self, attrs):
        """returns attrs as received from SAX as a dictionary.

        The main selling point is that any namespace prefixes are removed
        from the attribute names.  Attribute values are passed through
        unchanged, so any prefixes within them remain.
        """
        return {key.split(":")[-1]: value
            for key, value in list(attrs.items())}

    def setDocumentLocator(self, locator):
        """stores the locator passed in by the SAX parser in self.locator."""
        self.locator = locator
def traverseETree(eTree):
    """iterates the elements of an elementTree in postorder.

    That is, all descendants of an element are yielded before the
    element itself; eTree comes last.
    """
    for subTree in eTree:
        yield from traverseETree(subTree)
    yield eTree