Source code for gavo.formats.votableread

"""
Parsing and translating VOTables to internal data structures.

This is glue code to the more generic votable library.  In general, you
should access this module through formats.votable.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import gzip

from gavo import base
from gavo import rsc
from gavo import rscdef
from gavo import utils
from gavo import votable
from gavo.base import valuemappers
from gavo.grammars import votablegrammar
from gavo.votable import V
from gavo.votable import modelgroups

# MS is a module-local shorthand for base.makeStruct; the functions below
# use it to build the rscdef structures (columns, params, table defs, ...).
MS = base.makeStruct

class QuotedNameMaker(object):
    """A name maker for makeTableDefForVOTable implementing TAP's
    requirements.

    Every field name is returned as a utils.QuotedName.  Fields without
    a name and names that were already handed out are rejected.
    """

    def __init__(self):
        # index counts the makeName calls; seenNames holds the names
        # already handed out.
        self.index = 0
        self.seenNames = set()

    def makeName(self, field):
        """returns a utils.QuotedName for field's name attribute.

        A base.ValidationError is raised if field has no name or if
        the same name has been seen before by this maker.
        """
        self.index += 1
        fieldName = getattr(field, "name", None)

        if fieldName is None:
            raise base.ValidationError("Field without name in upload.",
                "UPLOAD")

        if fieldName in self.seenNames:
            raise base.ValidationError("Duplicate column name illegal in"
                " uploaded tables (%s)"%fieldName, "UPLOAD")

        self.seenNames.add(fieldName)
        return utils.QuotedName(fieldName)
_PG_RESERVED_COLUMN_NAMES = set([ "oid", "tableoid", "xmin", "cmin", "xmax", "cmax", "ctid"]) class _ChangedName(str): """a sentinel class to tell upstream that a name has been changed in a way that must be reflected in a query. """
class AutoQuotedNameMaker(object):
    """A name maker for makeTableDefForVOTable that quotes names only
    where necessary.

    This is intended for PostgreSQL: names colliding with PG's reserved
    column names (oid and friends) are renamed by appending underscores.
    Hence, this is what you should use when putting VOTables into
    postgres tables.

    Duplicate names currently raise a ValidationError rather than being
    renamed.
    """

    def __init__(self, forRowmaker=False):
        # forRowmaker is accepted but not used by this constructor.
        self.seenNames = set()

    def makeName(self, field):
        """returns a column name for field.

        The result is a utils.QuotedName if the name needs quoting, a
        _ChangedName if a reserved name had to be altered, and the
        unchanged name otherwise.  base.ValidationError is raised for
        fields without a name and for duplicate names.
        """
        rawName = getattr(field, "name", None)
        if rawName is None:
            raise base.ValidationError("Field without name in upload.",
                "UPLOAD")

        # Names needing quoting are tracked with their case intact.
        if valuemappers.needsQuoting(rawName):
            if rawName in self.seenNames:
                raise base.ValidationError("Duplicate column name illegal in"
                    " uploaded tables (%s)"%rawName, "UPLOAD")
            self.seenNames.add(rawName)
            return utils.QuotedName(rawName)

        folded = rawName.lower()

        # Reserved names: append underscores until the name is unused.
        if folded in _PG_RESERVED_COLUMN_NAMES:
            candidate = folded+"_"
            while candidate in self.seenNames:
                candidate = candidate+"_"
            self.seenNames.add(candidate)
            return _ChangedName(candidate)

        # Regular names are tracked in lower case.
        if folded in self.seenNames:
            raise base.ValidationError("Duplicate column name illegal in"
                " uploaded tables (%s)"%rawName, "UPLOAD")
        self.seenNames.add(folded)
        return rawName
def _getValuesFromField(votField):
    """returns None or an rscdef.Values instance built from the VALUES
    children of votField.

    Null literal, MIN, MAX, and OPTION children are evaluated; if none
    of them yields anything, None is returned.
    """
    collected = {}

    for valuesEl in votField.iterChildrenOfType(V.VALUES):
        if valuesEl.null is not None:
            collected["nullLiteral"] = valuesEl.null

        for limitKey, limitType in (("min", V.MIN), ("max", V.MAX)):
            for limitEl in valuesEl.iterChildrenOfType(limitType):
                collected[limitKey] = limitEl.value

        # rscdef has no nested options, so only direct OPTION children
        # are looked at.
        options = []
        for optionEl in valuesEl.iterChildrenOfType(V.OPTION):
            optionArgs = {"content_": optionEl.value}
            if optionEl.name:
                optionArgs["title"] = optionEl.name
            options.append(base.makeStruct(rscdef.Option, **optionArgs))

        if options:
            collected["options"] = options

    if not collected:
        return None
    return base.makeStruct(rscdef.Values, **collected)


def _getColArgs(votInstance, name):
    """returns a dictionary of constructor arguments for an RD column
    or param, taken from a VOTable FIELD or PARAM.

    name is the name the new column or param is to have; it also
    provides the (capitalized) tablehead.
    """
    colArgs = {
        "name": name,
        "tablehead": name.capitalize(),
        "id": getattr(votInstance, "ID", None),
        "type": base.voTableToSQLType(
            votInstance.datatype,
            votInstance.arraysize,
            votInstance.xtype),
    }

    # optional metadata is only passed on if actually present
    for attName in ("ucd", "unit", "xtype"):
        attValue = getattr(votInstance, attName, None)
        if attValue is not None:
            colArgs[attName] = attValue

    # a value attribute, where present, becomes the content
    if getattr(votInstance, "value", None) is not None:
        colArgs["content_"] = votInstance.value

    values = _getValuesFromField(votInstance)
    if values:
        colArgs["values"] = values

    # with several DESCRIPTION children, the last one wins
    for descEl in votInstance.iterChildrenOfType(V.DESCRIPTION):
        colArgs["description"] = descEl.text_

    return colArgs
def makeTableDefForVOTable(tableId, votTable, nameMaker=None, rd=None,
        **moreArgs):
    """returns a TableDef for a TABLE element parsed from a VOTable.

    tableId becomes the id of the new table definition, rd its parent.
    Additional constructor arguments for the table can be passed in
    moreArgs.

    nameMaker is optional; if given, it must be an object with a
    makeName(field) -> string or utils.QuotedName method.  It must
    return unique objects for the VOTable fields and do so
    reproducibly, i.e., return the same name for a given field.  The
    default is valuemappers.VOTNameMaker.  When building TDs for
    Postgres, use AutoQuotedNameMaker to obtain valid column names.

    For the benefit of ADQL name resolution, columns whose names had to
    be changed beyond quoting (i.e., the name maker returned a
    _ChangedName) get an attribute originalName with the content of
    the FIELD's @name.

    PARAMs that cannot be translated are reported through
    base.ui.notifyError and skipped.  STC information referring to
    columns by ID is attached to the respective columns as stc and
    stcUtype; references to unknown IDs are ignored.
    """
    if nameMaker is None:
        nameMaker = valuemappers.VOTNameMaker()

    # columns come from the FIELDs
    columns = []
    for field in votTable.iterChildrenOfType(V.FIELD):
        colName = nameMaker.makeName(field)
        column = MS(rscdef.Column, **_getColArgs(field, colName))
        columns.append(column)
        # let the ADQL machinery know if the name was modified by more
        # than just quoting
        if isinstance(colName, _ChangedName):
            column.originalName = field.name

    # params come from the PARAMs; a broken one must never be fatal
    params = []
    for paramEl in votTable.iterChildrenOfType(V.PARAM):
        try:
            params.append(
                MS(rscdef.Param, **_getColArgs(paramEl, paramEl.name)))
        except Exception as ex:
            base.ui.notifyError("Unsupported PARAM ignored (%s)"%ex)

    tableDef = MS(rscdef.TableDef,
        id=tableId,
        columns=columns,
        params=params,
        parent_=rd,
        **moreArgs)

    # attach STC information to the columns it refers to
    for colInfo, ast in modelgroups.unmarshal_STC(votTable):
        for colId, utype in colInfo.items():
            try:
                target = tableDef.getColumnById(colId)
                target.stcUtype = utype
                target.stc = ast
            except utils.NotFoundError:
                # broken STC references are silently skipped
                pass

    return tableDef
def makeDDForVOTable(tableId, vot, gunzip=False, rd=None, **moreArgs):
    """returns a DD suitable for uploadVOTable.

    moreArgs are additional keyword arguments for the construction of
    the target table.

    Only the first TABLE of the first RESOURCE in vot is considered.
    If there is no such table, the DD comes without any makes.
    """
    makes = []

    firstResource = next(iter(vot.iterChildrenOfType(V.RESOURCE)), None)
    if firstResource is not None:
        firstTable = next(
            iter(firstResource.iterChildrenOfType(V.TABLE)), None)
        if firstTable is not None:
            tableDef = makeTableDefForVOTable(
                tableId, firstTable, rd=rd, **moreArgs)
            makes.append(MS(rscdef.Make, table=tableDef))

    grammar = MS(votablegrammar.VOTableGrammar, gunzip=gunzip)
    return MS(rscdef.DataDescriptor, grammar=grammar, makes=makes)
def _getRowMaker(table):
    """returns a function turning a VOTable tuple into a row dictionary
    for table.

    The function is generated as source code mapping each column key of
    table.tableDef to the tuple element at the column's index and is
    then compiled through utils.compileFunction.
    """
    # These names are not used directly here; they are imported so they
    # are part of the locals() namespace handed to the code generator.
    from gavo.base.literals import parseDefaultDatetime #noflake: code gen
    from gavo.stc import parseSimpleSTCS, simpleSTCSToPolygon #noflake: code gen

    items = ", ".join(
        "%s: %s"%(repr(col.key), "row[%d]"%colInd)
        for colInd, col in enumerate(table.tableDef))

    return utils.compileFunction(
        "def makeRow(row):\n return {%s}"%items,
        "makeRow",
        locals())
def uploadVOTable(tableId, srcFile, connection, gunzip=False,
        rd=None, **tableArgs):
    """creates a table with tableId from the first table of the VOTable
    in srcFile and returns it.

    srcFile must be an open file object (or something sufficiently
    similar); with gunzip=True, it is decompressed on the fly.
    connection is the database connection the table is created on.

    By default, the table is made on disk and temporary; tableArgs can
    override this and add further arguments for the table definition.

    A ValueError is raised if no table can be obtained from srcFile.
    """
    if gunzip:
        srcFile = gzip.GzipFile(fileobj=srcFile, mode="r")

    try:
        rows = next(votable.parse(srcFile, raiseOnInvalid=False))
    except StopIteration: # no table contained; bomb out
        raise ValueError("Cannot parse VOTable (or no table contained)")

    # caller-provided arguments take precedence over the defaults
    tdArgs = {"onDisk": True, "temporary": True, **tableArgs}
    td = makeTableDefForVOTable(
        tableId, rows.tableDefinition, rd=rd, **tdArgs)

    table = rsc.TableForDef(td, connection=connection, create=True)
    makeRow = _getRowMaker(table)

    with table.getFeeder() as feeder:
        for rawRow in rows:
            feeder.add(makeRow(rawRow))

    return table