Source code for

Converting ASTs to/from STC-X.

The basic idea for conversion to STC-X is that for every ASTNode in dm, there
is a serialize_<classname> function returning some xmlstan.  In general
they should handle the case when their argument is None and return None
in that case.

Traversal is done manually (i.e., by each serialize_X method) rather than
globally since the children in the AST may not have the right order to
keep XSD happy, and also since ASTs are actually a bit more complicated
than trees (e.g., coordinate frames usually have multiple parents).

#c Copyright 2008-2023, the GAVO project <>
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.

import itertools

from gavo import utils
from import common
from import dm
from import STC

[docs]def addId(node): """adds a synthetic id attribute to node unless it's already there. """ if not hasattr(node, "id") or is None: = utils.intToFunnyWord(id(node))
[docs]def strOrNull(val): if val is None: return None elif isinstance(val, common.ColRef): return val else: return str(val)
[docs]def isoformatOrNull(val): if val is None: return None elif isinstance(val, common.ColRef): return val else: return val.isoformat()
def _refOrStr(val): if isinstance(val, common.ColRef) or val is None: return val return str(val) def _getFromSTC(elName, itemDesc): """returns the STC element elName or raises an STCValueError if it does not exist. itemDesc is used in the error message. This is a helper for concise notation of reference frames. """ if elName is None: elName = "UNKNOWNFrame" try: return getattr(STC, elName) except AttributeError: raise common.STCValueError("No such %s: %s"%(itemDesc, elName))
[docs]class Context(object): """is a generation context. It is used to pass around genration-related information. Right now, that's primarily the root node. """ def __init__(self, rootNode): self.rootNode = rootNode
[docs] def getPosForInterval(self, node): return getattr(self.rootNode, node.cType.posAttr)
############ Coordinate Systems
[docs]def serialize_RefPos(node, context): try: return getattr(STC, node.standardOrigin or "UNKNOWNRefPos")[ STC.PlanetaryEphem[node.planetaryEphemeris]] except AttributeError: raise common.STCValueError( "No such standard origin: %s"%node.standardOrigin)
def _fudgeEquinox(eq): # Incredibly, the schema requires not more than three figures after the # comma for equinox. Sigh. if eq is None: return None res = eq[0]+"%.3f"%float(eq[1:]) # Do some cosmetics if res.endswith("00"): res = res[:-2] return res
[docs]def serialize_SpaceFrame(node, context): if node is None: return addId(node) return STC.SpaceFrame([ STC.Name[], _getFromSTC(node.refFrame, "reference frame")[ STC.Equinox[_fudgeEquinox(node.equinox)]], serialize_RefPos(node.refPos, context), _getFromSTC(node.flavor, "coordinate flavor")( coord_naxes=strOrNull(node.nDim))]
[docs]def serialize_TimeFrame(node, context): if node is None: return addId(node) return STC.TimeFrame([ STC.Name[], STC.TimeScale[node.timeScale], serialize_RefPos(node.refPos, context), ]
[docs]def serialize_SpectralFrame(node, context): if node is None: return addId(node) return STC.SpectralFrame([ STC.Name[], serialize_RefPos(node.refPos, context), ]
[docs]def serialize_RedshiftFrame(node, context): if node is None: return addId(node) return STC.RedshiftFrame(, value_type=node.type)[ STC.Name[], STC.DopplerDefinition[node.dopplerDef], serialize_RefPos(node.refPos, context), ]
[docs]def serialize_CoordSys(node, context): addId(node) if node.libraryId: return STC.AstroCoordSystem(, href=node.libraryId) else: return STC.AstroCoordSystem([ serialize_TimeFrame(node.timeFrame, context), serialize_SpaceFrame(node.spaceFrame, context), serialize_SpectralFrame(node.spectralFrame, context), serialize_RedshiftFrame(node.redshiftFrame, context),]
############ Coordinates def _wrapValues(element, valSeq, mapper=strOrNull): """returns the items of valSeq as children of element, mapped with mapper. """ if valSeq is None: return [] return [element[mapper(v)] for v in valSeq] def _serialize_Wiggle(node, serializer, wiggles): if node is None: return cooClass, radiusClass, matrixClass = wiggles if isinstance(node, dm.CooWiggle): return _wrapValues(cooClass, node.values, serializer), elif isinstance(node, dm.RadiusWiggle): return [radiusClass[strOrNull(r)] for r in node.radii] elif isinstance(node, dm.MatrixWiggle): return [matrixClass[_wrapMatrix(m, strOrNull)] for m in node.matrices] else: raise common.STCValueError("Cannot serialize %s errors to STC-X"% node.__class__.__name__) wiggleClasses = { "error": [ (STC.Error, None, None), (STC.Error2, STC.Error2Radius, STC.Error2Matrix), (STC.Error3, STC.Error3Radius, STC.Error3Matrix),], "resolution": [ (STC.Resolution, None, None), (STC.Resolution2, STC.Resolution2Radius, STC.Resolution2Matrix), (STC.Resolution3, STC.Resolution3Radius, STC.Resolution3Matrix),], "size": [ (STC.Size, None, None), (STC.Size2, STC.Size2Radius, STC.Size2Matrix), (STC.Size3, STC.Size3Radius, STC.Size3Matrix),], "pixSize": [ (STC.PixSize, None, None), (STC.PixSize2, STC.PixSize2Radius, STC.PixSize2Matrix), (STC.PixSize3, STC.PixSize3Radius, STC.PixSize3Matrix),], } def _make1DSerializer(cooClass, valueSerializer): """returns a serializer returning a coordinate cooClass. This will only work for 1-dimensional coordinates. valueSerializer is a function taking the coordinate's value and returning some xmlstan. """ def serialize(node, context): res = cooClass[ valueSerializer(node.value), _wrapValues(STC.Error, getattr(node.error, "values", ())), _wrapValues(STC.Resolution, getattr(node.resolution, "values", ())), _wrapValues(STC.PixSize, getattr(node.pixSize, "values", ())), ] if not res.shouldBeSkipped(): return res(unit=node.unit, vel_time_unit=getattr(node, "velTimeUnit", None), return serialize serialize_TimeCoo = _make1DSerializer(STC.Time, lambda value: STC.TimeInstant[STC.ISOTime[isoformatOrNull(value)]]) serialize_RedshiftCoo = _make1DSerializer(STC.Redshift, lambda value: STC.Value[strOrNull(value)]) serialize_SpectralCoo = _make1DSerializer(STC.Spectral, lambda value: STC.Value[strOrNull(value)]) _nones = (None, None, None) def _wrap1D(val, unit=_nones, timeUnit=_nones): if not val: return return _refOrStr(val[0]) def _wrap2D(val, unit=_nones, timeUnit=_nones): if not val: return if isinstance(val, common.ColRef): return val return [STC.C1(pos_unit=unit[0], vel_time_unit=timeUnit[0])[val[0]], STC.C2(pos_unit=unit[1], vel_time_unit=timeUnit[1])[val[1]]] def _wrap3D(val, unit=_nones, timeUnit=_nones): if not val: return if isinstance(val, common.ColRef): return val return [STC.C1(pos_unit=unit[0], vel_time_unit=timeUnit[0])[val[0]], STC.C2(pos_unit=unit[1], vel_time_unit=timeUnit[1])[val[1]], STC.C3(pos_unit=unit[2], vel_time_unit=timeUnit[2])[val[2]]] def _wrapMatrix(val, serializer): for rowInd, row in enumerate(val): for colInd, col in enumerate(row): yield getattr(STC, "M%d%d"%(rowInd, colInd))[serializer(col)] _spatialPosClasses = ( (STC.Position1D, STC.Value, _wrap1D), (STC.Position2D, STC.Value2, _wrap2D), (STC.Position3D, STC.Value3, _wrap3D), ) _velocityPosClasses = ( (STC.Velocity1D, STC.Value, _wrap1D), (STC.Velocity2D, STC.Value2, _wrap2D), (STC.Velocity3D, STC.Value3, _wrap3D), ) def _getSpatialUnits(node): clsArgs, cooArgs = {}, {} if node.unit: if len(set(node.unit))==1: clsArgs["unit"] = node.unit[0] elif node.unit: cooArgs["unit"] = node.unit if hasattr(node, "velTimeUnit"): if len(set(node.velTimeUnit))==1: clsArgs["vel_time_unit"] = node.velTimeUnit[0] elif node.unit: cooArgs["timeUnit"] = node.velTimeUnit return clsArgs, cooArgs def _makeSpatialCooSerializer(stcClasses): """serializes a spatial coordinate. This is quite messy since the concrete choice of elements depends on the coordinate frame. """ def serialize(node, context): if node.frame.nDim is None and node.value: dimInd = len(node.value)-1 else: dimInd = node.frame.nDim-1 coo, val, serializer = stcClasses[dimInd] clsArgs, cooArgs = _getSpatialUnits(node) valueStan = val[serializer(node.value, **cooArgs)] res = coo[ valueStan, [_serialize_Wiggle(getattr(node, wiggleType), serializer, wiggleClasses[wiggleType][dimInd]) for wiggleType in ["error", "resolution", "size", "pixSize"]], ] if node.epoch: res[STC.Epoch(yearDef=node.yearDef)[_refOrStr(node.epoch)]] if not res.shouldBeSkipped(): return res(, **clsArgs) return serialize serialize_SpaceCoo = _makeSpatialCooSerializer(_spatialPosClasses) serialize_VelocityCoo = _makeSpatialCooSerializer(_velocityPosClasses) ############# Intervals def _make1DIntervalSerializer(intervClass, lowerClass, upperClass, valueSerializer): """returns a serializer returning stan for a coordinate interval. This will only work for 1-dimensional coordinates. valueSerializer is a function taking the coordinate's value and returning some xmlstan. Currently, error, resolution, and pixSize information is discarded for lack of a place to put them. """ def serialize(node, context): posNode = context.getPosForInterval(node) if isinstance(node.frame, dm.TimeFrame): unit = None # time intervals have no units else: unit = posNode.unit return intervClass(unit=unit, vel_time_unit=getattr(posNode, "velTimeUnit", None),, fill_factor=strOrNull(node.fillFactor))[ lowerClass[valueSerializer(node.lowerLimit)], upperClass[valueSerializer(node.upperLimit)], ] return serialize serialize_TimeInterval = _make1DIntervalSerializer(STC.TimeInterval, STC.StartTime, STC.StopTime, lambda val: STC.ISOTime[isoformatOrNull(val)]) serialize_SpectralInterval = _make1DIntervalSerializer(STC.SpectralInterval, STC.LoLimit, STC.HiLimit, strOrNull) serialize_RedshiftInterval = _make1DIntervalSerializer(STC.RedshiftInterval, STC.LoLimit, STC.HiLimit, strOrNull) _posIntervalClasses = [ (STC.PositionScalarInterval, STC.LoLimit, STC.HiLimit, _wrap1D), (STC.Position2VecInterval, STC.LoLimit2Vec, STC.HiLimit2Vec, _wrap2D), (STC.Position3VecInterval, STC.LoLimit3Vec, STC.HiLimit3Vec, _wrap3D),] _velIntervalClasses = [ (STC.VelocityScalarInterval, STC.LoLimit, STC.HiLimit, _wrap1D), (STC.Velocity2VecInterval, STC.LoLimit2Vec, STC.HiLimit2Vec, _wrap2D), (STC.Velocity3VecInterval, STC.LoLimit3Vec, STC.HiLimit3Vec, _wrap3D),] def _makeSpatialIntervalSerializer(stcClasses): def serialize(node, context): intervClass, lowerClass, upperClass, valueSerializer = \ stcClasses[node.frame.nDim-1] posNode = context.getPosForInterval(node) clsArgs, cooArgs = _getSpatialUnits(posNode) # check where we should stick these units at some point # if len(set(posNode.unit))==1: # unit = posNode.unit[0] # elif posNode.unit: # units = posNode.unit return intervClass(, fill_factor=node.fillFactor, **clsArgs)[ lowerClass[valueSerializer(node.lowerLimit, **cooArgs)], upperClass[valueSerializer(node.upperLimit, **cooArgs)], ] return serialize serialize_SpaceInterval = _makeSpatialIntervalSerializer(_posIntervalClasses) serialize_VelocityInterval = _makeSpatialIntervalSerializer(_velIntervalClasses) ############# Geometries def _getDim(sampleValue): if sampleValue is None: return None if isinstance(sampleValue, float): return 1 else: return len(sampleValue) def _makeBaseGeometry(cls, node, context): buildArgs = _getSpatialUnits(context.getPosForInterval(node))[0] res = cls(frame_id=getattr(node.frame, "id", None), fill_factor=strOrNull(node.fillFactor), **buildArgs) return res
[docs]def serialize_AllSky(node, context): return _makeBaseGeometry(STC.AllSky, node, context)
[docs]def serialize_Circle(node, context): # would you believe that the sequence of center and radius is swapped # in sphere and circle? Oh boy. if node.geoColRef: return STC.Circle[node.geoColRef] nDim = _getDim( if nDim==2: return _makeBaseGeometry(STC.Circle, node, context)[ STC.Center[_wrap2D(], STC.Radius[node.radius], ] elif nDim==3: return _makeBaseGeometry(STC.Sphere, node, context)[ STC.Radius[node.radius], STC.Center[_wrap3D(], ] else: raise common.STCValueError("Spheres are only defined in 2 and 3D")
[docs]def serialize_Ellipse(node, context): if node.geoColRef: return STC.Ellipse[node.geoColRef] if _getDim( cls, wrap = STC.Ellipse, _wrap2D else: raise common.STCValueError("Ellipses are only defined in 2D") return _makeBaseGeometry(cls, node, context)[ STC.Center[wrap(], STC.SemiMajorAxis[node.smajAxis], STC.SemiMinorAxis[node.sminAxis], STC.PosAngle[node.posAngle], ]
[docs]def serialize_Box(node, context): if node.geoColRef: return STC.Box[node.geoColRef] if _getDim(!=2: raise common.STCValueError("Boxes are only available in 2D") return _makeBaseGeometry(STC.Box, node, context)[ STC.Center[_wrap2D(], STC.Size[_wrap2D(node.boxsize)]]
[docs]def serialize_Polygon(node, context): if node.geoColRef: return STC.Polygon[node.geoColRef] if node.vertices and _getDim(node.vertices[0])!=2: raise common.STCValueError("Polygons are only available in 2D") return _makeBaseGeometry(STC.Polygon, node, context)[ [STC.Vertex[STC.Position[_wrap2D(v)]] for v in node.vertices]]
[docs]def serialize_Convex(node, context): if node.geoColRef: return STC.Polygon[node.geoColRef] return _makeBaseGeometry(STC.Convex, node, context)[ [STC.Halfspace[STC.Vector[_wrap3D(v[:3])], STC.Offset[v[3]]] for v in node.vectors]]
[docs]def serialize_MultiCompound(node, context): if len(node.children)<2: return _nodeToStan(node) return {"Union": STC.Union, "Intersection": STC.Intersection}[ node.__class__.__name__][[_nodeToStan(c, context) for c in node.children]]
serialize_Union = serialize_Intersection = serialize_MultiCompound
[docs]def serialize_Difference(node, context): if len(node.children)!=2: raise common.STCValueError("Difference is only supported with two operands") op1 = _nodeToStan(node.children[0], context) op2 = _nodeToStan(node.children[1], context) # Banzai! To save myself the trouble of having all those icky *2 # elements around, I hack op2's name. op2.name_ = op2.name_+"2" return STC.Difference[op1, op2]
[docs]def serialize_Not(node, context): if len(node.children)!=1: raise common.STCValueError("Not is only supported with one operand") return STC.Negation[_nodeToStan(node.children[0], context)]
############# Toplevel
[docs]def makeAreas(rootNode, context): """serializes the areas contained in rootNode. This requires all kinds of insane special handling. """ if not rootNode.areas: return elif len(rootNode.areas)==1: return _nodeToStan(rootNode.areas[0], context) else: # implicit union return STC.Region[ STC.Union[ [_nodeToStan(n, context) for n in rootNode.areas]]]
def _nodeToStan(astNode, context): """returns xmlstan for whatever is in astNode. """ return globals()["serialize_"+astNode.__class__.__name__](astNode, context)
[docs]def nodeToStan(astNode): """returns xmlstan for an AST node. """ context = Context(astNode) return _nodeToStan(astNode, context)
[docs]def astToStan(rootNode, stcRoot): """returns STC stan for the AST rootNode wrapped in the stcRoot element. The first coordinate system defined in the AST is always used for the embedded coordinates and areas. """ context = Context(rootNode) stcRoot[_nodeToStan(rootNode.astroSystem, context)] return stcRoot[_nodeToStan(rootNode.astroSystem, context), STC.AstroCoords([ [_nodeToStan(n, context) for n in [rootNode.time,, rootNode.velocity, rootNode.freq, rootNode.redshift] if n] ], STC.AstroCoordArea([ [_nodeToStan(n, context) for n in rootNode.timeAs], makeAreas(rootNode, context), [_nodeToStan(n, context) for n in itertools.chain(rootNode.velocityAs, rootNode.freqAs, rootNode.redshiftAs)]], ]
[docs]def getSTCXProfile(rootNode): return astToStan(rootNode, STC.STCResourceProfile).render( prefixForEmpty="stc")