Source code for gavo.formats.votablewrite

"""
Generating VOTables from internal data representations.

This is glue code to the more generic GAVO votable library.  In particular,
it governs the application of base.SerManagers and their column descriptions
(which are what is passed around as colDescs in this module) to come up with
VOTable FIELDs and the corresponding values.

You should access this module through formats.votable.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib
import datetime
import functools
import io
import itertools

from gavo import base
from gavo import dm
from gavo import rsc
from gavo import stc
from gavo import utils
from gavo import votable
from gavo.base import meta
from gavo.base import valuemappers
from gavo.formats import common
from gavo.votable import V
from gavo.votable import modelgroups


class Error(base.Error):
    """an exception class local to VOTable writing, derived from base.Error.
    """
# maps the tablecoding names accepted by VOTableContext to the xmlstan
# elements that makeTable hands to votable.DelayedTable for the table data.
tableEncoders = {
    "td": V.TABLEDATA,
    "binary": V.BINARY,
    "binary2": V.BINARY2,
}
class VOTableContext(utils.IdManagerMixin):
    """A context object for writing VOTables.

    The constructor arguments work as keyword arguments to ``getAsVOTable``.
    Some other high-level functions accept finished contexts.

    This class provides management for unique ID attributes, the value mapper
    registry, and possibly additional services for writing VOTables.

    VOTableContexts optionally take

        - a value mapper registry (by default, valuemappers.defaultMFRegistry)
        - the tablecoding (currently, td, binary, or binary2)
        - version=(1,1) to order a 1.1-version VOTable, (1,2) for 1.2.
          (default is now 1.4).
        - acquireSamples=False to suppress reading some rows to get
          samples for each column
        - suppressNamespace=True to leave out a namespace declaration
          (mostly convenient for debugging)
        - overflowElement (see votable.tablewriter.OverflowElement)

    There's also an attribute produceVODML that will automatically be
    set for VOTable 1.5; you can set it to true manually, but the
    resulting VOTables will probably be invalid.

    If VO-DML processing is enabled, the context also manages models
    declared; that's the modelsDeclared set, containing the prefixes of
    the data models that have already been added to the VODML element.
    """
    def __init__(self, mfRegistry=valuemappers.defaultMFRegistry,
            tablecoding='binary', version=None, acquireSamples=True,
            suppressNamespace=False, overflowElement=None):
        self.mfRegistry = mfRegistry
        self.tablecoding = tablecoding
        self.version = version or (1,4)
        self.acquireSamples = acquireSamples
        self.suppressNamespace = suppressNamespace
        self.overflowElement = overflowElement
        # stacks maintained by the activeContainer, buildingFromTable,
        # and settingAttribute context managers
        self._containerStack = []
        self._tableStack = []
        self._dmAttrStack = []
        # id(rd element) -> value of the ref attribute to set in addID
        self._pushedRefs = {}

        # state for VO-DML serialisation
        self.produceVODML = self.version[0]>1 or self.version[1]>4
        # group-serialising annotations must enter their python ids when they
        # end up in the tree so they get only included once when
        # referenced by multiple other annotations.
        self.groupIdsInTree = set()
        self.modelsDeclared = set()
        self.rootVODML = V.VODML()
        self.vodmlTemplates = V.TEMPLATES()
        self.rootVODML[self.vodmlTemplates]

        # While we're producing both note-style STC1 utypes and ad-hoc
        # COOSYS refs from <stc>, make them share COOSYS elements based
        # on the id of the underlying STC ast.  Remove this when we've
        # dumped STC1 utypes.
        self.coosysByAST = {}

    def addVODMLPrefix(self, prefix):
        """arranges the DM with prefix to be declared in the VODML element.

        Each prefix is only declared once; the prefixes already handled
        are kept in modelsDeclared.
        """
        if prefix not in self.modelsDeclared:
            self.rootVODML[dm.getModelForPrefix(prefix).getVOT(self, None)]
            self.modelsDeclared.add(prefix)

    def addVODMLMaterial(self, stuff):
        """adds VODML annotation to this VOTable.

        Note that it will only be rendered if produceVODML is true
        (in general, for target versions >1.4).
        """
        self.vodmlTemplates[stuff]

    def makeTable(self, table):
        """returns xmlstan for a table.

        This is exposed as a method of context as the dm subpackage
        needs it, but I don't want to import formats there (yet).

        This may go away as I fix the interdependence of dm, votable, and
        format.
        """
        return makeTable(self, table)

    def getEnclosingTable(self):
        """returns the xmlstan element of the table currently built.

        This raises a ValueError if the context isn't aware of a table
        being built.

        (This depends on builders using activeContainer)
        """
        for el in reversed(self._containerStack):
            if el.name_=="TABLE":
                return el
        raise ValueError("Not currently building a table.")

    def getEnclosingResource(self):
        """returns the xmlstan element of the resource currently built.

        This raises a ValueError if the context isn't aware of a resource
        being built.

        (This depends on builders using activeContainer)
        """
        for el in reversed(self._containerStack):
            if el.name_=="RESOURCE":
                return el
        raise ValueError("Not currently building a resource.")

    def getEnclosingContainer(self):
        """returns the innermost container element the builders have declared.

        This raises an IndexError if no container has been declared.
        """
        return self._containerStack[-1]

    @property
    def currentTable(self):
        """the DaCHS table object from which things are currently built.

        If no builder has declared a table being built (using
        buildingFromTable), it's a value error.
        """
        if not self._tableStack:
            raise ValueError("No table being processed.")
        return self._tableStack[-1]

    @contextlib.contextmanager
    def activeContainer(self, container):
        """a context manager to be called by VOTable builders when
        they open a new TABLE or RESOURCE.
        """
        self._containerStack.append(container)
        try:
            yield
        finally:
            self._containerStack.pop()

    @contextlib.contextmanager
    def buildingFromTable(self, table):
        """a context manager to control code that works on a DaCHS table.
        """
        self._tableStack.append(table)
        try:
            yield
        finally:
            self._tableStack.pop()

    @contextlib.contextmanager
    def settingAttribute(self, dmAttr):
        """a context manager controlling the curDMAttr attribute.

        This is used by the DM annotators that sometimes need to
        directly manipulate the ATTRIBUTE element.
        """
        self._dmAttrStack.append(dmAttr)
        try:
            yield dmAttr
        finally:
            self._dmAttrStack.pop()

    @property
    def curDMAttr(self):
        """returns the V.ATTRIBUTE element currently worked on.

        This will raise an IndexError if no ATTRIBUTE is being built at the
        moment.
        """
        return self._dmAttrStack[-1]

    def pushRefFor(self, rdEl, refVal):
        """orders refVal to be set as ref on rdEl's VOTable representation
        if such a thing is being serialised.

        This currently is more a hack for PARAMs with COOSYS than something
        that should be really used; if this were to become a general pattern,
        we should work out a way to assign the ref if rdEl's representation
        already is in the tree...
        """
        self._pushedRefs[id(rdEl)] = refVal

    def addID(self, rdEl, votEl):
        """adds an ID attribute to votEl if rdEl has an id managed by self.

        Also, if a ref has been noted for rdEl, a ref attribute is being
        added, too.  This is a special hack for params and coosys; and
        I suspect we shouldn't go beyond that.

        votEl is returned for convenience.
        """
        try:
            votEl.ID = self.getIdFor(rdEl)
        except base.NotFoundError:
            # the param is not referenced and thus needs no ID
            pass

        if id(rdEl) in self._pushedRefs:
            votEl.ref = self._pushedRefs[id(rdEl)]
        return votEl
################# Turning simple metadata into VOTable elements.

def _iterInfoInfos(dataSet):
    """yields V.INFO items from the info meta of dataSet.
    """
    for infoItem in dataSet.getMeta("info", default=[]):
        # (infoId rather than id so we don't shadow the builtin)
        name, value, infoId = (
            infoItem.infoName, infoItem.infoValue, infoItem.infoId)
        yield V.INFO(name=name, value=value, ID=infoId)[
            infoItem.getContent()]


def _iterWarningInfos(dataSet):
    """yields INFO items containing warnings from the tables in dataSet.
    """
    for table in list(dataSet.tables.values()):
        for warning in table.getMeta("_warning", propagate=False, default=[]):
            yield V.INFO(name="warning", value="In table %s: %s"%(
                table.tableDef.id, warning.getContent("text", macroPackage=table)))


def _iterDatalinkResources(ctx, dataSet):
    """yields RESOURCE elements for datalink services defined for tables
    we have.

    This needs to be called before the tables are serialised because
    we put ids on fields.

    Failures while building a descriptor are reported as warnings and
    do not abort the serialisation.
    """
    for table in list(dataSet.tables.values()):
        for svcMeta in table.iterMeta("_associatedDatalinkService"):
            try:
                service = base.resolveId(table.tableDef.rd,
                    base.getMetaText(svcMeta, "serviceId"))

                # datalink cannot be globally imported, as we can't depend on
                # anything in protocols.  Importing it here is ok, though;
                # we'll just not produce datalink declarations if the server
                # can't do it.
                from gavo.protocols import datalink
                yield datalink.makeDatalinkServiceDescriptor(
                    ctx, service, table.tableDef,
                    base.getMetaText(svcMeta, "idColumn"))

                # Temporary hack: for now, if something wants to have immediate
                # soda resources on results (so far, only SSAP does), they add
                # generating functions to data.sodaGenerators.  This needs
                # to be replaced with something better when we understand what
                # we actually want.
                while getattr(dataSet, "sodaGenerators", []):
                    yield dataSet.sodaGenerators.pop()(ctx)

            except Exception as ex:
                base.ui.notifyWarning("RD %s: request for datalink service"
                    " could not be satisfied (%s)"%(
                    getattr(table.tableDef.rd, "sourceId", "<internal>"),
                    ex))


def _iterUniqueInfosFromMeta(
        ctx, metaCarriers, metaKey, infoName, ucd, infoText):
    """yields standard INFOs from meta items on a sequence of metaCarriers.

    metaKey references the meta items to make INFOs from.  InfoName and
    ucd are set on the INFO elements, infoText will be formatted into
    the body text, where this function's locals are template arguments.
    That includes value as the INFO's value.

    For when the UCD depends on the INFO's value, ucd can also be a callable,
    which receives the value and must return the UCD.

    The function makes sure that no item is returned more than once.
    """
    # NOTE: local names in here are part of the interface, as they are
    # available to infoText via locals(); do not rename them.
    seenItems = set()

    if ctx.version<(1,2):
        # No UCD on INFO for ancient VOTables
        ucd = None

    for mc in metaCarriers:
        for metaValue in mc.iterMeta(metaKey, propagate=True):
            value = metaValue.getContent("text", macroPackage=mc)
            if value in seenItems:
                continue
            seenItems.add(value)

            if callable(ucd):
                thisUCD = ucd(value)
            else:
                thisUCD = ucd

            yield V.INFO(name=infoName, value=value, ucd=thisUCD)[
                infoText.format(**locals())]


def _iterResourceMeta(ctx, dataSet):
    """yields Data Origin metadata for the RESOURCE parent.

    At this point, we don't know the creating service any more, and
    depending on the sort of thing we are doing here, that's where we
    should take the metadata from.

    We therefore inspect the contributingMetaCarriers attribute on the
    data item coming in and integrate the metadata from everything that's
    in there (usually a service, but in the TAP case perhaps also joined
    tables).  If this kind of things becomes more common, we should think
    of a less hacky way to smuggle extra meta sources in here.
    """
    yield V.DESCRIPTION[base.getMetaText(dataSet, "description",
        macroPackage=dataSet.dd.rd, propagate=False)]

    for el in itertools.chain(
            _iterInfoInfos(dataSet), _iterWarningInfos(dataSet)):
        yield el

    yield V.INFO(name="server_software", value=base.SERVER_SOFTWARE)[
        "Software that produced this VOTable"]

    yield V.INFO(name="server", value=base.getConfig("web", "serverURL"))[
        "Base URI of the server"]

    sources = dataSet.contributingMetaCarriers

    yield from _iterUniqueInfosFromMeta(ctx, sources,
        "howtociteLink", "citation", "",
        "Advice on citing this resource")
    yield from _iterUniqueInfosFromMeta(ctx, sources,
        "source", "publication_id",
        lambda value: "meta.bib.bibcode"
            if utils.couldBeABibcode(value) else "meta.bib",
        "A bibliographic source citable for (parts of) this data")
    yield from _iterUniqueInfosFromMeta(ctx, sources,
        "published_identifier", "ivoid", "meta.ref.ivoid",
        "Originating VO resource")
    yield from _iterUniqueInfosFromMeta(ctx, sources,
        "publisher", "publisher", None,
        "Data centre that has delivered the data")
    yield from _iterUniqueInfosFromMeta(ctx, sources,
        "rights", "rights", None,
        "Legal conditions applicable to (parts of) the data contained")
    yield from _iterUniqueInfosFromMeta(ctx, sources,
        "rights.rightsURI", "rights_uri", None,
        "URI of legal conditions applicable to (parts of) the data contained")

    # naive UTC timestamp; this is what the deprecated
    # datetime.utcnow() used to return.
    requestDate = datetime.datetime.now(
        datetime.timezone.utc).replace(tzinfo=None)
    yield V.INFO(name="request_date", ucd="time.creation",
        value=utils.formatISODT(requestDate))

    yield from _iterUniqueInfosFromMeta(ctx, sources,
        "contact.email", "contact", "meta.email",
        "Contact option")
    yield from _iterUniqueInfosFromMeta(ctx, sources,
        "referenceURL", "reference_url", "meta.ref.url",
        "More information on the data Source")
    yield from _iterUniqueInfosFromMeta(ctx, sources,
        "version", "resource_version", "meta.version",
        "Version of a contributing resource.")
    yield from _iterUniqueInfosFromMeta(ctx, sources,
        "creator.name", "creator", "meta.bib.author",
        "Name of a person or entity that produced a contributing resource")


def _iterToplevelMeta(ctx, dataSet):
    """yields meta elements for the entire VOTABLE from dataSet's RD.

    Nothing is yielded if the data set's descriptor has no RD.
    """
    rd = dataSet.dd.rd
    if rd is None:
        return
    yield V.DESCRIPTION[base.getMetaText(rd, "description",
        macroPackage=dataSet.dd.rd)]

    for infoItem in rd.iterMeta("copyright"):
        yield V.INFO(name="legal", value=infoItem.getContent("text",
            macroPackage=dataSet.dd.rd))


# link elements may be defined using the votlink meta on RESOURCE, TABLE,
# GROUP, FIELD, or PARAM; within the DC, GROUPs have no meta structure,
# so we don't run _linkBuilder on them.

def _makeLinkForMeta(args, localattrs=None):
    """returns a V.LINK with href taken from args[0] and further attributes
    from localattrs.

    localattrs may be None (it is then treated as empty), and the mapping
    passed in is not modified.
    """
    attrs = dict(localattrs or {})
    attrs["href"] = args[0]
    return V.LINK(**attrs)


_linkBuilder = meta.ModelBasedBuilder([
    ('votlink', _makeLinkForMeta, (), {
            "href": "href",
            "content_role": "role",
            "content_type": "contentType",
            "name": "linkname",})])


################# Generating FIELD and PARAM elements.

def _makeValuesForColDesc(colDesc):
    """returns a VALUES element for a column description.

    This just stringifies whatever is in colDesc's respective columns,
    so for anything fancy pass in strings to begin with.

    As a side effect, missing min and max keys in colDesc are filled
    from the values of the underlying column, and the Infimum/Supremum
    sentinels are replaced by None.
    """
    valEl = V.VALUES()
    if colDesc.get("min") is None:
        colDesc["min"] = getattr(colDesc.original.values, "min", None)
    if colDesc.get("max") is None:
        colDesc["max"] = getattr(colDesc.original.values, "max", None)

    # a max of Infimum (or a min of Supremum) cannot be serialised
    # as a limit, so such values are dropped here.
    if colDesc["max"] is utils.Infimum:
        colDesc["max"] = None
    if colDesc["min"] is utils.Supremum:
        colDesc["min"] = None

    if colDesc["min"] is not None:
        valEl[V.MIN(value=str(colDesc["min"]))]
    if colDesc["max"] is not None:
        valEl[V.MAX(value=str(colDesc["max"]))]
    if colDesc["nullvalue"] is not None:
        valEl(null=colDesc["nullvalue"])

    for option in getattr(colDesc.original.values, "options", []):
        valEl[V.OPTION(value=option.content_ or "", name=option.title)]

    return valEl


# keys copied from colDescs to FIELDs in defineField
_voFieldCopyKeys = ["name", "datatype", "ucd", "utype", "ref"]
def defineField(ctx, element, colDesc):
    """adds attributes and children to element from colDesc.

    element can be a V.FIELD or a V.PARAM *instance* and is changed in place.

    ctx is the VOTableContext; its version decides whether xtype
    is written.

    This function returns None to remind people we're changing in place
    here.
    """
    # bomb if you got an Element rather than an instance -- with an
    # Element, things would appear to work, but changes are lost when
    # this function ends.
    assert not isinstance(element, type), ("Got FIELD/PARAM element"
        " instead of instance in VOTable defineField")

    if colDesc["arraysize"]!='1':
        element(arraysize=colDesc["arraysize"])
    # (for char, keep arraysize='1' to keep topcat happy)
    if colDesc["datatype"]=='char' and colDesc["arraysize"]=='1':
        element(arraysize='1')

    if colDesc["unit"]:
        element(unit=colDesc["unit"])
    element(ID=colDesc["id"])

    # don't include xtype if writing 1.1
    xtype = colDesc.get("xtype")
    if ctx.version>(1,1):
        element(xtype=xtype)

    if isinstance(element, V.PARAM):
        if hasattr(colDesc.original, "getStringValue"):
            try:
                element(value=str(colDesc.original.getStringValue()))
            except Exception:
                # there's too much that can legitimately go wrong here to
                # bother; but we only ignore ordinary exceptions so that
                # KeyboardInterrupt and SystemExit still propagate.
                pass

    if colDesc.original:
        rscCol = colDesc.original
        if rscCol.hasProperty("targetType"):
            element[V.LINK(
                content_type=rscCol.getProperty("targetType"),
                title=rscCol.getProperty("targetTitle", "Link"))]

    # empty or missing values become None here
    copiedAttrs = {
        key: colDesc.get(key) or None
        for key in _voFieldCopyKeys}
    element(**copiedAttrs)[
        V.DESCRIPTION[colDesc["description"]],
        _makeValuesForColDesc(colDesc),
        _linkBuilder.build(colDesc.original)
    ]
def makeFieldFromColumn(ctx, colType, rscCol):
    """returns a VOTable element of type colType for rscCol.

    colType is the element class to instantiate (V.PARAM or V.FIELD),
    rscCol a column or param instance; the new element is filled using
    defineField.
    """
    votEl = colType()
    annotated = valuemappers.AnnotatedColumn(rscCol)
    defineField(ctx, votEl, annotated)
    return votEl
def _iterFields(ctx, serManager):
    """yields one V.FIELD per column description in serManager.
    """
    for columnDescription in serManager:
        field = V.FIELD()
        defineField(ctx, field, columnDescription)
        yield field


def _makeVOTParam(ctx, param):
    """returns a V.PARAM element for param.
    """
    # We normally write the content, i.e., the string serialization
    # we got.  Nulls and null-equivalents become an empty value.
    if param.content_ is base.NotGiven or param.value is None:
        literal = ""
    else:
        literal = param.content_
        if literal is None:
            literal = ""

    votParam = V.PARAM()
    defineField(ctx, votParam, valuemappers.AnnotatedColumn(param))

    # id management is a particular pain for params at this moment,
    # because DM annotation uses the tableDef's params, whereas
    # we see the copied params here.  We should stop copying
    # the params in rsc.common.ParamMixin.
    try:
        votParam.ID = ctx.getIdFor(param.originalParam)
    except (base.NotFoundError, AttributeError):
        # no (known) original; try the param we actually have
        try:
            votParam.ID = ctx.getIdFor(param)
        except base.NotFoundError:
            pass

    votParam.value = literal
    return votParam


def _iterTableParams(ctx, serManager):
    """yields V.PARAMs for the params of serManager's table.
    """
    for param in serManager.table.iterParams():
        votParam = _makeVOTParam(ctx, param)
        if votParam is None:
            continue
        ctx.addID(param, votParam)
        yield votParam


####################### Tables and Resources


# an ad-hoc mapping of what STC1 has in frame to what VOTable 1.1 has for
# COOSYS/@system
STC_FRAMES_TO_COOSYS = {
    'ICRS': 'ICRS',
    'FK5': 'eq_FK5',
    'FK4': 'eq_FK4',
    # neglecting the odds there's actually ecl_FK4 data
    'ECLIPTIC': 'ecl_FK5',
    'GALACTIC_II': 'galactic',
    'SUPERGALACTIC': 'supergalactic',
    None: None}


# these are utypes for which column refs should be ref-ed to the COOSYS.
COLUMN_REF_UTYPES = [
    "stc:astrocoords.position2d.value2.c1",
    "stc:astrocoords.position2d.value2.c2",
    "stc:astrocoords.position2d.error2.c1",
    "stc:astrocoords.position2d.error2.c2",
    "stc:astrocoords.velocity2d.value2.c1",
    "stc:astrocoords.velocity2d.value2.c2",
    "stc:astrocoords.velocity2d.error2.c1",
    "stc:astrocoords.velocity2d.error2.c2",
    "stc:astrocoords.redshift.value",
    "stc:astrocoords.position3d.value3.c1",
    "stc:astrocoords.position3d.value3.c2",
    "stc:astrocoords.position3d.value3.c3",
    "stc:astrocoords.time.timeinstant",
    "stc:astrocoords.velocity3d.value3.c1",
    "stc:astrocoords.velocity3d.value3.c2",
    "stc:astrocoords.velocity3d.value3.c3",
    "stc:astrocoords.position2d.epoch",
    "stc:astrocoords.position3d.epoch",
]


def _addEpochToCoosys(coosys, utypeMap):
    """tries to locate epoch information in utypeMap and adds it to the
    V.COOSYS instance coosys if found.

    Only literal epochs are inlined; epochs given as column references
    are left alone here.
    """
    for epochBase in [
            "stc:astrocoords.position2d.",
            "stc:astrocoords.position3d."]:
        if epochBase+"epoch" in utypeMap:
            break
    else:
        # no epoch declared at all
        return

    epVal = utypeMap[epochBase+"epoch"]
    if not isinstance(epVal, stc.ColRef):
        # If the epoch is not a column reference, inline it.
        # (the code below will ignore it in that case)
        coosys(epoch="%s%s"%(
            utypeMap.get(epochBase+"epoch.yeardef", "J"),
            epVal))


def _makeCOOSYSFromSTC1(utypeMap, serManager):
    """returns a VOTable 1.1 COOSYS element inferred from a map of
    stc utypes to STC1 values.

    We let through coordinate frames not defined in VOTable 1.1 at the
    expense of making the VOTables XSD-invalid with such frames; this
    seems preferable to not declaring anything.

    As a side effect, this will change column/@ref attributes
    (possibly also param/@ref).  If a column is part of two STC structures,
    the first one will win.  Yeah, that spec sucks.
    """
    coosys = V.COOSYS()
    sysId = serManager.makeIdFor(coosys, "system")
    coosys(ID=sysId)

    _addEpochToCoosys(coosys, utypeMap)

    stcFrame = utypeMap.get(
        "stc:astrocoordsystem.spaceframe.coordrefframe", None)
    coosys(system=STC_FRAMES_TO_COOSYS.get(stcFrame, stcFrame))

    if "references-from-coosys" in base.getConfig("future"):
        coosys(refposition=utypeMap.get(
            "stc:astrocoordsystem.spaceframe.referenceposition", None))

    for utype in COLUMN_REF_UTYPES:
        if utype in utypeMap:
            val = utypeMap[utype]
            if isinstance(val, stc.ColRef):
                col = serManager.getColumnByName(str(val))
                # first STC structure wins
                if not col.get("ref"):
                    col["ref"] = sysId

    return coosys


def _iterSTC(ctx, tableDef, serManager):
    """yields STC groups for the systems declared in tableDef and adds
    the corresponding COOSYS elements to the enclosing RESOURCE.

    The COOSYS elements are also noted in ctx.coosysByAST so that
    later annotation processing can re-use them.
    """
    def getColumnFor(colRef):
        try:
            return serManager.getColumnByName(colRef.dest)
        except KeyError:
            # in ADQL processing, names are lower-cased, and there's not
            # terribly much we can do about it without breaking other things.
            # Hence, let's try and see whether our target is there with
            # case normalization:
            return serManager.getColumnByName(colRef.dest.lower())

    def getIdFor(colRef):
        return getColumnFor(colRef)["id"]

    for ast in tableDef.getSTCDefs():
        container, utypeMap = modelgroups.marshal_STC(ast, getIdFor)
        if ctx.version>(1,1):
            # "Note-style" STC only supported in 1.2 and higher
            yield container

        # legacy COOSYS specification supported everywhere
        coosysEl = _makeCOOSYSFromSTC1(utypeMap, serManager)
        ctx.coosysByAST[str(id(ast))] = coosysEl
        ctx.getEnclosingResource()[coosysEl]


def _addStupidRefByAnnotation(ctx, annotation, serManager, destId):
    """adds a @ref attribute to destId to a PARAM or FIELD that annotation
    references.

    This is for legacy TIMESYS or COOSYS, and the born-legacy PHOTCAL
    group.  Sigh.

    Columns get the ref set on their column description (unless they
    already have one), params have the ref pushed to ctx; annotations
    of other types are ignored.
    """
    if isinstance(annotation, dm.ColumnAnnotation):
        col = serManager.getColumnByName(annotation.value.name)
        if not col.get("ref"):
            col["ref"] = destId

    elif isinstance(annotation, dm.ParamAnnotation):
        ctx.pushRefFor(
            serManager.table.getParamByName(annotation.value.name),
            destId)


# mapping from our votable-stc role names to COOSYS utypes
_STC_UTYPE_MAPPING = {
    "longitude": "votable:LonLatPoint-lon",
    "latitude": "votable:LonLatPoint-lat",
    "pm_longitude": "votable:ProperMotion-lon",
    "pm_latitude": "votable:ProperMotion-lat",
    "distance": "votable:LonLatPoint-dist",
    "rv": "votable:ProperMotion-rv",
}


# FIELDrefs/PARAMrefs within COOSYS are only written when the
# references-from-coosys future is configured; otherwise, the function
# just returns None.
if "references-from-coosys" in base.getConfig("future"):
    def _makeLegacyRefForAnn(ctx, annotation, utype):
        """returns a PARAMref or FIELDref for annotation with the utype
        passed.

        The target of annotation will receive an id in ctx.  If the
        annotation is neither a column nor a param annotation, a warning
        is emitted and None is returned.
        """
        valId = ctx.getOrMakeIdFor(
            annotation.value, suggestion=annotation.value.name)
        if isinstance(annotation, dm.ColumnAnnotation):
            return V.FIELDref(utype=utype, ref=valId)
        elif isinstance(annotation, dm.ParamAnnotation):
            return V.PARAMref(utype=utype, ref=valId)
        else:
            base.ui.notifyWarning(f"Unknown annotation {annotation}")
            # fall through to returning None, which the serialiser will
            # swallow

else:
    def _makeLegacyRefForAnn(ctx, annotation, utype):
        return None


def _guessTimeOrigin(frame, timeCol):
    """returns a plausible value for a TIMESYS' timeorigin attribute
    from a frame definition and a time column.

    This is a depressing mess; thanks, DM WG.

    None is returned if no time origin can be inferred.
    """
    if "time0" in frame:
        # that's the only sane option
        return frame.get("time0")

    if not isinstance(timeCol, dm.ParamLikeAnnotation):
        # it's a literal and we have no further metadata.
        return

    timeCol = timeCol.weakref()

    # let's guess time origin based on units and such.
    if timeCol.unit=="d":
        # assuming JD or MJD.  Since people didn't give a time0, try to guess.
        if "mjd" in timeCol.name:
            return "MJD-origin"
        # JD is a silly default, but what can I do?  SIGH!
        return "JD-origin"

    if timeCol.unit=="s":
        # if that's really not a unix timestamp, people need to
        # give a time0.
        return "2440587.5"

    # we ought to assert unit="yr" here
    return None


def _addLegacySYSFromVOTableSTC(ctx, tableDef, serManager):
    """adds COOSYS and TIMESYS elements from votable-stc annotation to
    the enclosing RESOURCE.

    As in _makeCOOSYSFromSTC1, reference frames that have no counterpart
    in STC_FRAMES_TO_COOSYS are passed through unchanged rather than
    failing the whole declaration.
    """
    for ann in tableDef.iterAnnotationsOfType("votable:Coords"):
        if "time" in ann:
            timeFrame = ann["time"].get("frame", {})
            timesysEl = V.TIMESYS(
                timescale=timeFrame.get("timescale", "UNKNOWN"),
                refposition=timeFrame.get("refPosition", "UNKNOWN"),
                timeorigin=_guessTimeOrigin(timeFrame,
                    ann["time"].get("location")))
            timesysId = serManager.makeIdFor(timesysEl, "ts")
            timesysEl(ID=timesysId)
            ctx.getEnclosingResource()[
                timesysEl]

            if "location" in ann["time"]:
                _addStupidRefByAnnotation(
                    ctx, ann["time"]["location"], serManager, timesysId)

        if "space" in ann:
            spaceFrame = ann["space"]["frame"]

            coosys = None
            if "astid" in ann:
                # the annotation was built from an <stc> declaration, and
                # _iterSTC may already have created the coosys element, in
                # which case we want to re-use it.
                coosys = ctx.coosysByAST.get(str(ann["astid"]))

            if coosys is None:
                # no such coosys created yet
                coosys = V.COOSYS()
                ctx.getEnclosingResource()[coosys]

            sysId = serManager.getOrMakeIdFor(coosys, "system")

            # we probably should be more careful with literals vs. references
            # for the orientation, too.
            # Frames unknown to STC_FRAMES_TO_COOSYS are passed through as
            # they are; raising a KeyError here would leave a COOSYS without
            # ID and system in the RESOURCE.
            orientation = spaceFrame.get("orientation")
            coosys(ID=sysId,
                system=STC_FRAMES_TO_COOSYS.get(orientation, orientation),
                refposition=spaceFrame.get("refPosition"))

            epVal = spaceFrame.get("epoch")
            if isinstance(epVal, str):
                coosys(epoch=epVal)
            elif epVal:
                coosys[_makeLegacyRefForAnn(
                    ctx, epVal, "votable:CustomRefLocation-epoch")]

            for child in ann["space"].iterChildRoles():
                _addStupidRefByAnnotation(ctx, child, serManager, sysId)
                if (child.name in _STC_UTYPE_MAPPING
                        and isinstance(child, dm.ParamLikeAnnotation)):
                    coosys[_makeLegacyRefForAnn(
                        ctx, child, _STC_UTYPE_MAPPING[child.name])]


# constructor arguments for the PARAMs in photcal groups, keyed by
# the names of the annotation roles they are built from
_PHOT_CAL_PARAMS = {
    "filterIdentifier": dict(name="filterIdentifier", datatype="char",
        ucd="meta.id;instr.filter", arraysize="*",
        utype="photDM:PhotometryFilter.identifier"),
    "zeroPointFlux": dict(name="zeroPointFlux",
        ucd="phot.mag;arith.zp",
        utype="photDM:PhotCal.zeroPoint.flux.value",
        unit="Jy", datatype="double"),
    "magnitudeSystem": dict(name="magnitudeSystem",
        ucd="meta.code",
        utype="photDM:PhotCal.magnitudeSystem.type",
        datatype="char", arraysize="*"),
    "effectiveWavelength": dict(name="effectiveWavelength",
        ucd="em.wl.effective", unit="m", datatype="double",
        utype="photDM:PhotometryFilter.spectralLocation.value"),
}


def _addLegacyPhotCal(ctx, tableDef, serManager):
    """adds PhotCal groups to the enclosing RESOURCE.

    One GROUP is built per phot:PhotCal annotation on tableDef.
    """
    for ann in tableDef.iterAnnotationsOfType("phot:PhotCal"):
        photGroup = V.GROUP(name="photcal")
        photGroup(ID=serManager.makeIdFor(photGroup, "phot_def"))

        for child in ann.iterChildRoles():
            if child.name=="value":
                # DaCHS extension: reference the column/param this is for
                photGroup[V.FIELDref(utype="adhoc:location",
                    ref=ctx.getOrMakeIdFor(child.value))]
                _addStupidRefByAnnotation(ctx, child, serManager, photGroup.ID)

            elif child.name in _PHOT_CAL_PARAMS:
                photGroup[V.PARAM(**_PHOT_CAL_PARAMS[child.name])(
                    value=str(child.value))]

            else:
                # just ignore items we don't understand
                pass

        ctx.getEnclosingResource()[photGroup]


def _iterNotes(serManager):
    """yields GROUPs for table notes.

    The idea is that the note is in the group's description, and the
    FIELDrefs give the columns that the note applies to.
    """
    # add notes as a group with FIELDrefs, but don't fail on them
    for key, note in serManager.notes.items():
        noteId = serManager.getOrMakeIdFor(note)
        noteGroup = V.GROUP(name="note-%s"%key, ID=noteId)[
            V.DESCRIPTION[note.getContent(targetFormat="text")]]
        for col in serManager:
            if col["note"] is note:
                noteGroup[V.FIELDref(ref=col["id"])]
        yield noteGroup


def _makeRef(baseType, ref, container, serManager):
    """returns a new node of baseType reflecting the group.TypedRef
    instance ref.

    container is the destination of the reference.  For columns, that's
    the table definition, but for parameters, this must be the table
    itself rather than its definition because it's the table's
    params that are embedded in the VOTable.
    """
    return baseType(
        ref=serManager.getOrMakeIdFor(ref.resolve(container)),
        utype=ref.utype,
        ucd=ref.ucd)


def _iterGroups(ctx, container, serManager):
    """yields GROUPs for the RD groups within container, taking params and
    fields from serManager's table.

    container can be a tableDef or a group; groups nested in groups are
    serialised recursively.
    """
    for group in container.groups:
        votGroup = V.GROUP(ucd=group.ucd, utype=group.utype, name=group.name)
        votGroup[V.DESCRIPTION[group.description]]

        for ref in group.columnRefs:
            votGroup[_makeRef(V.FIELDref, ref,
                serManager.table.tableDef, serManager)]

        for ref in group.paramRefs:
            votGroup[_makeRef(V.PARAMref, ref,
                serManager.table, serManager)]

        for param in group.params:
            votGroup[_makeVOTParam(ctx, param)]

        for subgroup in _iterGroups(ctx, group, serManager):
            votGroup[subgroup]

        yield votGroup
def makeTable(ctx, table, isMeta=False):
    """returns a Table node for the table.Table instance table.

    With isMeta, the bare TABLE element (without any data) is returned;
    otherwise, it is wrapped into a votable.DelayedTable.
    """
    serManager = valuemappers.SerManager(
        table,
        mfRegistry=ctx.mfRegistry,
        idManager=ctx,
        acquireSamples=ctx.acquireSamples)
    tableDef = table.tableDef

    # the statements below must run in this order so that referenced
    # things have IDs before FIELDs and such are serialised.
    result = V.TABLE()
    with ctx.activeContainer(result):
        # VO-DML annotation comes first so everything that needs
        # an id has one.
        if ctx.produceVODML:
            for annotation in tableDef.annotations:
                try:
                    ctx.addVODMLMaterial(annotation.getVOT(ctx, table))
                except Exception as failure:
                    # broken DM annotation must never break the VOTable
                    base.ui.notifyError("%s-typed DM annotation failed: %s"%(
                        annotation.type, failure))

        # STC goes before the columns so the columns
        # have the stupid ref to COOSYS
        result[_iterSTC(ctx, tableDef, serManager)]

        # same for votable-STC (except here the FIELDs need reliable IDs)
        try:
            _addLegacySYSFromVOTableSTC(ctx, tableDef, serManager)
        except Exception as ex:
            base.ui.notifyError(f"Could not serialise VOTable COOSYS/TIMESYS: {ex}")

        # photometry hack
        _addLegacyPhotCal(ctx, tableDef, serManager)

        result(
            name=base.getMetaText(table, "name", table.tableDef.id),
            utype=base.getMetaText(table, "utype", macroPackage=table.tableDef,
                propagate=False))

        # _iterGroups has to come before _iterFields and _iterTableParams
        # as it may need to add ids to the respective items.  xmlstan
        # takes care of the XSD-correct ordering of the elements.
        description = V.DESCRIPTION[base.getMetaText(table, "description",
            macroPackage=table.tableDef, propagate=False)]
        groups = _iterGroups(ctx, table.tableDef, serManager)
        fields = _iterFields(ctx, serManager)
        params = _iterTableParams(ctx, serManager)
        notes = _iterNotes(serManager)
        links = _linkBuilder.build(table.tableDef)
        result[description, groups, fields, params, notes, links]

    if isMeta:
        # we take the "should not contain data" from the spec as "will not",
        # i.e., data on resources declared as meta is simply dropped.
        return result

    return votable.DelayedTable(result,
        serManager.getMappedTuples(),
        tableEncoders[ctx.tablecoding],
        overflowElement=ctx.overflowElement)
def _makeResource(ctx, data):
    """returns a Resource node for the rsc.Data instance data.

    All tables in data are serialised into the RESOURCE; if the data's
    _type meta is "meta", the tables are written without data.
    """
    resType = base.getMetaText(data, "_type")
    isMeta = resType=="meta"
    resource = V.RESOURCE()

    # For now, DaCHS will only have one RESOURCE that can contain VODML
    # annotation.  In case that ever changes, ctx will have to keep track
    # of what "top-level" RESOURCE we're annotating (unless MIVOT gets
    # sanitised).
    if ctx.produceVODML:
        resource[V.RESOURCE(type="meta")[ctx.rootVODML]]

    with ctx.activeContainer(resource):
        resource(type=resType, utype=base.getMetaText(data, "utype"))
        resourceMeta = _iterResourceMeta(ctx, data)
        votParams = [_makeVOTParam(ctx, param)
            for param in data.iterParams()]
        links = _linkBuilder.build(data.dd)
        resource[resourceMeta, votParams, links]

        for table in data:
            with ctx.buildingFromTable(table):
                resource[makeTable(ctx, table, isMeta=isMeta)]

        resource[ctx.overflowElement]

    return resource


############################# Toplevel/User-exposed code

makeResource = _makeResource
def makeVOTable(data, ctx=None, **kwargs):
    """returns a votable.V.VOTABLE object representing data.

    data can be an rsc.Data or an rsc.Table instance.

    You may pass a VOTableContext object as ctx; if you don't, a context
    is built from kwargs (and hence with all defaults if there are none).
    Passing VOTableContext constructor arguments as keyword arguments
    is deprecated, though; we'll probably remove the option to do so at
    some point.

    You will usually pass the result to votable.write.  The object returned
    contains DelayedTables, i.e., most of the content will only be realized
    at render time.

    A votable.VOTableError is raised for versions we cannot write.
    """
    if not ctx:
        ctx = VOTableContext(**kwargs)

    data = rsc.wrapTable(data)

    version = ctx.version
    if version==(1,3):
        raise votable.VOTableError("Cannot write VOTable 1.3 any more"
            " (and you shouldn't have reason to).")

    if version==(1,1):
        vot = V.VOTABLE11()
    elif version==(1,2):
        vot = V.VOTABLE12()
    elif version in ((1,4), (1,5)):
        vot = V.VOTABLE()
    else:
        raise votable.VOTableError("No toplevel element for VOTable version %s"%
            repr(ctx.version))

    # the datalink resources are computed first but appended last
    datalinkResources = list(_iterDatalinkResources(ctx, data))
    vot[_iterToplevelMeta(ctx, data)]
    vot[_makeResource(ctx, data)]
    vot[datalinkResources]

    if ctx.produceVODML:
        mivotNamespace = utils.getPrefixInfo("mivot")[0]
        vot._fixedTagMaterial += ' xmlns:mivot="{}"'.format(mivotNamespace)

    if ctx.suppressNamespace:
        # use this for "simple" table with nice element names
        vot._fixedTagMaterial = ""

    # What follows is a hack around the insanity of stuffing
    # unused namespaces and similar detritus into VOTable's roots.
    rootAttrs = data.getMeta("_votableRootAttributes")
    if rootAttrs:
        fragments = [vot._fixedTagMaterial]
        fragments.extend(item.getContent() for item in rootAttrs)
        vot._fixedTagMaterial = " ".join(
            fragment for fragment in fragments if fragment)

    return vot
def writeAsVOTable(data, outputFile, ctx=None, **kwargs):
    """writes a VOTable made from ``data`` to ``outputFile``.

    data can be a table or ``Data`` item.

    ``ctx`` can be a ``VOTableContext`` instance; if it is not given,
    a context is constructed with ``kwargs`` as ``VOTableContext``
    constructor arguments.
    """
    if not ctx:
        ctx = VOTableContext(**kwargs)
    votable.write(makeVOTable(data, ctx), outputFile)
def getAsVOTable(data, ctx=None, **kwargs):
    """returns the bytes of a VOTable representation of data.

    If no ``ctx`` is passed in, ``kwargs`` are used as constructor
    arguments for a new VOTableContext.
    """
    if not ctx:
        ctx = VOTableContext(**kwargs)
    buffer = io.BytesIO()
    writeAsVOTable(data, buffer, ctx)
    return buffer.getvalue()
def format(data, outputFile, **ctxargs):
    """writes data as a VOTable to outputFile.

    This is the function registered with formats.common below; ctxargs
    are passed on to the VOTableContext constructor.
    """
    ctx = VOTableContext(**ctxargs)
    return writeAsVOTable(data, outputFile, ctx)
# Registration of the VOTable flavours with formats.common.  The positional
# arguments are passed as in the original calls: key, writer function,
# media type, label, file extension, and then any further aliases.

common.registerDataWriter("votable", format, base.votableType,
    "Default VOTable", ".vot",
    tapId="ivo://ivoa.net/std/TAPRegExt#output-votable-binary")

common.registerDataWriter("votableb2", functools.partial(
        format, tablecoding="binary2"),
    "application/x-votable+xml;serialization=BINARY2",
    "Binary2 VOTable",
    ".votb2",
    "votable/b2",
    tapId="ivo://ivoa.net/std/TAPRegExt#output-votable-binary2")

common.registerDataWriter("votabletd", functools.partial(
        format, tablecoding="td"),
    "application/x-votable+xml;serialization=TABLEDATA",
    "Tabledata VOTable",
    ".vottd",
    "text/xml", "votable/td",
    tapId="ivo://ivoa.net/std/TAPRegExt#output-votable-td")

common.registerDataWriter("votabletd1.1", functools.partial(
        format, tablecoding="td", version=(1,1)),
    "application/x-votable+xml;serialization=TABLEDATA;version=1.1",
    "Tabledata VOTable version 1.1",
    ".vot1",
    "text/xml")

# this one is binary-coded, so its label must not claim it is Tabledata
# (that label belongs to votabletd1.1 above).
common.registerDataWriter("votable1.1", functools.partial(
        format, tablecoding="binary", version=(1,1)),
    "application/x-votable+xml;version=1.1",
    "VOTable version 1.1",
    ".vot1",
    "text/xml")

common.registerDataWriter("votabletd1.2", functools.partial(
        format, tablecoding="td", version=(1,2)),
    "application/x-votable+xml;serialization=TABLEDATA;version=1.2",
    "Tabledata VOTable version 1.2",
    ".vot2",
    "text/xml")

common.registerDataWriter("vodml", functools.partial(
        format, tablecoding="td", version=(1,5)),
    "application/x-votable+xml;serialization=TABLEDATA;version=1.5",
    "VOTable version 1.5, tabledata",
    ".vot5")

common.registerDataWriter("vodmlb", functools.partial(
        format, version=(1,5)),
    "application/x-votable+xml;version=1.5",
    "VOTable version 1.5",
    ".vot5")