# Source code for gavo.votable.common

"""
Common definitions for the GAVO VOTable modules.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import codecs
import functools

from gavo import utils


# Module-level float not-a-number constant.
NaN = float("NaN")

class VOTableError(utils.Error):
    """The common ancestor of all VOTable-related exceptions.

    Catch this to handle any error raised by the VOTable code in one place.
    """
class BadVOTableLiteral(VOTableError):
    """Raised when a literal in a VOTable is invalid.

    The constructor arguments are kept on the instance:

    * type -- the type the literal was supposed to be parsed as
    * literal -- the offending literal
    * name -- the name of the field concerned; "<Unknown>" if none was given
    * originalException -- whatever was passed as originalException, or None
    """

    def __init__(self, type, literal, hint=None, originalException=None,
            name=None):
        fieldName = "<Unknown>" if name is None else name
        message = "Invalid literal for %s (field %s): '%s'"%(
            type, fieldName, repr(literal))
        VOTableError.__init__(self, message, hint=hint)

        self.type = type
        self.literal = literal
        self.name = fieldName
        self.originalException = originalException

    def __str__(self):
        # built from the attributes rather than from the message handed
        # to the base class
        parts = (self.type, self.name, repr(self.literal))
        return "Invalid literal for %s (field %s): %s"%parts
class BadVOTableData(VOTableError):
    """Raised when a value to be written into a VOTable is unusable.

    Instances carry the name of the field concerned in fieldName and the
    repr of the offending value (not the value itself) in val.
    """

    def __init__(self, msg, val, fieldName, hint=None):
        valueRepr = repr(val)
        VOTableError.__init__(self, msg, hint=hint)
        self.val = valueRepr
        self.fieldName = fieldName

    def __getstate__(self):
        # NOTE(review): self.msg is not set in this class; presumably
        # the base class stores the message there -- confirm.
        state = {}
        state["msg"] = self.msg
        state["val"] = self.val
        state["fieldName"] = self.fieldName
        return state

    def __str__(self):
        description = (self.fieldName, self.val, self.msg)
        return "Field '%s', value %s: %s"%description
class VOTableParseError(VOTableError):
    """Raised when the structure of a VOTable document is badly broken.

    The message passed in already contains line and position.  It would
    be nicer to have these in separate attributes, but the expat library
    mashes them up.

    iterparse.getParseError is the canonical way of obtaining these when
    you have no positional information.
    """
def qmreplace(exc):
    """a deliberately simple codec error handler.

    It behaves like python's built-in "replace" handler, except that the
    replacement is always a question mark rather than ufffd.  The latter makes
    sense in a unicode environment, but this is needed for VOTable chars,
    where it would just be a nuisance.

    exc is the UnicodeError passed in by the codec machinery.  The
    function returns a pair of the replacement string and the position
    at which processing is resumed, which is one past the start of
    the problem.
    """
    resumeAt = exc.start+1
    return "?", resumeAt


# make the handler available as errors="qmreplace" in encode and decode calls
codecs.register_error("qmreplace", qmreplace)
def validateTDComplex(val):
    """raises a ValueError if val is not a pair of whitespace-separated
    float literals.

    Nothing is returned; the function is only called for the exception.
    """
    # unpacking into exactly two names enforces the "two components" part
    _realPart, _imagPart = [float(item) for item in val.split()]
def validateVOTInt(val):
    """raise an error if val is not a legal int for VOTables.

    Actually, this is for tabledata, and after the relaxed 1.3 rules, we allow
    the empty string ("NULL"), too.

    Accepted are the empty string, anything python's int() parses as a
    decimal integer, and hexadecimal literals introduced by 0x (or 0X).
    Anything else raises a ValueError.  Nothing is returned.
    """
    if val=="":
        return

    literal = val.strip()
    # Only treat the value as hexadecimal if it actually has the 0x prefix.
    # Blindly parsing val[2:] as hex would let through garbage like "zz12"
    # or "10e".
    if literal[:2] in ("0x", "0X"):
        int(literal, 16)
    else:
        int(val)
def indentList(lines, indent):
    """returns a new list in which indent is prepended to each element
    of lines.
    """
    return [indent+line for line in lines]
def getLoopifier(field):
    """returns a function to map code over arrays.

    This is used by the ``*XtypeEncoderCode`` functions, and for now only
    deals with 1D arrays of xtyped things.

    The result is one of:

    * a pass-through function if field.isMultiDim() is false,
    * a function wrapping a list of code lines (the xtype code for an
      elementary thing) into a loop over val if field.arraysize contains
      exactly one "x",
    * None if the array is too complex to be handled.
    """
    # All xtyped things are 1D arrays so far.  We're using this to decide
    # if we have to loop
    if not field.isMultiDim():
        return lambda code: code

    if field.arraysize.count("x")!=1:
        # just forget it; if there are native objects in the value, it's fail,
        # but since the decoder operate the same way, roundtrip will work.
        return None

    # 1-d array of xtyped thing; handle it
    def loopify(code):
        prologue = [
            "seq, arr = val, []",
            "for val in seq:"]
        epilogue = [
            " arr.append(val)",
            "val = arr"]
        return prologue+indentList(code, " ")+epilogue

    return loopify
def getXtypeEncoderCode(field):
    """returns code lines converting special internal representations for
    xtyped fields into what is serialised in VOTables.

    The code expects the value in a local variable val and leaves the
    transformed value there.  For None or unknown xtypes, an empty list
    is returned.

    This is currently only called for char and float arrays, as no xtypes
    are defined for other types.  If that changes, you'll have to change
    the ``*_enc`` modules.

    1D arrays of xtyped things are handled, but nothing more deeply nested.
    More deeply nested structures are left alone (which will only
    work under very special conditions and yield ugly error messages
    otherwise).
    """
    loopify = getLoopifier(field)
    if loopify is None:
        return []

    xtype = field.xtype
    if xtype in (
            "adql:TIMESTAMP",  # legacy, delete ~ 2024
            "timestamp",
            "timestamp-interval"):  # local addition
        code = [
            "if isinstance(val, datetime.datetime):",
            " val = utils.formatISODT(val)"]

    elif xtype=="dachs:DATE":
        code = [
            "if isinstance(val, datetime.date):",
            " val = val.isoformat()"]

    elif xtype in ("adql:POINT", "adql:REGION"):
        code = [
            "if isinstance(val, pgsphere.PgSAdapter):",
            " val = val.asSTCS('UNKNOWNFrame')"]

    elif xtype in ("point", "circle", "polygon", "moc", "x-box"):
        code = [
            "if isinstance(val, pgsphere.PgSAdapter):",
            " val = val.asDALI()"]

    else:
        return []

    return loopify(code)
def getXtypeDecoderCode(field):
    """returns code lines turning generic VOTable arrays into special
    internal representations based on field's xtype.

    The result is an empty list if field has no xtype or one that is not
    known here.  The code is executed with the unpacked array seen as
    val, and it sets val to the special representation (or to None for
    empty values).

    1D arrays of xtyped things are handled, but nothing more deeply nested.
    More deeply nested structures are left alone (which is ok for
    round-tripping but probably will fail when DaCHS components want to
    process stuff).
    """
    if not field.xtype:
        return []

    loopify = getLoopifier(field)
    if loopify is None:
        return []

    # pairs of (xtypes served, code line doing the conversion)
    conversions = (
        (("adql:POINT",),
            " val = stc.parseSimpleSTCS(val)"),
        (("adql:REGION",),
            " val = stc.simpleSTCSToPolygon(val)"),
        (("point",),
            " val = pgsphere.SPoint.fromDALI(val)"),
        (("circle",),
            " val = pgsphere.SCircle.fromDALI(val)"),
        (("polygon",),
            " val = pgsphere.SPoly.fromDALI(val)"),
        (("moc",),
            " val = pgsphere.SMoc.fromDALI(val)"),
        (("x-box",),
            " val = pgsphere.SBox.fromDALI(val)"),
        (("adql:TIMESTAMP", "timestamp", "timestamp-interval"),
            " val = parseDefaultDatetime(val)"),
        # GAVO-specific extension for consistency in our type systems
        (("dachs:DATE",),
            " val = parseDefaultDate(val)"),
    )

    for xtypes, conversionLine in conversions:
        if field.xtype in xtypes:
            break
    else:
        # unknown xtype; ignore it and process stuff as usual
        return []

    return loopify([
        # the val.strip() is a workaround for a TOPCAT bug that would sometimes
        # turn empty strings into single blanks.
        "if not val or (isinstance(val, str) and not val.strip()):",
        " val = None",
        "else:",
        conversionLine])
class NULLFlags(object):
    """an interface to the BINARY2 NULL flags.

    Construct it with the number of fields, then use serialize or
    serializeFromRow to produce the flag bytes for a row, and deserialize
    or getFromFile to turn flag bytes back into a list of booleans.

    The flags are packed eight to a byte, most significant bit first.
    Unused bits in the last byte are written as zeroes and ignored when
    reading.

    Attributes:

    * nFields -- the number of fields per row
    * nBytes -- the number of bytes the flags for one row occupy
    """
    masks = [0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01]

    def __init__(self, nFields):
        self.nFields = nFields
        # eight flags per byte, rounded up
        self.nBytes = (self.nFields+7)//8

    def serialize(self, nullMap):
        """returns null bytes for nullMap, which is a sequence of booleans
        with Trues where the field is NULL.

        It is an error to pass in nullMaps with lengths!=nFields.
        """
        assert len(nullMap)==self.nFields
        mapBytes, curBits, val = [], 0, 0
        for isNull in nullMap:
            val <<= 1
            if isNull:
                val |= 1
            curBits += 1
            if curBits==8:
                mapBytes.append(val)
                curBits, val = 0, 0

        if curBits:
            # left-align the flags of a partially filled last byte
            mapBytes.append(val<<(8-curBits))
        return bytes(mapBytes)

    def serializeFromRow(self, row):
        """returns null bytes for a row, which is a sequence of values.

        Everything that's None is flagged as NULL.
        """
        return self.serialize([v is None for v in row])

    def deserialize(self, toDecode):
        """returns a sequence of booleans giving for each element in a row
        if there's a NULL there.

        toDecode is a bytes object.  At most nFields flags are returned;
        padding bits and any surplus bytes are ignored.  If toDecode is
        too short, the result has fewer than nFields elements.
        """
        nulls = []
        for byte in toDecode:
            for mask in self.masks:
                # return rather than break: a break would only leave the
                # inner loop, and surplus bytes would then still be decoded.
                if len(nulls)==self.nFields:
                    return nulls
                nulls.append(bool(mask&byte))
        return nulls

    def getFromFile(self, file):
        """returns a sequence of booleans giving for each element in a row
        if there's a NULL there.

        This reads nBytes bytes from file, which must return bytes from
        its read method.
        """
        return self.deserialize(file.read(self.nBytes))
def isMultiDim(arraysize):
    """returns True if the VOTable arraysize denotes an array with more than
    one dimension, i.e., if it contains an "x".

    A None arraysize is never multi-dimensional.
    """
    if arraysize is None:
        return False
    return "x" in arraysize
def hasVarLength(arraysize):
    """returns True if the VOTable arraysize denotes a variable-length array.

    This is, of course, False for None (and empty) arraysizes.  The result
    always is a proper boolean.
    """
    # bool() keeps None or "" from leaking out as the return value
    return bool(arraysize) and arraysize.endswith("*")
def getLength(arraysize):
    """returns the number of elements expected for an array described with
    the VOTable attribute arraysize.

    A 1-element array isn't told apart from a scalar here.  Both return 1.
    For variable-length arrays, this returns None.

    Bad arraysize specs will give ValueErrors (perhaps not always with
    the most helpful messages).

    >>> getLength(None)
    1
    >>> getLength("*")
    >>> getLength("5")
    5
    >>> getLength("5x*")
    >>> getLength("5x6*")
    >>> getLength("7x5x6")
    210
    >>> getLength("7*x5x6")
    Traceback (most recent call last):
    ValueError: invalid literal for int() with base 10: '7*'
    """
    if arraysize is None:
        return 1

    if arraysize.endswith("*"):
        return None

    if "x" in arraysize:
        # multi-dimensional: the length is the product of all dimensions
        dimensions = [int(dim) for dim in arraysize.split("x")]
        return functools.reduce(lambda total, dim: total*dim, dimensions)

    try:
        return int(arraysize)
    except ValueError:
        # fall through to exception at function exit
        pass

    raise ValueError("Invalid arraysize specification: %s"%arraysize)
def getShape(datatype, arraysize):
    """returns a numpy-compatible shape for a VOTable arraysize.

    None is returned when there is no arraysize, when arraysize is a plain
    "*", and for char arrays without an "x" in arraysize (which are just
    scalar strings).  Otherwise, the result is a tuple of integers.

    Asterisks are ignored; for 2+D arrays with a fully variable last
    dimension, that dimension is currently replaced by 1.  Which doesn't
    sound smart.
    """
    if arraysize is None or arraysize=="*":
        # What should we really return for "*"?
        return None

    if datatype=="char" and "x" not in arraysize:
        # special case: 1d char arrays are just scalar strings
        return None

    spec = arraysize.replace("*", "")
    if "x" not in spec:
        return (int(spec),)

    if spec.endswith("x"):
        # variable last dimension
        spec += "1"
    return tuple(int(dim) for dim in spec.split("x"))
if __name__=="__main__":  # pragma: no cover
    # When executed as a script, run the doctests embedded in this module.
    import doctest
    doctest.testmod()