# Source code for gavo.user.rdmanipulator

"""
Updating table and column metadata.

Originally, this was done by writing into RDs, and the bulk of the code
still reflects that.

The problem here is that RDs typically are formatted with lots of love,
also within elements -- e.g., like this::

	<column name="bla" type="text"
		ucd="foo.bar"
		description="A long text carefully
			broken at the right place"
	/>

There's no way one can coax a normal XML parser into giving events that'd
allow us to preserve this formatting.   Hence, when manipulating
RD sources, I need something less sophisticated -- the dumb XML parser
implemented here.

Except possibly for coverage (and even there I have my doubts) all this
has turned out to be a bad idea, best shown by the endless trouble it is
with STREAMs.  Hence, we're moving towards stuffing everything computed
by the system into the database.  Once that's done, this shouldn't be
called rdmanipulator any more.
"""

#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import os

from gavo import base
from gavo import rsc
from gavo import rscdesc
from gavo import utils
from gavo.user import info
from pyparsing import (CharsNotIn, Forward, Literal, Optional, 
	ParseResults, QuotedString, SkipTo,
	StringEnd, White, Word, ZeroOrMore, alphas, alphanums)


def flatten(arg):
    """returns the string serialisation of a (possibly edited) parse tree.

    Strings are returned unchanged, lists and pyparsing ParseResults are
    flattened item by item and concatenated, and anything else is expected
    to provide a flatten() method of its own returning a string.
    """
    if isinstance(arg, str):
        return arg
    if isinstance(arg, (list, ParseResults)):
        return "".join(map(flatten, arg))
    # nodes inserted by manipulators serialise themselves
    return arg.flatten()
def _nodify(s, p, t):
    # A parse action wrapping the matched tokens into one nested list;
    # this keeps pyparsing from flattening things out into a single list.
    tokens = t.asList()
    return [tokens]
class Attribute(list):
    """a sentinel for XML attributes.

    Instances are constructed from the tokens matched for one attribute:
    the name, optional whitespace, the equals sign, optional whitespace,
    and the value literal including its quotes.  The list keeps all of
    these tokens (so the source formatting survives serialisation), while
    the name and the value without its quotes are available as the
    attributes name and value.
    """
    def __init__(self, t):
        list.__init__(self, t)
        self.name = t[0]
        # The quoted literal is always the last token.  Picking it by a
        # fixed index from the front yields the equals sign as soon as
        # there is whitespace between the name and the equals sign.
        self.value = t[-1][1:-1]
def getAttribute(parseResult, name):
    """returns the first Attribute called name within parseResult.

    Only the direct members of parseResult are inspected.

    If no such attribute exists, a KeyError is raised.
    """
    candidates = (
        el for el in parseResult
        if isinstance(el, Attribute) and el.name == name)
    found = next(candidates, None)
    if found is None:
        raise KeyError("No attribute %s in %s"%(name, flatten(parseResult)))
    return found
class NewElement(object):
    """an element to be inserted into a parsed xml tree.

    elementName is the tag name, textContent the (unescaped) character
    content the element is to have.
    """
    def __init__(self, elementName, textContent):
        self.elementName = elementName
        self.textContent = textContent

    def flatten(self):
        """returns the serialised element.

        The text content is passed through utils.escapePCDATA on the way out.
        """
        return "<{0}>{1}</{0}>".format(
            self.elementName,
            utils.escapePCDATA(self.textContent))
class Element(list):
    """a sentinel for XML elements.

    These are constructed with lists of the type [tag,...]; the opening (or
    empty) tag is always item 0.  A tag in turn is a list of tokens, with
    the element name at index 1 (right behind the "<").

    The element name is available in the name attribute.
    """
    def __init__(self, t):
        list.__init__(self, t)
        self.name = t[0][1]

    def append(self, newChild):
        """adds newChild to the content of this element.

        For an element with content, the child is inserted in front of
        the last content item (in formatted sources, that is usually the
        whitespace in front of the closing tag).  When there is only an
        opening and a closing tag, the child goes between them.  An empty
        element (<foo/>) is turned into an opening tag, the child, and a
        closing tag; the attributes of the empty tag are kept.
        """
        if len(self) == 1:
            # empty element: its single tag ends with a "/>" token
            openingTag = list(self[0])
            if openingTag and openingTag[-1] == "/>":
                openingTag[-1] = ">"
            self[:] = [openingTag, newChild, ["</", self.name, ">"]]
        elif len(self) == 2:
            # just opening and closing tag; [-2:-2] would insert in front
            # of the opening tag here.
            self.insert(1, newChild)
        else:
            # our last element will in general be a closing element
            self[-2:-2] = [newChild]

    def getAttribute(self, name):
        """returns the Attribute element with name within self.

        Attributes are looked up in the opening (or empty) tag.

        If no such attribute exists, a KeyError is raised.
        """
        return getAttribute(self[0], name)

    def findElement(self, name):
        """returns the first element called name somewhere within self.

        This is a depth-first search, and it will return None if there
        is no such element.
        """
        for child in self:
            if not isinstance(child, Element):
                continue
            if child.name == name:
                return child
            found = child.findElement(name)
            if found is not None:
                return found
        return None

    def countElements(self, name):
        """returns the number of name elements that are direct children
        of self.
        """
        return sum(1 for child in self
            if isinstance(child, Element) and child.name == name)
def getXMLGrammar(manipulator):
    """returns a dictionary mapping symbol names to the pyparsing elements
    of the dumb XML grammar.

    The result is what locals() returns at the end of the function, so the
    keys are the local variable names used here ("document" being the
    start symbol).

    manipulator needs to have _feedElement and _openElement methods; these
    are installed as parse actions on complete elements and on tag openers,
    respectively.
    """
    # presumably this restricts what pyparsing skips as whitespace to \r
    # while the grammar is built -- confirm against
    # utils.pyparsingWhitechars.
    with utils.pyparsingWhitechars("\r"):
        name = Word(alphas+"_:", alphanums+".:_-")
        opener = Literal("<")
        closer = Literal(">")
        # unquoteResults=False: the quotes stay part of the value token
        value = (QuotedString(quoteChar="'", multiline=True, unquoteResults=False)
            | QuotedString(quoteChar='"', multiline=True, unquoteResults=False))
        attribute = (name
            + Optional(White())
            + Literal("=")
            + Optional(White())
            + value)
        tagOpener = (opener
            + name
            + ZeroOrMore(White() + attribute)
            + Optional(White()))

        openingTag = (tagOpener
            + closer)
        closingTag = (opener
            + Literal("/")
            + name
            + Optional(White())
            + closer)
        emptyTag = (tagOpener
            + Optional(White())
            + Literal("/>"))

        # NOTE(review): include is passed the string "True" rather than
        # the boolean; it is truthy, so this presumably behaves as
        # intended.
        processingInstruction = (opener
            + Literal("?")
            + SkipTo("?>", include="True"))
        comment = (opener
            + Literal("!--")
            + SkipTo("-->", include="True"))
        cdataSection = (opener
            + Literal("![CDATA[")
            + SkipTo("]]>", include="True"))

        nonTagStuff = CharsNotIn("<", min=1)

        # docItem is recursive (elements contain docItems), hence the Forward
        docItem = Forward()
        element = (
                (openingTag + ZeroOrMore(docItem) + closingTag)
            | emptyTag)
        docItem << (element
            | processingInstruction
            | comment
            | cdataSection
            | nonTagStuff)

        document = (ZeroOrMore(Optional(White()) + docItem)
            + Optional(White()) + StringEnd())
        document.parseWithTabs()

        element.addParseAction(manipulator._feedElement)
        tagOpener.addParseAction(manipulator._openElement)
        attribute.addParseAction(lambda s,p,t: [Attribute(t)])
        # keep the tokens of each tag together in a list of their own
        openingTag.addParseAction(_nodify)
        closingTag.addParseAction(_nodify)
        emptyTag.addParseAction(_nodify)

        # manipulator is removed so that every remaining local is a
        # grammar element: the loop below calls leaveWhitespace on all
        # of them, and locals() is what gets returned.
        del manipulator
        for el in locals().values():
            # this *really* shouldn't be necessary
            el.leaveWhitespace()
        del el

        return locals()
def processXML(document, manipulator):
    """processes an XML document with manipulator.

    document is a string containing the XML, and the function returns
    the serialized XML resulting from the manipulation.  You're doing
    yourself a favour if document is a unicode string.

    manipulator is an instance of a derivation of Manipulator below.
    There's a secret handshake between Manipulator and the grammar, so you
    really need to inherit, just putting in the two methods won't do.
    """
    symbols = getXMLGrammar(manipulator)
    # for grammar debugging, try:
    # from gavo.adql import grammar; grammar.enableDebug(symbols)
    parsed = utils.pyparseString(symbols["document"], document)
    return flatten(parsed)
class Manipulator(object):
    """a base class for processXML manipulators.

    Pass instances of these into processXML.  You must up-call the
    constructor without arguments.

    Override the gotElement(parseResult) method to do what you want.  The
    parseResult is a pyparsing object with the tag name in second position
    of the first matched thing and the attributes barely parsed out (if you
    need them, improve the parsing to get at the attributes with less
    effort.)

    gotElement receives an entire element with opening tag, content, and
    closing tag (or just an empty tag).  To manipulate the thing, just
    return what you want in the document.

    There's also startElement(parsedOpener) that essentially works
    analogously; you will, however *not* receive startElements for
    empty elements, so that's really intended for bookkeeping.

    You also have a hasParent(tagName) method on Manipulators returning
    whether there's a tagName element somewhere among the ancestors
    of the current tag.
    """
    def __init__(self):
        # names of the elements opened and not yet fed, outermost first
        self.tagStack = []

    def _openElement(self, s, p, parsedOpener):
        # parse action for tag openers: remember the tag name, then let
        # startElement decide what goes into the parse result.
        tagName = parsedOpener[1]
        self.tagStack.append(tagName)
        return self.startElement(parsedOpener)

    def hasParent(self, name):
        """returns true if an element called name is on the tag stack.
        """
        return name in self.tagStack

    def _feedElement(self, s, p, parsedElement):
        # parse action for complete elements: drop the element from the
        # tag stack and hand it, wrapped as an Element, to gotElement.
        self.tagStack.pop()
        wrapped = Element(parsedElement)
        return [self.gotElement(wrapped)]

    def startElement(self, parsedOpener):
        """returns the replacement for parsedOpener; by default,
        parsedOpener itself.
        """
        return parsedOpener

    def gotElement(self, parsedElement):
        """returns the replacement for parsedElement; by default,
        parsedElement itself.
        """
        return parsedElement
class _ValuesChanger(Manipulator):
    """a manipulator fiddling in values limits as returned by
    iterLimitsForTable.

    limits is an iterable of (kind, payload) pairs.  For kind "limits",
    payload is (table-id, column-name, min, max); for kind "coverage", it
    is (reserved, axis, value).

    Note again: this implementation just supports a single coverage
    element per RD, and just one value per coverage axis (when there are
    several items for an axis, the last one wins).  We'll have to change
    limits contents when there can reasonably be more.
    """
    def __init__(self, limits):
        # tableTriggers: table id -> {column name -> (min, max)}
        # coverageItems: axis name -> new content of the axis element
        self.tableTriggers, self.coverageItems = {}, {}
        # the column limits for the table we are in, None outside of
        # tables we have limits for
        self.curColumns = None

        for kind, payload in limits:
            if kind=="limits":
                tableName, columnName, minVal, maxVal = payload
                self.tableTriggers.setdefault(tableName, {})[
                    columnName] = (minVal, maxVal)

            elif kind=="coverage":
                _reserved, axis, value = payload
                self.coverageItems[axis] = value

            else:
                assert False, "Unknown limits item kind: %r"%(kind,)

        Manipulator.__init__(self)

    def startElement(self, parsedTag):
        """selects the limits to apply when a table element is opened.
        """
        if parsedTag[1]=="table":
            try:
                tableName = getAttribute(parsedTag, "id").value
                self.curColumns = self.tableTriggers.get(tableName)
            except KeyError:
                # table without an id: nothing we could have limits for
                pass
        return parsedTag

    def _fixValues(self, parsedElement, limits):
        """updates the min and max attributes of the values child of
        parsedElement from limits, a pair (min, max).

        Limits that are None and attributes not already present in the
        source are left alone.
        """
        values = parsedElement.findElement("values")
        if values is None:
            # user didn't put a values element into the RD at all
            return

        for attName, val in zip(["min", "max"], limits):
            if val is None:
                continue
            try:
                attribute = values.getAttribute(attName)
            except KeyError:
                # user didn't put this limit into RD; let's assume for a reason
                continue
            # The value literal is the last token of an attribute; a fixed
            # index from the front would hit the equals sign when there is
            # whitespace around it and so corrupt the attribute.
            # NOTE(review): this relies on escapeAttrVal returning the
            # literal including its quotes -- confirm.
            attribute[-1] = utils.escapeAttrVal(str(val))

    def _fixCoverage(self, coverageElement):
        """replaces or adds the children of coverageElement for all axes
        we have coverage items for.

        A base.ReportableError is raised if an axis element exists but is
        not the only direct child of coverageElement for its axis.
        """
        for axisName, newContent in self.coverageItems.items():
            destEl = coverageElement.findElement(axisName)
            if destEl is None:
                coverageElement.append(NewElement(axisName, newContent))
                continue

            if coverageElement.countElements(axisName)!=1:
                raise base.ReportableError("Cannot replace coverage for"
                    " axis '%s': unsupported previous content."%axisName,
                    hint="DaCHS will only replace coverage if there is"
                    " just one element for an axis.  If you want DaCHS"
                    " to update the coverage on this axis, delete any"
                    " previous elements for this axis.")

            if len(destEl)==1:
                # empty element
                elName = destEl.pop()[1]
                destEl[:] = [
                    ['<', elName, '>'],
                    newContent,
                    ['</', elName, '>'],]
            elif len(destEl)==2:
                # opening and closing tag, insert text (as one item; a
                # bare string in a slice assignment is spliced in
                # character by character)
                destEl[1:1] = [newContent]
            elif len(destEl)==3:
                # element with content, replace previous content
                destEl[1] = newContent
            else:
                assert False, ("Cannot replace content of %s element"
                    " with %d items"%(axisName, len(destEl)))

    def gotElement(self, parsedElement):
        """applies limits to column elements and coverage items to
        coverage elements.
        """
        if self.curColumns is not None and parsedElement.name=="column":
            # the first of these attributes that is present names the column
            for attrName in ["name", "original"]:
                try:
                    colName = parsedElement.getAttribute(attrName).value
                except KeyError:
                    continue
                if colName in self.curColumns:
                    self._fixValues(parsedElement, self.curColumns[colName])
                break

        if parsedElement.name=="table":
            self.curColumns = None # tables don't nest in DaCHS

        elif parsedElement.name=="coverage":
            self._fixCoverage(parsedElement)

        return parsedElement
def iterCoverageItems(updater):
    """yields coverage items for inclusion in RDs.

    Items are pairs of "coverage" and a tuple (reserved, axis, literal),
    where axis is one of spatial, temporal, and spectral.  An axis is
    only considered if the corresponding attribute of updater.parent is
    not None and there is a table to take the coverage from (the
    axis-specific table of updater or, failing that, its sourceTable).

    NOTE: so far, we can only have one coverage item.  So, it's enough
    to just say "fill this into axis x of coverage".  If and when
    we have more than one coverage items, we'll have to re-think that.
    That's why there's the "reserved" value in the tuples.  We'll have
    to put something in there (presumably the index of the coverage
    element, but perhaps we'll have a better identity at some point).
    """
    coverage = updater.parent

    if coverage.spatial is not None:
        table = updater.spaceTable or updater.sourceTable
        if table:
            moc = info.getMOCForStdTable(table, updater.mocOrder)
            yield "coverage", ("reserved", "spatial", moc.asASCII())

    # the two scalar axes only differ in where table and limit
    # expressions come from
    scalarAxes = (
        ("temporal", "timeTable", "getTimeLimitsExprs"),
        ("spectral", "spectralTable", "getSpectralLimitsExprs"))
    for axis, tableAttr, exprsName in scalarAxes:
        if getattr(coverage, axis) is None:
            continue
        table = getattr(updater, tableAttr) or updater.sourceTable
        if not table:
            continue
        for interval in info.iterScalarLimits(table, getattr(info, exprsName)):
            yield "coverage", ("reserved", axis, str(interval))
def iterLimitsForTable(tableDef, tablesOnly):
    """yields values to fill in into tableDef.

    Nothing is yielded if the table doesn't exist in the database.
    Otherwise, for every column of tableDef that has annotations after
    info.annotateDBTable has run, this yields a pair of "limits" and a
    tuple (table-id, column-name, min, max).

    Column metadata is only acquired if tablesOnly is false.

    The other thing that *could* come back (but currently only does for
    iterLimitsForRD) is a pair of "coverage" and (reserved, axis, literal);
    see iterCoverageItems for details.
    """
    with base.AdhocQuerier(base.getWritableAdminConn) as querier:
        if querier.getTableType(tableDef.getQName()) is None:
            # no such table on disk
            return
        table = rsc.TableForDef(tableDef, connection=querier.connection)

        info.annotateDBTable(
            tableDef,
            extended=False,
            requireValues=True,
            acquireColumnMeta=not tablesOnly)

        for column in tableDef:
            annotations = column.annotations
            if not annotations:
                continue
            lower, upper = annotations["min"], annotations["max"]
            yield "limits", (tableDef.id, column.name, lower, upper)

        # only reached when the generator is run to exhaustion
        table.addToMeta()
def iterLimitsForRD(rd, tablesOnly):
    """yields values to fill in for an entire RD.

    This is what iterLimitsForTable yields for all on-disk tables of rd,
    followed by the coverage items if rd has a coverage with an updater.
    Tables for which limits cannot be obtained because of a
    base.ReportableError are skipped after notifying the UI.

    See iterLimitsForTable.
    """
    for td in rd.tables:
        if not td.onDisk:
            continue
        try:
            yield from iterLimitsForTable(td, tablesOnly)
        except base.ReportableError as msg:
            base.ui.notifyError("Skipping %s: %s"%(
                td.id, utils.safe_str(msg)))

    coverage = rd.coverage
    if coverage and coverage.updater:
        yield from iterCoverageItems(coverage.updater)
def getChangedRD(rdId, limits):
    """returns a string corresponding to the RD with rdId with limits
    applied.

    limits is an iterable of (kind, payload) pairs as produced by
    iterLimitsForTable or iterLimitsForRD.  We assume the values elements
    already exist.

    The RD source is read as UTF-8.
    """
    _, f = rscdesc.getRDInputStream(rdId)
    # make sure the stream is closed even when reading or decoding fails
    try:
        content = f.read().decode("utf-8")
    finally:
        f.close()
    return processXML(content, _ValuesChanger(limits))
def parseCmdLine():
    """returns the parsed command line arguments.

    The result has the attributes tablesOnly (a boolean, false by default)
    and itemId (the reference to the table or RD to work on).
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Updates existing values min/max items in a referenced"
            " table or RD.")
    parser.add_argument(
        "-t", "--tables-only",
        dest="tablesOnly",
        action="store_true",
        help="Only acquire table/resource-level metadata (rather than column"
            " metadata, which usually takes a lot longer).")
    parser.add_argument(
        "itemId",
        help="Cross-RD reference of a table or"
            " RD to update, as in ds/q or ds/q#mytable; only RDs in inputsDir"
            " can be updated.")
    return parser.parse_args()
def main():
    """updates the RD source for the table or RD referenced on the
    command line.

    The changed RD text is written, UTF-8 encoded, to
    <inputsDir>/<sourceId>.rd.  A base.ReportableError is raised if the
    reference given is neither for a table definition nor for an RD.
    """
    from gavo import api

    args = parseCmdLine()
    item = api.getReferencedElement(args.itemId)

    if isinstance(item, api.TableDef):
        changes, rd = iterLimitsForTable(item, args.tablesOnly), item.rd
    elif isinstance(item, api.RD):
        changes, rd = iterLimitsForRD(item, args.tablesOnly), item
    else:
        raise base.ReportableError(
            "%s references neither an RD nor a table definition"%args.itemId)

    newText = getChangedRD(rd.sourceId, changes)

    inputsDir = api.getConfig("inputsDir")
    destFName = os.path.join(inputsDir, rd.sourceId+".rd")
    with utils.safeReplaced(destFName) as f:
        f.write(newText.encode("utf-8"))