# Source code for gavo.grammars.common

"""
Base classes and common code for grammars.

NOTE: If you add grammars, you have to enter them manually in
rscdef.builtingrammars.GRAMMAR_REGISTRY (we don't want to import all
the mess in this package just to make that).
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import codecs
import io
import re
import os
import select
import subprocess

from gavo import base
from gavo import rscdef
from gavo import utils
from gavo.rscdef import procdef
from gavo.rscdef import rowtriggers


class REAttribute(base.UnicodeAttribute):
    """An attribute holding a (compiled) regular expression."""

    def parse(self, value):
        # None and the empty string both mean "no pattern configured".
        if not value:
            return None
        try:
            return re.compile(value)
        except re.error as msg:
            raise base.ui.logOldExc(base.LiteralParseError(self.name_, value,
                hint="A python regular expression was expected here. Compile"
                " complained: %s"%str(msg)))

    def unparse(self, value):
        return "" if value is None else value.pattern
class FilteredInputFile(object):
    """a pseudo-file that allows piping data through a shell command.

    It supports read, readline, and close.  Close closes the original
    file, too.

    Warning: the command passed in will be shell-expanded (which is fair
    since you can pass in any command you want anyway).

    If you pass silent=True, stderr will be redirected to /dev/null.
    This is probably only attractive for unit tests and such.
    """
    def __init__(self, filterCommand, origFile, silent=False):
        self.filterCommand, self.origFile = filterCommand, origFile
        # shell=True is deliberate here (see class docstring); the command
        # is trusted operator input, not end-user data.
        processArgs = {"shell": True, "stdin": subprocess.PIPE,
            "stdout": subprocess.PIPE, "close_fds": True}
        if silent:
            processArgs["stderr"] = utils.devnull()
        self.process = subprocess.Popen([self.filterCommand], **processArgs)
        self.fromChild, self.toChild = self.process.stdout, self.process.stdin
        self.buffer = utils.StreamBuffer(2**14)
        # PIPE_BUF is the largest chunk we can write to the child without
        # risking a block while it still has unread output for us.
        self.PIPE_BUF = select.PIPE_BUF
        # Monkey-patch origFile.close so that closing the source also tears
        # down the child process.  NOTE(review): this mutates the passed-in
        # file object; callers sharing it will see the patched close.
        origClose = self.origFile.close
        self.origFile.close = self._makeCloser(origClose)

    def _makeCloser(self, origClose):
        # returns a replacement for origFile.close that additionally
        # terminates the filter child and closes both pipe ends.
        def _():
            origClose()
            if self.process.returncode is None:
                self.process.terminate()
            # not checking the return code here; if the client closed
            # while the child was still running, an error from it would
            # not count as an error.
            self.process.wait()
            if self.fromChild:
                self.fromChild.close()
            if self.toChild:
                self.toChild.close()
        return _

    def _fillChildBuffer(self):
        """feeds the child new data, closing the source file if all data
        has been fed.
        """
        data = self.origFile.read(self.PIPE_BUF)
        if data==b"":
            # source file exhausted, tell child we're done
            self.toChild.close()
            self.toChild = None
        else:
            self.toChild.write(data)

    def _readAll(self):
        # drains the filter completely, in 1 MiB chunks.
        allChunks = []
        while True:
            data = self.read(2**20)
            if data==b"":
                break
            allChunks.append(data)
        return b"".join(allChunks)

    def _fillBuffer(self):
        """tries to obtain another chunk of data from the child, feeding
        it new data if possible.
        """
        # select on both directions at once so we neither deadlock feeding
        # a child whose stdout is full, nor busy-wait on a starved child.
        if self.toChild:
            writeList = [self.toChild]
        else:
            writeList = []
        readReady, writeReady, _ = select.select(
            [self.fromChild], writeList, [])
        if writeReady:
            self._fillChildBuffer()
        if readReady:
            data = self.fromChild.read(self.PIPE_BUF)
            if data==b"":
                # child is done; a nonzero exit status becomes an IOError.
                self.buffer.doneWriting()
                if self.process.wait():
                    raise IOError("Child exited with return code %s"%
                        self.process.wait())
            self.buffer.add(data)

    def read(self, nBytes=None):
        # read(None) means "everything", as with plain files.
        if nBytes is None:
            return self._readAll()
        while self.buffer.curSize<nBytes and not self.buffer.finished:
            self._fillBuffer()
        return self.buffer.get(nBytes) or b""

    def readline(self):
        # let's do a quick one ourselves rather than inherit from file
        # just for this; this just works with unix line ends.
        while True:
            val = self.buffer.getToChar(b"\n")
            if val is None:
                if self.buffer.finished:
                    return self.buffer.getRest()
                else:
                    self._fillBuffer()
            else:
                return val

    def __iter__(self):
        # iterate over lines, like a normal file object.
        while True:
            res = self.readline()
            if not res:
                break
            yield res

    def close(self):
        # origFile.close was patched in __init__ and also reaps the child.
        self.origFile.close()
class Rowfilter(procdef.ProcApp):
    """A generator for rows coming from a grammar.

    Rowfilters receive rows (i.e., dictionaries) as yielded by a grammar
    under the name row.  Additionally, the embedding row iterator is
    available under the name rowIter.

    Macros are expanded within the embedding grammar.

    The procedure definition *must* result in a generator, i.e., there must
    be at least one yield; in general, this will typically be a ``yield
    row``, but a rowfilter may swallow or create as many rows as desired.

    If you forget to have a yield in the rowfilter source, you'll get a
    "NoneType is not iterable" error that's a bit hard to understand.

    Here, you can only access whatever comes from the grammar.  You can
    access grammar keys in late parameters as row[key] or, if key is
    like an identifier, as @key.
    """
    # element name in RDs and the procDef type this application accepts
    name_ = "rowfilter"
    requiredType = "rowfilter"
    # signature of the generated function; see class docstring
    formalArgs = "row, rowIter"

    def getFuncCode(self):
        # rewrite the generated code so macro expansions happen relative
        # to "row" (see rscdef.replaceProcDefAt).
        return rscdef.replaceProcDefAt(procdef.ProcApp.getFuncCode(self),
            "row")
def compileRowfilter(filters):
    """returns an iterator that "pipes" the rowfilters in filters.

    This means that the output of filters[0] is used as arguments to
    filters[1] and so on.

    If filters is empty, None is returned.
    """
    if not filters:
        return
    iters = [f.compile() for f in filters]

    def feedThrough(stream, rowfilter, rowIter):
        # pass every item of stream through rowfilter, flattening the
        # resulting (possibly empty) row sequences.
        for item in stream:
            yield from rowfilter(item, rowIter)

    def iterPipe(row, rowIter):
        # Build the pipeline lazily: each stage is a generator consuming
        # the previous one.  This replaces the previous exec()-based
        # source-code generation with equivalent, debuggable closures.
        stream = iters[0](row, rowIter)
        for rowfilter in iters[1:]:
            stream = feedThrough(stream, rowfilter, rowIter)
        return stream

    return iterPipe
class SourceFieldApp(rscdef.ProcApp):
    """A procedure application that returns a dictionary added to all
    incoming rows.

    Use this to programmatically provide information that can be computed
    once but that is then added to all rows coming from a single source,
    usually a file.  This could be useful to add information on the source
    of a record or the like.

    The code must return a dictionary.  The source that is about to be
    parsed is passed in as sourceToken.  When parsing from files, this
    simply is the file name.  The data the rows will be delivered to
    is available as "data", which is useful for adding or retrieving
    meta information.
    """
    name_ = "sourceFields"
    # Fixed typo: this was misspelled "requriedType", which silently
    # disabled ProcApp's check that only sourceFields procDefs get
    # applied here (compare Rowfilter above).
    requiredType = "sourceFields"
    formalArgs = "sourceToken, data"
class MapKeys(base.Structure):
    # raw docstring: the ``[A-Za-z\_]`` examples contain backslashes that
    # would otherwise be invalid escape sequences (SyntaxWarning on
    # modern Pythons).
    r"""Mapping of names, specified in long or short forms.

    mapKeys is necessary in grammars like keyValueGrammar or
    fitsProdGrammar.  In these, the source files themselves give key names.
    Within the GAVO DC, keys are required to be valid python identifiers
    (i.e., match ``[A-Za-z\_][A-Za-z\_0-9]*``).  If keys coming in do not
    have this form, mapping can force proper names.

    mapKeys could also be used to make incoming names more suitable for
    matching with shell patterns (like in rowmaker idmaps).
    """
    name_ = "mapKeys"

    _content = base.DataContent(description="Simple mappings in the form"
        "<dest>:<src>{,<dest>:<src>}")
    # inverted=True: XML gives dest->src, but self.maps stores src->dest
    # so doMap can translate incoming (source) keys directly.
    _mappings = base.DictAttribute("maps", keyName="dest", description=
        "Map source names given in content to the name given in dest.",
        itemAttD=base.UnicodeAttribute("map"), inverted=True, copyable=True)

    def _parseShortenedMap(self, literal):
        # parses the "dest:src{,dest:src}" element content into self.maps;
        # raises LiteralParseError on malformed pairs, StructureError when
        # a source key would be mapped twice.
        try:
            for dest, src in (p.split(":") for p in literal.split(",")):
                src = src.strip()
                if src not in self.maps:
                    self.maps[src] = dest.strip()
                else:
                    raise base.StructureError(
                        "%s clobbers an existing source within the key map."%src)
        except ValueError:
            raise base.ui.logOldExc(base.LiteralParseError(self.name_,
                literal,
                hint="A key-value enumeration of the format k:v {,k:v}"
                " is expected here"))

    def onElementComplete(self):
        if self.content_:
            self._parseShortenedMap(self.content_)
        super().onElementComplete()

    def doMap(self, aDict):
        """returns dict with the keys mapped according to the defined
        mappings.
        """
        if self.maps:
            newDict = {}
            for k, v in aDict.items():
                newDict[self.maps.get(k, k)] = v
            return newDict
        else:
            # no mappings defined: hand back the input unchanged
            return aDict
class RowIterator(object):
    """An object that encapsulates a source being parsed by a grammar.

    RowIterators are returned by Grammars' parse methods.  Iterate
    over them to retrieve the rows contained in the source.

    You can also call getParameters on them to retrieve document-global
    values (e.g., the parameters of a VOTable, a global header of a FITS
    table).

    The getLocator method should return some string that aids the user
    in finding out why something went wrong (file name, line number, etc.)

    This default implementation works for when source is a sequence
    of dictionaries.  You will, in general, want to override
    _iterRows and getLocator, plus probably __init__ (to prepare external
    resources) and getParameters (if you have them; make sure to update
    any parameters you have with self.sourceRow as shown in the default
    getParameters implementation).

    RowIterators are supposed to be self-destructing, i.e., they should
    release any external resources they hold when _iterRows runs out of
    items.

    _iterRows should arrange for the instance variable recNo to be
    incremented by one for each item returned.
    """
    # whether to send new-source/source-finished/source-error events
    # to the UI (subclasses may turn this off)
    notify = True

    def __init__(self, grammar, sourceToken, sourceRow=None):
        self.grammar, self.sourceToken = grammar, sourceToken
        # sourceRow: dict of per-source values merged into every row
        self.sourceRow = sourceRow
        self.recNo = 0

    def __iter__(self):
        if self.notify:
            base.ui.notifyNewSource(self.sourceToken)
        # self.rowfilter is only set by Grammar.parse when the grammar
        # has rowfilters; hence the hasattr probe.
        if hasattr(self, "rowfilter"):
            baseIter = self._iterRowsProcessed()
        else:
            baseIter = self._iterRows()
        if self.grammar.ignoreOn:
            rowSource = self._filteredIter(baseIter)
        else:
            rowSource = baseIter
        try:
            try:
                for row in rowSource:
                    # handle dispatched grammars here, too: they yield
                    # (destination, rowdict) tuples rather than bare dicts.
                    if isinstance(row, tuple):
                        d = row[1]
                    else:
                        d = row
                    if isinstance(d, dict):
                        # else it could be a sentinel like FLUSH, which
                        # we leave alone
                        if self.sourceRow:
                            d.update(self.sourceRow)
                        # rowmaker macros use parser_ to get back at us
                        d["parser_"] = self
                    yield row
            except Exception:
                base.ui.notifySourceError()
                raise
        finally:
            # always runs, whether exhausted, failed, or abandoned early
            self.finalize()

    def finalize(self):
        """releases resources and notifies the UI the source is done.

        Subclasses overriding this should call the base implementation.
        """
        if self.notify:
            base.ui.notifySourceFinished()

    def _filteredIter(self, baseIter):
        # drop rows matching the grammar's ignoreOn trigger
        for row in baseIter:
            if not self.grammar.ignoreOn(row):
                yield row

    def _iterRowsProcessed(self):
        # like _iterRows, but with every row piped through self.rowfilter;
        # dispatching grammars carry the destination through unchanged.
        if self.grammar.isDispatching:
            for dest, row in self._iterRows():
                for procRow in self.rowfilter(row, self):
                    yield dest, procRow
        else:
            for row in self._iterRows():
                for procRow in self.rowfilter(row, self):
                    yield procRow

    def _iterRows(self):
        # default implementation: an empty generator (the dead yield
        # makes this a generator function without producing anything)
        if False:
            yield None
        self.grammar = None # don't wait for garbage collection

    def getParameters(self):
        """returns a dict of document-global values for this source.

        The default has just the parser_ back-reference plus sourceRow.
        """
        res = {"parser_": self}
        if self.sourceRow:
            res.update(self.sourceRow)
        return res

    def getLocator(self):
        # override to give users a hint where parsing went wrong
        return "(unknown position -- locator missing)"
def wrapFileFor(fileobj, desiredMode, enc):
    """wraps or unwraps fileobj so that it matches the open mode desiredMode.

    If there's a "b" in desiredMode, this will return fileobj.raw if it's
    there.  Otherwise, it'll wrap it into a codec.getreader for enc.
    """
    # There is no reliable way to ask a file object whether it is binary
    # or text, so guess from a few common cues, in order of reliability.
    if hasattr(fileobj, "mode"):
        foundMode = fileobj.mode
    elif isinstance(fileobj, io.BytesIO):
        foundMode = "rb"
    elif isinstance(fileobj, io.StringIO):
        foundMode = "r"
    elif hasattr(fileobj, "raw"):
        foundMode = "r"
    else:
        foundMode = "rb"

    haveBinary = "b" in foundMode
    wantBinary = "b" in desiredMode

    if wantBinary == haveBinary:
        # already in the requested mode
        return fileobj
    if wantBinary:
        # text wrapper around a binary buffer: unwrap it
        return fileobj.raw
    # binary file but text requested: decode through a stream reader
    return codecs.getreader(enc)(fileobj)
class FileRowIterator(RowIterator):
    """is a RowIterator base for RowIterators reading files.

    It analyzes the sourceToken to see if it's a string, in which case
    it opens it as a file name and leaves the file object in self.inputFile.

    Otherwise, it assumes sourceToken already is a file object and binds
    it to self.inputFile.  It then tries to come up with a sensible
    designation for sourceToken.

    It also inspects the parent grammar for a gunzip attribute.
    If it is present and true, the input file will be unzipped
    transparently.  Don't add more features like this; preFilter
    is a lot more flexible.

    Classes using this reading binary data will want to set fileMode
    to rb.  If they don't, what's returned is strings.
    """
    # "r" yields str rows; subclasses parsing binary data set "rb".
    fileMode = "r"

    def __init__(self, grammar, sourceToken, **kwargs):
        RowIterator.__init__(self, grammar, sourceToken, **kwargs)
        self.curLine = 1
        try:
            self._openFile()
        except IOError as ex:
            raise base.ui.logOldExc(
                base.SourceParseError("I/O operation failed (%s)"%str(ex),
                    source=str(sourceToken), location="start"))

    def _openFile(self):
        preFilter = None
        # parsingFrom is used at the end to generate a sourceToken
        # for display purposes
        parsingFrom = self.sourceToken
        if hasattr(parsingFrom, "name"):
            # it's probably an open file
            parsingFrom = self.sourceToken.name
        if not isinstance(parsingFrom, str):
            parsingFrom = repr(parsingFrom)

        # gunzip is deprecated sugar for preFilter="zcat"
        if hasattr(self.grammar, "gunzip") and self.grammar.gunzip:
            preFilter = "zcat"
        preFilter = preFilter or (
            hasattr(self.grammar, "preFilter") and self.grammar.preFilter)

        # need to handle preFilter first, as that needs a binary file
        # (removed a dead "else: curSrc = curSrc" no-op here)
        curSrc = self.sourceToken
        if preFilter:
            if isinstance(curSrc, str):
                curSrc = open(curSrc, "rb")
            curSrc = FilteredInputFile(preFilter, curSrc)

        if isinstance(curSrc, str):
            curSrc = open(curSrc, "rb")

        # now curSrc is a binary file.  If a normal file mode is
        # requested, wrap it into a codec.
        if "b" not in self.fileMode:
            curSrc = wrapFileFor(
                curSrc, self.fileMode, self.grammar.enc or "ascii")

        self.inputFile = curSrc
        self.sourceToken = parsingFrom

    def finalize(self):
        RowIterator.finalize(self)
        # close whatever _openFile left us with (plain file, codec
        # reader, or FilteredInputFile)
        if hasattr(self.inputFile, "close"):
            self.inputFile.close()
class FileRowAttributes(base.StructCallbacks):
    """A mixin for grammars with FileRowIterators.

    This provides some attributes that FileRowIterators interpret, e.g.,
    preFilter.
    """
    _gunzip = base.BooleanAttribute("gunzip", description="Unzip sources"
        " while reading? (Deprecated, use preFilter='zcat')", default=False)
    _preFilter = base.UnicodeAttribute("preFilter", description="Shell"
        " command to pipe the input through before passing it on to the"
        " grammar. Classical examples include zcat or bzcat, but you"
        " can commit arbitrary shell atrocities here.", copyable=True)

    def completeElement(self, ctx):
        # preFilter runs arbitrary shell commands, so it must not be
        # available when parsing untrusted RDs (restricted mode).
        if ctx.restricted:
            if self.preFilter is not None:
                raise base.RestrictedElement("preFilter")
        super().completeElement(ctx)
class GrammarMacroMixin(base.StandardMacroMixin):
    """A collection of macros available to rowfilters.

    NOTE: All macros should return only one single physical python line,
    or they will mess up the calculation of what constructs caused errors.
    """

    def macro_inputRelativePath(self, liberalChars="True"):
        """returns an expression giving the current source's path
        relative to inputsDir

        liberalChars can be a boolean literal (True, False, etc); if false,
        a value error is raised if characters that will result in trouble
        with the product mixin are within the result path.

        In rowmakers fed by grammars with //products#define, better use
        @prodtblAccref.
        """
        return ('utils.getRelativePath(rowIter.sourceToken,'
            ' base.getConfig("inputsDir"), liberalChars=%s)'%(
                base.parseBooleanLiteral(liberalChars)))

    def macro_fullDLURL(self, dlService):
        r"""returns a python expression giving a link to the full current
        data set retrieved through the datalink service.

        You would write \fullDLURL{dlsvc} here, and the macro will expand
        into something like http://yourserver/currd/dlsvc/dlget?ID=ivo://whatever.

        dlService is the id of the datalink service in the current RD.

        This is intended for "virtual" data where the dataset is
        generated on the fly through datalink.
        """
        baseURL = self.rd.getById(dlService).getURL("dlget")
        return ("'%%s?ID=%%s'%%(%s,"
            " urllib.parse.quote_plus(getStandardPubDID(rowIter.sourceToken)))"%(
            repr(baseURL)))

    def macro_dlMetaURI(self, dlService):
        r"""like fullDLURL, except it points to the datalink metadata.

        This is intended for binding to //products#define's datalink
        parameter.

        If you need the value in a rowmaker, grab it from @prodtblDatalink.
        """
        baseURL = self.rd.getById(dlService).getURL("dlmeta")
        return ("'%%s?ID=%%s'%%(%s,"
            " urllib.parse.quote_plus(getStandardPubDID(rowIter.sourceToken)))"%(
            repr(baseURL)))

    def macro_standardPreviewPath(self):
        """returns an expression for the standard path for a custom
        preview.

        This consists of resdir, the name of the previewDir property on
        the embedding DD, and the flat name of the accref (which this macro
        assumes to see in its namespace as accref; this is usually the
        case in //products#define, which is where this macro would typically
        be used).

        As an alternative, there is the splitPreviewPath macro, which does
        not mogrify the file name.  In particular, do not use
        standardPreviewPath when you have more than a few 1e4 files, as it
        will have all these files in a single, flat directory, and that
        can become a chore.

        See the introduction to custom previews for details.
        """
        constantPrefix = os.path.join(
            rscdef.getInputsRelativePath(self.parent.rd.resdir),
            self.parent.getProperty("previewDir"))+"/"
        return (repr(constantPrefix)
            +"+getFlatName(accref)")

    def macro_splitPreviewPath(self, ext):
        """returns an expression for the split standard path for a
        custom preview.

        As standardPreviewPath, except that the directory hierarchy of
        the data files will be reproduced in previews.  For ext, you
        should typically pass the extension appropriate for the preview
        (like {.png} or {.jpeg}).

        See the introduction to custom previews for details.
        """
        constantPrefix = os.path.join(
            rscdef.getInputsRelativePath(self.parent.rd.resdir),
            self.parent.getProperty("previewDir"))+"/"
        return (repr(constantPrefix)
            +"+accref+'%s'"%ext)

    def macro_sourceDate(self):
        """returns an expression giving the timestamp of the current
        source.
        """
        # NOTE(review): utcfromtimestamp is deprecated since Python 3.12;
        # changing the generated expression would alter rowmaker code,
        # so leaving it as is here.
        return ('datetime.datetime.utcfromtimestamp('
            'os.path.getmtime(rowIter.sourceToken))')

    def macro_srcstem(self):
        """returns python code for the stem of the source file currently
        parsed in a rowmaker.

        Example: if you're currently parsing /tmp/foo.bar, the stem is foo.
        """
        return 'getFileStem(rowIter.sourceToken)'

    def macro_rootlessPath(self):
        """returns an expression giving the current source's path with
        the resource descriptor's root removed.
        """
        return ('utils.getRelativePath(rowIter.sourceToken,'
            ' rowIter.grammar.rd.resdir)')

    def macro_inputSize(self):
        """returns an expression giving the size of the current source.
        """
        return 'os.path.getsize(rowIter.sourceToken)'

    def macro_colNames(self, tableRef):
        """returns a comma-separated list of column names for a table
        reference.

        This is convenient if an input file matches the table structure; you
        can then simply say things like <reGrammar names="\\\\colName{someTable}"/>.
        """
        return ",".join(c.name for c in self.rd.getById(tableRef))

    def macro_property(self, property):
        """returns the value of property on the parent DD.
        """
        return self.parent.getProperty(property)
class Grammar(base.Structure, GrammarMacroMixin):
    """An abstract grammar.

    Grammars are configured via their structure parameters.  Their
    parse(sourceToken) method returns an object that iterates over rawdicts
    (dictionaries mapping keys to (typically) strings) that can then be fed
    through rowmakers; it also has a method getParameters that returns
    global properties of the whole document (like parameters in VOTables;
    this will be empty for many kinds of grammars).

    RowIterators will return a reference to themselves in the raw dicts in
    the parser_ key unless you override their _iterRowsProcessed method
    (which you shouldn't).  This is used by rowmaker macros.

    What exactly sourceToken is is up to the concrete grammar.  While
    typically it's a file name, it might be a sequence of dictionaries,
    a twisted web request, or whatever.

    To derive a concrete Grammar, define a RowIterator for your source
    and set the rowIterator class attribute to it.
    """
    name_ = "grammar"

    _encoding = base.UnicodeAttribute("enc", default=None, description=
        "Encoding of the source file(s).", copyable=True)
    _rowfilters = base.StructListAttribute("rowfilters",
        description="Row filters for this grammar.",
        childFactory=Rowfilter, copyable=True)
    _ignoreOn = base.StructAttribute("ignoreOn", default=None, copyable=True,
        description="Conditions for ignoring certain input records. These"
        " triggers drop an input record entirely. In modern RDs, prefer"
        " rowfilters raising SkipThis.", childFactory=rowtriggers.IgnoreOn)
    _sourceFields = base.StructAttribute("sourceFields", default=None,
        copyable=True, description="Code returning a dictionary of values"
        " added to all returned rows.", childFactory=SourceFieldApp)
    _properties = base.PropertyAttribute(copyable=True)
    _original = base.OriginalAttribute()
    _rd = rscdef.RDAttribute()

    # isDispatching is used by various special grammars to signify the
    # grammar returns rowdicts for multiple makers.  See those.
    # Here, we just fix it to false so clients can rely on the attribute's
    # existence.
    isDispatching = False

    # concrete grammars must point this to their RowIterator subclass
    rowIterator = RowIterator

    def getSourceFields(self, sourceToken, data):
        """returns a dict containing user-defined fields to be added to
        all results.
        """
        if self.sourceFields is None:
            return None
        # compile the sourceFields code once per grammar and cache it
        if not hasattr(self, "_compiledSourceFields"):
            self._compiledSourceFields = self.sourceFields.compile()
        return self._compiledSourceFields(sourceToken, data)

    def parse(self, sourceToken, targetData=None):
        """returns a row iterator for sourceToken.

        targetData, when given, is passed on to the sourceFields code.
        """
        ri = self.rowIterator(self, sourceToken,
            sourceRow=self.getSourceFields(sourceToken, targetData))
        if self.rowfilters:
            # the iterator probes for this attribute (see RowIterator)
            ri.rowfilter = compileRowfilter(self.rowfilters)
        return ri
class NullGrammar(Grammar):
    """A grammar that never returns any rows.
    """
    # relies on the base RowIterator._iterRows, which yields nothing
    name_ = "nullGrammar"
class TransparentGrammar(Grammar):
    """A grammar that returns its sourceToken as the row iterator.

    This only makes sense in extreme situations and never without
    custom code.  If you're not sure you need this, you don't want
    to know about it.
    """
    name_ = "transparentGrammar"

    def parse(self, sourceToken, targetData=None):
        # the sourceToken itself is expected to be iterable over rows;
        # no RowIterator is constructed at all.
        return sourceToken