Source code for gavo.votable.dec_tabledata

"""
Coding and decoding from tabledata.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import re #noflake: used by generated code

from gavo import utils #noflake: used by generated code
from gavo.utils import parseDefaultDatetime, parseDefaultDate #noflake: used by generated code
from gavo.utils import pgsphere #noflake: used by generated code
from gavo.votable import coding
from gavo.votable import common
from gavo.votable.model import VOTable

try:
	from gavo import stc  #noflake: used by generated code
except ImportError:
	# see modelgroups
	pass


# Literals accepted for booleans in TABLEDATA encoding, mapped to the
# python values they decode to.  '?' and the empty string both decode
# to None.  All keys are lowercase without surrounding whitespace; the
# boolean decoder in this module strips and lowercases a literal before
# looking it up here, so an unknown literal results in a KeyError.
TDENCBOOL = {
	't': True,
	'1': True,
	'true': True,
	'f': False,
	'0': False,
	'false': False,
	'?': None,
	'': None,
}


def tokenizeComplexArr(val):
	"""iterates over number literal pairs taken from val.

	val is split at whitespace; consecutive tokens are joined pairwise
	with a single blank ("re im").  When the number of tokens is odd, the
	trailing token is yielded on its own.  For a val of None, nothing is
	yielded.
	"""
	if val is None:
		return

	tokens = val.split()
	# walk the complete pairs first...
	for pos in range(0, len(tokens)-1, 2):
		yield "%s %s"%(tokens[pos], tokens[pos+1])
	# ...and hand through a dangling single token, if any
	if len(tokens)%2:
		yield tokens[-1]
def tokenizeBitArr(val):
	"""iterates over the 0 and 1 characters in val.

	All other characters (whitespace, separators, junk) are silently
	skipped.  For a val of None, nothing is yielded.
	"""
	if val is not None:
		yield from (char for char in val if char in "01")
def tokenizeNormalArr(val):
	"""iterates over the whitespace-separated tokens in val.

	For a val of None, nothing is yielded.
	"""
	if val is not None:
		# split() without arguments never produces empty tokens
		yield from val.split()
# All _make*Decoder functions below return lists of python source lines.
# These lines expect the literal to decode in a variable val and must
# append exactly one decoded value to a list named row.  Nested
# statements within the generated lines are indented in steps of two
# blanks.


def _addNullvalueCode(field, src, validator):
	"""adds code to catch nullvalues if required by field.

	src is a list of source lines decoding val.  If coding.getNullvalue
	returns something other than None for field (validator is passed
	through to it), src is wrapped into a conditional that appends None
	when val equals the null literal.  Otherwise, src is returned
	unchanged.
	"""
	nullvalue = coding.getNullvalue(field, validator)
	if nullvalue is not None:
		src = [
			'if val=="%s":'%nullvalue,
			'  row.append(None)',
			'else:']+common.indentList(src, "  ")
	return src


def _makeFloatDecoder(field):
	"""returns source lines decoding a float or double literal.

	Empty literals and the literal NaN are decoded to None.
	"""
	src = [
		'if not val or val=="NaN":',
		'  row.append(None)',
		'else:',
		'  row.append(float(val))',]
	return _addNullvalueCode(field, src, float)


def _makeComplexDecoder(field):
	"""returns source lines decoding a complex literal.

	A literal consists of two whitespace-separated floats (real and
	imaginary part); a single float is taken as a real part with a zero
	imaginary part.  Empty literals and literals with a NaN in either
	part are decoded to None.
	"""
	src = [
		'if not val:',
		'  row.append(None)',
		'else:',
		'  try:',
		'    r, i = val.split()',
		'  except ValueError:',
		'    r, i = val, 0',
		# convert before the NaN test; x!=x is only true for float NaNs,
		# never for the strings coming out of split.
		'  r, i = float(r), float(i)',
		'  if r!=r or i!=i:',
		'    row.append(None)',
		'  else:',
		'    row.append(complex(r, i))',]
	return _addNullvalueCode(field, src, common.validateTDComplex)


def _makeIntDecoder(field, maxInt):
	"""returns source lines decoding an integer literal.

	Empty literals are decoded to None.  Literals starting with 0x are
	parsed as hexadecimal; since python parses these as unsigned, values
	larger than maxInt are turned into negative numbers by subtracting
	(maxInt+1)*2.  Everything else is passed to int.

	NOTE(review): unsignedByte passes 256 as maxInt in _decoders, which
	is not the maximum of a signed type; the subtraction presumably is
	only meaningful for the signed types -- confirm.
	"""
	src = [
		'if not val:',
		'  row.append(None)',
		'elif val.startswith("0x"):',
		'  unsigned = int(val[2:], 16)',
		# Python hex parsing is unsigned, fix manually based on maxInt;
		# maxInt itself still is a valid positive value.
		'  if unsigned>%d:'%maxInt,
		'    row.append(unsigned-%d)'%((maxInt+1)*2),
		'  else:',
		'    row.append(unsigned)',
		'else:',
		'  row.append(int(val))']
	return _addNullvalueCode(field, src, common.validateVOTInt)


def _makeCharDecoder(field, emptyIsNull=True, fallbackEncoding="ascii"):
	"""returns source lines decoding a char (string) literal.

	With emptyIsNull, empty literals are decoded to None; otherwise, a
	val of None is decoded to an empty string.  A literal equal to the
	field's nullvalue (if any) is decoded to None, too.

	fallbackEncoding is not used by this function.
	"""
	# Elementtree makes sure we're only seeing unicode strings here
	# We simply pass these on -- while this makes us accept more than
	# we should, in practice returning bytes here would cause a lot
	# more headache.  Ask astropy users.
	src = []
	if emptyIsNull:
		src.extend([
			'if not val:',
			'  val = None',])
	else:
		src.extend([
			'if val is None:',
			'  val = ""'])

	nullvalue = coding.getNullvalue(field, str, "")
	if nullvalue:
		src.extend([
			'if val==%s:'%repr(nullvalue),
			'  val = None',])

	if field.isMultiDim():
		src.append("val = coding.unravelArray(%s, val)"%repr(field.arraysize))

	xtypeDecoder = common.getXtypeDecoderCode(field)
	if xtypeDecoder:
		src.extend(xtypeDecoder)

	src.append("row.append(val)")
	return src


def _makeUnicodeDecoder(field, emptyIsNull=True):
	"""returns source lines decoding a unicodeChar literal.

	This is _makeCharDecoder called with fallbackEncoding=None.
	"""
	return _makeCharDecoder(field, emptyIsNull, fallbackEncoding=None)


def _makeBooleanDecoder(field):
	"""returns source lines decoding a boolean literal via TDENCBOOL.
	"""
	return ['row.append(TDENCBOOL[val.strip().lower()])']


def _makeBitDecoder(field):
	"""returns source lines decoding a bit literal to an int.
	"""
	return ['row.append(int(val))']


# A mapping from VOTable datatype names to pairs of (function returning
# decoder source lines for a field, name of the function in utils used
# to build array values from the list of decoded items).
_decoders = {
	'boolean': (_makeBooleanDecoder, 'list'),
	'bit': (_makeBitDecoder, 'list'),
	'unsignedByte': (lambda v: _makeIntDecoder(v, 256), 'bytelist'),
	'char': (_makeCharDecoder, 'list'),
	'unicodeChar': (_makeUnicodeDecoder, 'list'),
	'short': (lambda v: _makeIntDecoder(v, 32767), 'intlist'),
	'int': (lambda v: _makeIntDecoder(v, 2147483647), 'intlist'),
	'long': (lambda v: _makeIntDecoder(v, 9223372036854775807), 'intlist'),
	'float': (_makeFloatDecoder, 'floatlist'),
	'double': (_makeFloatDecoder, 'floatlist'),
	'floatComplex': (_makeComplexDecoder, 'complexlist'),
	'doubleComplex': (_makeComplexDecoder, 'complexlist'),
}


def _getArrayDecoderLines(field):
	"""returns lines that decode arrays of literals.

	Unfortunately, the spec is plain nuts, so we need to pull some tricks
	here.

	As per VOTable 1.3, we translate empty strings to Nones; we use the
	liberty that empty and NULL arrays are not distinguished to return
	empty arrays as empty arrays, though.

	For char and unicodeChar fields, the string decoders are used
	directly.  For all other types, the generated code tokenizes the
	array literal, decodes each token with the scalar decoder into a
	temporary row, and then builds the array value from that.
	"""
	# TODO: this now contains enough generic code to marry it with the
	# respective function in dec_binary (and move the result to coding.py).
	datatype = field.datatype

	if datatype=='char':
		return _makeCharDecoder(field, emptyIsNull=True)
	elif datatype=='unicodeChar':
		return _makeUnicodeDecoder(field, emptyIsNull=True)

	decoder, listtype = _decoders[datatype]

	src = [  # OMG.  I'm still hellbent on not calling functions here.
		'arrayLiteral = val',
		# the scalar decoders append to row, so temporarily swap in a
		# fresh list to collect the array items
		'fullRow, row = row, []',
		]
	if datatype=='floatComplex' or datatype=='doubleComplex':
		src.append("for val in tokenizeComplexArr(arrayLiteral):")
	elif datatype=='bit':
		src.append("for val in tokenizeBitArr(arrayLiteral):")
	else:
		src.append("for val in tokenizeNormalArr(arrayLiteral):")

	src.extend(common.indentList(decoder(field), "  "))
	src.extend(coding.makeShapeValidator(field))
	src.extend([
		"val = utils.%s(row)"%listtype,
		"row = fullRow"])
	src.append("val = coding.unravelArray(%s, val)"%repr(field.arraysize))
	src.extend(common.getXtypeDecoderCode(field))
	src.extend([
		"row.append(val)"])

	return [
		"if val=='':",
		"  row.append(None)",
		"else:"]+common.indentList(src, "  ")
def getLinesFor(field):
	"""returns a sequence of python source lines to decode
	TABLEDATA-encoded values for field.

	Scalar fields are handled by the decoder registered for their
	datatype in _decoders, everything else by _getArrayDecoderLines.
	"""
	if not field.isScalar():
		return _getArrayDecoderLines(field)

	makeDecoderLines = _decoders[field.datatype][0]
	return makeDecoderLines(field)
def getRowDecoderSource(tableDefinition):
	"""returns the source for a function deserializing rows of
	tableDefinition in TABLEDATA.

	tableDefinition is a VOTable.TABLE instance.

	The source returned is a single string defining a function
	codec(rawRow).  For each FIELD child of tableDefinition, in order,
	it takes the item at the field's index from rawRow and appends the
	decoded value to the row it finally returns.  In the generated
	code, common.VOTableErrors are passed through, all other exceptions
	raised while decoding are turned into common.BadVOTableLiteral.
	"""
	source = ["def codec(rawRow):", "  row = []"]

	for index, field in enumerate(
			tableDefinition.iterChildrenOfType(VOTable.FIELD)):
		# the field's decoder lines make up the rest of the try body,
		# so they must be indented to the level of the val assignment.
		source.extend([
			"  try:",
			"    val = rawRow[%d]"%index,]
			+common.indentList(getLinesFor(field), "    ")+[
			"  except common.VOTableError:",
			"    raise",
			"  except Exception as ex:",
			# for debugging, a generated line
			#   import traceback; traceback.print_exc()
			# can be inserted here.
			"    raise common.BadVOTableLiteral('%s', val, ex, name=%r)"%(
				field.datatype, field.name)])

	source.append("  return row")
	return "\n".join(source)
def getGlobals(tableDefinition):
	"""returns the namespace generated decoder source should be run in.

	This is this module's global dictionary; tableDefinition is ignored.
	"""
	moduleNamespace = globals()
	return moduleNamespace