Source code for gavo.votable.coding

"""
Common code for coding and decoding VOTable data.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from gavo import utils
from gavo.votable import common
from gavo.votable.model import VOTable
from functools import reduce


def getRowEncoderSource(tableDefinition, encoderModule):
    """returns the source for a function encoding rows of tableDefinition
    in the format implied by encoderModule.

    tableDefinition is a VOTable.TABLE instance, encoderModule
    is one of the enc_whatever modules (this function needs getLinesFor
    and getPostamble from them; getPreamble is used if present).

    The generated function is called codec and takes a single argument
    tableRow.  It starts out with the locals tokens (an empty list) and val
    (None); for each FIELD child of tableDefinition, val is set to the
    corresponding item of tableRow before the lines from
    encoderModule.getLinesFor(field) run.  Exceptions other than
    common.VOTableError raised while encoding a field are turned into
    common.BadVOTableData carrying the field's designation.
    """
    # Indentation used in the generated source: one level for the body of
    # codec, two levels for what is inside the per-field try blocks.
    bodyIndent = "  "
    fieldIndent = bodyIndent*2

    source = [
        "def codec(tableRow):",
        bodyIndent+"tokens = []",
        bodyIndent+"val = None"]

    # getPreamble is optional for encoder modules
    getPreamble = getattr(encoderModule, "getPreamble", lambda td: [])
    source.extend(
        common.indentList(getPreamble(tableDefinition), bodyIndent))

    for index, field in enumerate(
            tableDefinition.iterChildrenOfType(VOTable.FIELD)):
        source.append(bodyIndent+"try:")
        source.append(fieldIndent+"val = tableRow[%d]"%index)
        source.extend(
            common.indentList(encoderModule.getLinesFor(field), fieldIndent))

        # The designation ends up inside generated source code; going
        # through repr makes sure quotes or backslashes in field names
        # cannot break out of the string literal.
        designationLiteral = repr(str(field.getDesignation()))
        source.extend([
            bodyIndent+"except common.VOTableError:",
            fieldIndent+"raise",
            bodyIndent+"except Exception as ex:",
            fieldIndent+(
                "raise common.BadVOTableData(str(ex), repr(val), %s)"
                %designationLiteral)])

    source.extend(common.indentList(
        encoderModule.getPostamble(tableDefinition), bodyIndent))
    return "\n".join(source)
def buildCodec(source, env):
    """returns a compiled function for source in env.

    source is python source defining a function named codec (within this
    module, that is what getRowEncoderSource or a decoder module's
    getRowDecoderSource return), env typically is the result of a
    getGlobals() on the codec module.

    The function is compiled against a copy of env, so env itself is not
    handed to the compiler.
    """
    return utils.compileFunction(source, "codec", useGlobals=dict(env))
def buildEncoder(tableDefinition, encoderModule):
    """returns a compiled row encoder for tableDefinition.

    The source comes from getRowEncoderSource, the names it is compiled
    against from encoderModule.getGlobals(tableDefinition).
    """
    encoderSource = getRowEncoderSource(tableDefinition, encoderModule)
    encoderGlobals = encoderModule.getGlobals(tableDefinition)
    return buildCodec(encoderSource, encoderGlobals)
def buildDecoder(tableDefinition, decoderModule):
    """returns a compiled row decoder for tableDefinition.

    Both the source (getRowDecoderSource) and the names it is compiled
    against (getGlobals) are obtained from decoderModule.
    """
    decoderSource = decoderModule.getRowDecoderSource(tableDefinition)
    decoderGlobals = decoderModule.getGlobals(tableDefinition)
    return buildCodec(decoderSource, decoderGlobals)
def getNullvalue(field, validator, default=None):
    """returns the nullvalue defined for field, or default if there is none.

    If several VALUES children of field define a null, the last one wins.
    An empty nullvalue counts as undefined.

    validator is a function that raises some exception if the nullvalue
    is inappropriate.  It should do so in particular on everything that
    contains quotes and such; the nullvalues are included in source code
    and thus might be used to inject code if not validated.  validator is
    not called when default is returned.
    """
    declared = [values.null
        for values in field.iterChildrenOfType(VOTable.VALUES)
        if values.null is not None]

    if not declared or declared[-1]=='':
        return default

    nullvalue = declared[-1]
    validator(nullvalue)
    return nullvalue
def unravelArray(arraysize, seq):
    """turns a flat sequence into an n-dim array as specified by the votable
    arraysize spec arraysize.

    arraysize is a sequence of x-separated items; all but the last must
    be integer literals, the last one is ignored here (so it may be an
    integer, an integer followed by an asterisk, or just an asterisk).

    No padding or cropping will take place.  This means that the last
    row(s) may have improper sizes if seq is incompatible with arraysize.

    For one-dimensional arraysizes, seq is returned unchanged.

    >>> unravelArray("2x3", "012345")
    ['01', '23', '45']
    >>> unravelArray("2x*", "012345")
    ['01', '23', '45']
    >>> unravelArray("3x2x*", "012345012345")
    [['012', '345'], ['012', '345']]
    """
    # the last dimension just results from chunking by all the others
    chunkSizes = arraysize.split("x")[:-1]
    if not chunkSizes:
        return seq

    # build the new levels with seq's class if that is some sort of list;
    # this is so we preserve utils.intlist and friends.
    makeLevel = seq.__class__ if isinstance(seq, list) else list

    for sizeLiteral in chunkSizes:
        chunkSize = int(sizeLiteral)
        seq = makeLevel(
            seq[start:start+chunkSize]
            for start in range(0, len(seq), chunkSize))
    return seq
def parseVOTableArraysizeEl(spec, fieldName):
    """parses a single VOTable arraysize number to (flexible, length).

    This will accept single numbers (returns False, number),
    number* (returns True, number) and just * (returns True, 0).

    This is used to parse the last part of an n-d array spec.  Everything
    before that must be an integer only.

    A common.VOTableError mentioning fieldName is raised if spec contains
    something that is not an integer literal.
    """
    flexible = spec.endswith("*")
    lengthLiteral = spec[:-1] if flexible else spec

    if flexible and not lengthLiteral:
        # a bare asterisk: variable length without a declared length
        return True, 0

    try:
        return flexible, int(lengthLiteral)
    except ValueError:
        raise common.VOTableError("Invalid arraysize fragment '%s' in"
            " field or param name '%s'"%(spec, fieldName))
def makeShapeValidator(field):
    """returns code lines to validate an array shape against a flat
    sequence in row.

    The lines returned expect the flat sequence in a variable named row
    and raise a common.BadVOTableLiteral if its length does not fit
    field's arraysize.  An empty list is returned if there is nothing
    to check (no arraysize, or a variable-length array with trivial
    leading dimensions).

    A common.VOTableError is raised for malformed arraysizes.

    This is used by the array decoders.
    """
    arraysize = field.arraysize
    if not arraysize:
        return []
    dimensions = arraysize.strip().split("x")

    # stride is the number of items in one element along the last
    # dimension; all dimensions except the last must be integers.
    stride = 1
    try:
        for dimLiteral in dimensions[:-1]:
            stride *= int(dimLiteral)
    except ValueError:
        raise common.VOTableError("Invalid arraysize '%s' specified in"
            " field or param name '%s'"%(
                field.arraysize, field.name))

    flexible, length = parseVOTableArraysizeEl(dimensions[-1], field.name)

    if flexible:
        # 0..n; all we have to do is check that the length is a multiple of
        # stride, if that's non-trivial.
        # TODO: enforce length limits?  By error or by cropping?
        if stride<=1:
            # fallback: no validation
            return []
        condition = "if len(row) %% %d:"%stride
    else:
        # exact size specification
        condition = "if len(row)!=%d:"%(length*stride)

    return [
        condition,
        " raise common.BadVOTableLiteral('%s[%s]',"
        " '<%%d token(s)>'%%(len(row)), name=%r)"%(
            field.datatype, field.arraysize, field.name)]
def ravel(seq):
    """flattens out any sub-sequences (lists or tuples) in seq recursively.

    The result always is a new list; items that are neither lists nor
    tuples (including strings) are kept as they are.

    This is used by the array encoders.
    """
    flattened = []
    # a stack of iterators rather than recursion; the iterator at the top
    # is the innermost sequence currently being walked.
    pending = [iter(seq)]

    while pending:
        for item in pending[-1]:
            if isinstance(item, (list, tuple)):
                # descend; the outer iterator is resumed once the
                # sub-sequence is exhausted
                pending.append(iter(item))
                break
            flattened.append(item)
        else:
            # innermost sequence is done
            pending.pop()

    return flattened
def trim(seq, arraysize, padder):
    """returns seq flattened and brought to length arraysize.

    arraysize is an int; you should just use field.getLength() when
    trimming VOTable arraysizes since the arraysize attribute is rather
    complex.  Arraysize may be None for convenience; seq is only
    flattened then.

    If seq is shorter, padder*missing will be appended (so padder must be
    something that yields a list when multiplied by an int), if it is
    longer, seq will be shortened from the end.

    This is intended as a helper for array encoders.
    """
    flat = ravel(seq)
    if arraysize is None:
        return flat

    missing = arraysize-len(flat)
    if missing>0:
        return flat+padder*missing
    if missing<0:
        return flat[:arraysize]
    return flat
def trimString(val, arraysize, padChar=" "):
    """returns val flattened and padded with padChar/cropped to length.

    arraysize is a VOTable arraysize literal (a string) or None.  For
    chars, arraysize None is equivalent to arraysize 1.

    val can also be a sequence (list or tuple) of strings (or nested more
    deeply).  In that case, trimString will flatten the value(s), padding
    and cropping as necessary.

    If val is None, then as many padChars will be returned as arraysize
    wants (which is 0 for variable-length fields).

    trimString expects to deal with strings.  It will ascii-decode bytes
    if it sees them, though.

    >>> trimString("abc", "4")
    'abc '
    >>> trimString(["abc", "de", "f"], "2x*")
    'abdef '
    >>> trimString([["abc", "cd", "e"], ["", "fgh", "i"]], "2x4x3")
    'abcde     fgi           '
    >>> trimString(None, "4x2", 'z')
    'zzzzzzzz'
    >>> trimString(None, "4x2*", 'z')
    ''
    >>> trimString("abc", None)
    'a'
    >>> trimString(b"abc", "5", "x")
    'abcxx'
    >>> trimString(("ab", "cde"), "2x2")
    'abcd'
    """
    if arraysize is None:
        arraysize = "1"

    if val is None:
        # nothing to serialise: return just padding (if the length is known)
        expected = common.getLength(arraysize)
        return padChar*expected if expected else ""

    if isinstance(val, bytes):
        val = val.decode("ascii")

    if "x" in arraysize:
        # n-d case: fix up the outermost dimension here and let the
        # recursion deal with the items using the remaining dimensions.
        rest, destLength = arraysize.rsplit("x", 1)

        if not destLength.endswith('*'):
            destLength = int(destLength)
            # list() so tuples (and other sliceable sequences) can be
            # padded, too; missing items become None, which the recursive
            # call turns into padding.
            val = list(val[:destLength])+[None]*max(0, destLength-len(val))

        return "".join(trimString(item, rest, padChar) for item in val)

    if arraysize.endswith('*'):
        # variable length: neither padding nor cropping
        return val

    destLength = int(arraysize)
    return val[:destLength]+padChar*max(0, destLength-len(val))
if __name__=="__main__":  # pragma: no cover
    # run this module's doctests when it is executed as a script
    from doctest import testmod
    testmod()