Source code for gavo.votable.coding

"""
Common code for coding and decoding VOTable data.
"""

#c Copyright 2008-2025, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import re

from gavo import utils
from gavo.votable import common
from gavo.votable.model import VOTable
from functools import reduce


from gavo.utils.dachstypes import (cast,
	Any, Callable, Dict, List, Optional, ModuleType, Sequence, Tuple, Union)

SequenceForArray = Union[List[Optional[int]], List[Optional[float]]]


def getRowEncoderSource(
		tableDefinition: VOTable.TABLE,
		encoderModule: ModuleType) -> str:
	"""returns the source for a function encoding rows of tableDefinition
	in the format implied by encoderModule.

	tableDefinition is a VOTable.TABLE instance, encoderModule
	is one of the enc_whatever modules (this function needs getLinesFor
	and getPostamble from them; getPreamble is used if it is there).

	The generated function is named codec and takes a single argument
	tableRow.  For each FIELD child of tableDefinition, it pulls
	tableRow[index] into val and then runs the lines
	encoderModule.getLinesFor returns for that field.  A
	common.VOTableError raised by these lines passes through unchanged,
	any other exception is turned into a common.BadVOTableData carrying
	the repr of the offending value and the field's designation.
	"""
	# Nesting within the generated source: two blanks for the function
	# body, four blanks for the body of each per-field try block.
	source = [
		"def codec(tableRow):",
		"  tokens = []",
		"  val = None"]

	# getPreamble is optional in encoder modules.
	getPreamble = getattr(encoderModule, "getPreamble", lambda td: [])
	source.extend(common.indentList(
		cast(List[str], getPreamble(tableDefinition)), "  "))

	for index, field in enumerate(
			tableDefinition.iterChildrenOfType(VOTable.FIELD)):
		source.extend([
			"  try:",
			"    val = tableRow[%d]"%index])
		source.extend(
			common.indentList(encoderModule.getLinesFor(field), "    "))
		source.extend([
			"  except common.VOTableError:",
			"    raise",
			"  except Exception as ex:",
			# %r rather than '%s': a designation containing quotes must
			# still end up as a valid string literal in the generated code.
			"    raise common.BadVOTableData(str(ex), repr(val), %r)"%
				field.getDesignation()])

	source.extend(common.indentList(
		encoderModule.getPostamble(tableDefinition), "  "))
	return "\n".join(source)
def buildCodec(source: str, env: Dict[str, Any]) -> Callable:
	"""returns the function called codec that source defines.

	source is python source defining a function named codec (for
	instance, what getRowEncoderSource returns), env is a dictionary of
	names that source needs, typically the result of a getGlobals() on
	the codec module.

	env itself is not handed on; utils.compileFunction receives a shallow
	copy of it as useGlobals.
	"""
	namespace: Dict[str, Any] = dict(env)
	return utils.compileFunction(source, "codec", useGlobals=namespace)
def buildEncoder(
		tableDefinition: VOTable.TABLE,
		encoderModule: ModuleType
		) -> Callable[[Dict[str, Any]], bytes]:
	"""returns a compiled row encoder for tableDefinition.

	The source of the encoder is produced by getRowEncoderSource from
	encoderModule; it is compiled with what
	encoderModule.getGlobals(tableDefinition) returns as its environment.
	"""
	encoderSource = getRowEncoderSource(tableDefinition, encoderModule)
	encoderEnv = encoderModule.getGlobals(tableDefinition)
	return buildCodec(encoderSource, encoderEnv)
def buildDecoder(
		tableDefinition: VOTable.TABLE,
		decoderModule: ModuleType
		) -> Callable[[bytes], Dict[str, Any]]:
	"""returns a compiled row decoder for tableDefinition.

	Other than for encoders, decoderModule furnishes the complete source
	itself through its getRowDecoderSource function; it is compiled with
	what decoderModule.getGlobals(tableDefinition) returns as its
	environment.
	"""
	decoderSource = decoderModule.getRowDecoderSource(tableDefinition)
	decoderEnv = decoderModule.getGlobals(tableDefinition)
	return buildCodec(decoderSource, decoderEnv)
def getNullvalue(
		field: VOTable.TypedElement,
		validator: Callable[[Any], Any],
		default: Optional[str] = None) -> Optional[str]:
	"""returns the nullvalue defined for field, or default if there is none.

	The nullvalue is taken from the null attribute of field's VALUES
	children; if several of them define one, the last one wins.  An empty
	nullvalue counts as undefined.

	validator is a function that raises some exception if the nullvalue
	is inappropriate.  It should do so in particular on everything that
	contains quotes and such; the nullvalues are included in source code
	and thus might be used to inject code if not validated.  validator is
	not called on default.
	"""
	found = None
	for valuesEl in field.iterChildrenOfType(VOTable.VALUES):
		if valuesEl.null is not None:
			found = valuesEl.null

	if found is not None and found!='':
		validator(found)
		return found
	return default
def unravelArray(arraysize: str, seq: Sequence) -> Sequence:
	"""turns a flat sequence into an n-dim array as specified by the votable
	arraysize spec arraysize.

	arraysize is <int>{"x"<int>}*?|*.

	Only the dimensions before the last "x" are used for chunking; for
	a one-dimensional arraysize, seq is returned unchanged.

	No padding or cropping will take place.  This means that the last
	row(s) may have improper sizes if seq is incompatible with arraysize.

	>>> unravelArray("2x3", "012345")
	['01', '23', '45']
	>>> unravelArray("2x*", "012345")
	['01', '23', '45']
	>>> unravelArray("3x2x*", "012345012345")
	[['012', '345'], ['012', '345']]
	"""
	*leadingDims, _ = arraysize.split("x")
	if not leadingDims:
		return seq

	# build the result with seq's own class if it is a list, so
	# utils.intlist and friends are preserved.
	if isinstance(seq, list):
		makeList = seq.__class__ # type: ignore # mypy confusion, I'd say
	else:
		makeList = list

	for dim in leadingDims:
		chunkSize = int(dim)
		seq = makeList(
			seq[start:start+chunkSize]
			for start in range(0, len(seq), chunkSize))
	return seq
def parseVOTableArraysizeEl(
		spec: str,
		fieldName: Optional[str]) -> Tuple[bool, int]:
	"""parses a single VOTable arraysize number to a pair of
	(flexible, length).

	This will accept single numbers (returns False, number), number*
	(returns True, number) and just * (returns True, 0).

	This is used to parse the last part of an n-d array spec.  Everything
	before that must be an integer only.

	A common.VOTableError mentioning fieldName is raised if what ought to
	be a number cannot be parsed as an integer.
	"""
	flexible = spec.endswith("*")
	number = spec[:-1] if flexible else spec

	# a lone asterisk: variable length without a declared limit
	if flexible and not number:
		return True, 0

	try:
		return flexible, int(number)
	except ValueError:
		raise common.VOTableError("Invalid arraysize fragment '%s' in"
			" field or param name '%s'"%(spec, fieldName))
def makeShapeValidator(field: VOTable.TypedElement) -> List[str]:
	"""returns code lines to validate an array shape against a flat
	sequence in row.

	This is used by the array decoders.

	The lines returned are python source expecting the flat sequence in
	a variable row; they raise a common.BadVOTableLiteral if the number
	of items in row does not fit field.arraysize.  For fields without an
	arraysize, and for variable-length arrays that are one-dimensional
	(or have a stride of at most one), there is nothing to check, and an
	empty list is returned.

	A common.VOTableError is raised if field.arraysize cannot be parsed.
	"""
	arraysize = field.arraysize
	if not arraysize:
		return []
	dimensions = arraysize.strip().split("x")

	# all dimensions except the last must be integers; stride is their
	# product, i.e., the number of items per step in the last dimension.
	stride = 1
	try:
		for dim in dimensions[:-1]:
			stride *= int(dim)
	except ValueError:
		raise common.VOTableError("Invalid arraysize '%s' specified in"
			" field or param name '%s'"%(
				field.arraysize, field.name))

	flexible, length = parseVOTableArraysizeEl(dimensions[-1], field.name)

	if flexible:
		# 0..n; all we have to do is check that the length is a multiple of
		# stride, if that's non-trivial.
		# TODO: enforce length limits?  By error or by cropping?
		if stride<=1:
			return []
		condition = "if len(row) %% %d:"%stride
	else:
		# exact size specification
		condition = "if len(row)!=%d:"%(length*stride)

	# datatype and arraysize end up in generated source; going through %r
	# makes sure they form a valid string literal whatever they contain.
	return [
		condition,
		" raise common.BadVOTableLiteral(%r,"
		" '<%%d token(s)>'%%(len(row)), name=%r)"%(
			"%s[%s]"%(field.datatype, field.arraysize), field.name)]
def ravel(seq: Sequence) -> SequenceForArray:
	"""flattens out any sub-sequences (lists or tuples) in seq recursively.

	The result always is a new, plain list with the non-sequence items
	in the order they are encountered.  Only lists and tuples are
	descended into; all other items (including strings) are kept as they
	are.

	This is used by the array encoders.
	"""
	flat = []
	# a stack of iterators rather than recursion; the top one is for the
	# innermost sequence we are currently working on.
	pending = [iter(seq)]

	while pending:
		for item in pending[-1]:
			if isinstance(item, (list, tuple)):
				# descend; the outer iterator stays on the stack and is
				# resumed when the nested sequence is exhausted.
				pending.append(iter(item))
				break
			flat.append(item)
		else:
			# innermost iterator exhausted
			pending.pop()

	return flat
def trim(
		seq: SequenceForArray,
		arraysize: Optional[int],
		padder: SequenceForArray) -> SequenceForArray:
	"""returns seq, flattened, with length arraysize.

	arraysize is an int; you should just use field.getLength() when
	trimming VOTable arraysizes since the arraysize attribute is rather
	complex.  Arraysize may be None for convenience; trim then only
	flattens seq.

	If seq is shorter, padder*missing will be appended, if it is longer,
	seq will be shortened from the end.

	This is intended as a helper for array encoders.
	"""
	flat = ravel(seq)
	if arraysize is None:
		return flat

	missing = arraysize-len(flat)
	if missing>0:
		return flat+padder*missing # type: ignore # float/intlist
	if missing<0:
		return flat[:arraysize]
	return flat
def trimString(
		val: Union[List[Optional[str]], Optional[str]],
		arraysize: str,
		padChar: str = " ") -> str:
	"""returns val flattened and padded with padChar/cropped to length.

	arraysize is a VOTable arraysize specification.  For chars, arraysize
	None is equivalent to arraysize 1.

	val can also be a sequence of strings (or nested more deeply).  In that
	case, trimString will flatten the value(s), padding and cropping as
	necessary.

	If val is None, then as many padChars will be returned as arraysize
	wants (which is 0 for variable-length fields).

	trimString expects to deal with strings.  It will ascii-decode bytes if
	it sees them, though.

	A NotImplementedError is raised for a plain string with a fixed-size
	multi-dimensional arraysize and for non-strings with a one-dimensional
	arraysize.

	>>> trimString("abc", "4")
	'abc '
	>>> trimString(["abc", "de", "f"], "2x*")
	'abdef '
	>>> trimString([["abc", "cd", "e"], ["", "fgh", "i"]], "2x4x3", ".")
	'abcde.....fgi...........'
	>>> trimString(None, "4x2", 'z')
	'zzzzzzzz'
	>>> trimString(None, "4x2*", 'z')
	''
	>>> trimString("abc", None)
	'a'
	>>> trimString(b"abc", "5", "x")
	'abcxx'
	"""
	if arraysize is None:
		arraysize = "1"

	if val is None:
		# a false length (as for variable-length arrays) means no padding
		expected = common.getLength(arraysize)
		return padChar*expected if expected else ""

	if isinstance(val, bytes):
		val = val.decode("ascii")

	if "x" not in arraysize:
		# 1D: val must be a plain string
		if not isinstance(val, str):
			raise NotImplementedError(f"Cannot trim non-str to {arraysize}")
		if arraysize.endswith('*'):
			return val
		nItems = int(arraysize)
		return val[:nItems]+padChar*max(0, nItems-len(val))

	# multi-D: the last dimension counts the items of val, whatever is
	# in front of it describes the individual items.
	rest, destLength = arraysize.rsplit("x", 1)
	if not destLength.endswith('*'):
		if not isinstance(val, list):
			raise NotImplementedError(
				f"Cannot trim plain string to {arraysize}")
		nItems = int(destLength)
		# None items are expanded to padding by the recursive calls
		val = val[:nItems]+[None]*max(0, nItems-len(val))

	return "".join(trimString(item, rest, padChar) for item in val)
def _toJSONEscape(mat: re.Match) -> str:
	"""returns a json escape for a one-char re match.

	The escape is a backslash, a u, and the character's code point as
	four lowercase hex digits.  Characters with code points of 0x10000
	and above would need a surrogate pair; for these, a
	common.BadVOTableData is raised instead.

	This is mostly ripped from json.encoder
	"""
	codepoint = ord(mat.group())
	if codepoint >= 0x10000:
		raise common.BadVOTableData("Will not produce utf-8 surrogates",
			f"\\u{codepoint}", "a JSON column")
	return f"\\u{codepoint:04x}"


# matches any single character outside of the range blank to tilde,
# i.e., everything that is not printable ASCII.
_ASCII_ESCAPE_PAT = re.compile("[^ -~]")
def forceASCII(s: Optional[str]) -> str:
	"""returns s with all non-ascii replaced by json escapes.

	Actually, everything outside of the printable ASCII range (blank to
	tilde) is escaped, which includes control characters.  For None, an
	empty string is returned.

	This is like json.encoder.encode_basestring_ascii, except we don't
	escape anything else under the assumption we already have json
	otherwise.

	>>> forceASCII("malformed ' [ json")
	"malformed ' [ json"
	>>> forceASCII("Krämerei")
	'Kr\\\\u00e4merei'
	>>> forceASCII("🄁")
	Traceback (most recent call last):
	gavo.votable.common.BadVOTableData: Field 'a JSON column', value '\\\\u127233': Will not produce utf-8 surrogates
	"""
	return "" if s is None else _ASCII_ESCAPE_PAT.sub(_toJSONEscape, s)
if __name__=="__main__":  # pragma: no cover
	# when executed as a script, run the doctests embedded in this module.
	from doctest import testmod
	testmod()