Source code for gavo.adql.fieldinfo

"""
Field Infos -- annotations to ADQL parse nodes carrying values.

To do this, we have a set of naive heuristics for how types, UCDs, and units
behave when such "fields" are combined.  Since, right now, we don't parse
out enough and, at least for UCDs and units, we don't have enough data
to begin with, much of this is conjecture.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import functools
import re


class _CoercNode(object):
	"""A node in the coercion tree.

	A node has a name, a sequence of alias names, a sequence of child
	nodes, and a parent (None until the node is passed as a child to
	another node's constructor).
	"""
	def __init__(self, name, children=(), aliases=()):
		self.name = name
		self.aliases = aliases
		self.children = children
		self.parent = None
		# adopting the children is what links the tree upwards
		for node in self.children:
			node.parent = self

	def getAncestorNames(self):
		"""returns a list of node names on the path from the root down to
		(and including) this node.
		"""
		path = []
		current = self
		while current is not None:
			path.append(current.name)
			current = current.parent
		# we collected from self upwards; callers get root-first order
		path.reverse()
		return path


class Coercions(object):
	"""A tree of types that can be used to infer common types.

	The tree is passed in as its root node; nodes need name, aliases,
	children, and parent attributes and a getAncestorNames method.

	>>> c = Coercions(_CoercNode('bar', (_CoercNode('foo'), _CoercNode('baz',
	...   (_CoercNode('quux'),)))))
	>>> c.getSubsuming([])
	'bar'
	>>> c.getSubsuming(['foo'])
	'foo'
	>>> c.getSubsuming(['foo', 'foo'])
	'foo'
	>>> c.getSubsuming(['foo', 'quux'])
	'bar'
	>>> c.getSubsuming(['foo', 'weird'])
	'bar'
	"""
	def __init__(self, typeTree):
		# typesIndex maps both node names and their aliases to nodes
		self.typesIndex = {}
		self.root = typeTree

		def index(node):
			self.typesIndex[node.name] = node
			for alias in node.aliases:
				self.typesIndex[alias] = node
			for child in node.children:
				index(child)

		index(self.root)

	def _unify(self, n1, n2):
		"""returns the first node that is an ancestor to both n1 and n2.

		A node counts as its own ancestor here.  If walking up from n2
		never hits a name on n1's ancestor path, the root is returned.
		"""
		ancestors = set(n1.getAncestorNames())
		while n2 is not None:
			if n2.name in ancestors:
				return n2
			n2 = n2.parent
		return self.root

	def getSubsuming(self, typeSeq):
		"""returns the least general type being able to represent all types
		within typeSeq.

		The method returns the root type for both an empty typeSeq or
		a typeSeq containing an unknown type.  We don't want to fail here,
		and the "all-encompassing" type should handle any crap.

		The return value is always the name of a node, even if an alias
		was passed in.
		"""
		try:
			startNodes = [self.typesIndex[t] for t in typeSeq]
		except KeyError:
			# don't know at least one type
			return self.root.name

		# reduce without an initial value raises a TypeError on an empty
		# sequence; test for that explicitly rather than catching TypeError,
		# which would also hide unrelated errors raised while unifying.
		if not startNodes:
			return self.root.name
		return functools.reduce(self._unify, startNodes).name
# N is just a shorthand while spelling out the tree; it is removed again
# right after.
N = _CoercNode

# The coercion tree for SQL types: the common type of two types is their
# closest common ancestor (where a type counts as its own ancestor).
_coercions = Coercions(
	N("raw", children=(
		N("unicode", children=(
			N("text", children=(
				N("double precision", aliases=("double",), children=(
					N("real", aliases=("float",), children=(
						N("bigint", children=(
							N("integer", aliases=("int",), children=(
								N("smallint", children=(
									N("bytea"),
									N("boolean"),
								)),
							)),
						)),
					)),
				)),
				N("timestamp", children=(
					N("date"),
					N("time"),
				)),
				N("file"),
				N("box"),
				N("spoint"),
				N("scircle"),
				N("spoly", children=(
					N("sbox"),
				)),
			)),
		)),
	))
)

del N

# matches the start of sized/unsized character types like char(20) or
# varchar(); the parentheses are required.
_stringRE = re.compile(r"(?:character varying|varchar|char)\(\d*\)")

# matches types followed by one or more bracket pairs (optionally with
# digits and blanks inside); group 1 is whatever precedes the first bracket.
_arrayRE = re.compile(r"([^[]*)(?:\s*\[[0-9 ]*\])+")
def isArray(type):
	"""returns something true if type looks like an array type.

	What is returned actually is the result of matching type against
	_arrayRE, i.e., a match object (with the text before the first bracket
	in group 1) or None.
	"""
	arrayMatch = _arrayRE.match(type)
	return arrayMatch
def getSubsumingType(sqlTypes):
	"""returns an approximate sql type for a value composed of the types
	mentioned in the sequence sqlTypes.

	Basically, we have the coercion sequence int -> float -> text,
	where earlier types get clobbered by later ones.  And then there's
	messy stuff like dates.  We don't want to fail here, so if all else
	fails, the root type of the coercion tree is returned.

	Since we don't know what operation is being performed, this can never
	be accurate; the idea is to come up with something usable to generate
	VOTables from ADQL results.

	Arrays are handled by subsuming the element types and gluing a [] to
	the result if at least one of the inputs was an array.  As soon as a
	char(x) or one of its friends is encountered, the result is plain text
	(without any array marker, whatever else is in sqlTypes).

	All input is supposed to be lower case.

	>>> getSubsumingType(["smallint", "integer"])
	'integer'
	>>> getSubsumingType(["real[]", "double precision[]"])
	'double precision[]'
	>>> getSubsumingType(["real [ ]", "integer[5]"])
	'real[]'
	>>> getSubsumingType(["double precision", "integer[5]"])
	'double precision[]'
	"""
	elementTypes = []
	sawArray = False

	for sqlType in sqlTypes:
		if _stringRE.match(sqlType) is not None:
			# sized strings short-circuit everything else
			return "text"

		arrayMatch = _arrayRE.match(sqlType)
		if arrayMatch is None:
			elementTypes.append(sqlType.strip())
		else:
			sawArray = True
			elementTypes.append(arrayMatch.group(1).strip())

	commonType = _coercions.getSubsuming(elementTypes)
	return (commonType+"[]") if sawArray else commonType
class FieldInfo(object):
	"""a container for meta information on columns.

	It is constructed with a type, a unit, a ucd and userData.  UserData is
	a sequence of opaque objects.  A FieldInfo combined from more than one
	FieldInfo will have all userDatas of the combined FieldInfos in its
	userData attribute.

	Further attributes are tainted, stc, and sqlName, all of which can be
	passed to the constructor.

	There's an attribute ignoreTableStats that should be set when table
	statistics on user data is likely to be off; as of this writing, this is
	only done for CTEs; if we ever do more sophisticated planning, I expect
	we'll have to do that whenever there is a SELECT.

	There's also a properties dictionary you can use to set arbitrary
	keys in.  These should in general not be combined but just discarded
	as a whole when a field info is touched.

	- xtype -- where applicable, write an ADQL xtype.
	- src.expression -- flattened ADQL this was made from
	"""
	def __init__(self, type, unit, ucd, userData=(), tainted=False, stc=None,
			sqlName=None, ignoreTableStats=False):
		self.type = type
		self.ucd = ucd
		self.unit = unit
		self.stc = stc
		self.sqlName = sqlName
		self.userData = userData
		self.tainted = tainted
		self.ignoreTableStats = ignoreTableStats
		self.properties = {}

	def __eq__(self, other):
		# userData, sqlName, ignoreTableStats, and properties do not take
		# part in the comparison.
		try:
			return (self.type==other.type
				and self.ucd==other.ucd
				and self.unit==other.unit
				and self.stc==other.stc
				and self.tainted==other.tainted)
		except AttributeError:
			# other is not FieldInfo-like
			return False

	def __ne__(self, other):
		return not self==other

	def __repr__(self):
		return "FieldInfo(%s, %s, %s, %s)"%(
			repr(self.type),
			repr(self.unit),
			repr(self.ucd),
			repr(self.userData))

	def __hash__(self):
		# this is used to decide whether two fieldinfos reference the
		# same thing; our repr is close enough for that decision
		# NOTE(review): repr includes userData, which __eq__ ignores, so
		# equal FieldInfos may hash differently -- confirm that is intended
		# before using these as dict keys or set members.
		return hash(repr(self))

	@staticmethod
	def combineUserData(fi1, fi2):
		"""returns the userData of fi1 and fi2, concatenated with +.
		"""
		return fi1.userData+fi2.userData

	@staticmethod
	def combineSTC(fi1, fi2):
		"""tries to find a common STC system for fi1 and fi2.

		Two STC systems are compatible if at least one is None or if they
		are equal.

		If this method discovers incompatible systems, it returns the result
		of calling change() on fi1's stc, with a broken attribute containing
		an explanatory message set on that result.
		"""
		if fi1.stc is None and fi2.stc is None:
			return None
		elif fi2.stc is None or fi1.stc==fi2.stc:
			return fi1.stc
		elif fi1.stc is None:
			return fi2.stc
		else:
			# Trouble: stcs not equal but given, warn and blindly return
			# fi1's stc
			res = fi1.stc.change()
			res.broken = ("This STC info is bogus.  It is the STC from an"
				" expression combining two different systems.")
			return res

	@classmethod
	def fromMulExpression(cls, opr, fi1, fi2):
		"""returns a new FieldInfo built from the multiplication-like operator
		opr and the two field infos.

		The unit is unit1 opr unit2 (with unit2 in parentheses if opr is a
		slash) unless we have a dimless (empty unit), in which case we keep
		the other unit.

		The ucd is always empty unless it's a simple dimless multiplication,
		in which case the ucd of the non-dimless is kept.

		The result is always tainted, and it has ignoreTableStats set if
		either operand has.
		"""
		unit1, unit2 = fi1.unit, fi2.unit
		newUserData = cls.combineUserData(fi1, fi2)
		stc = cls.combineSTC(fi1, fi2)
		newType = getSubsumingType([fi1.type, fi2.type])

		if unit1=="" and unit2=="":
			newUnit, newUCD = "", ""
		elif unit1=="":
			newUnit, newUCD = unit2, fi2.ucd
		elif unit2=="":
			newUnit, newUCD = unit1, fi1.ucd
		else:
			if opr=="/":
				unit2 = "(%s)"%unit2
			newUnit, newUCD = unit1+opr+unit2, ""

		# ignoreTableStats must survive in all branches, not just when both
		# operands have units; otherwise the flag gets lost on, e.g.,
		# multiplication with a dimensionless value.
		return cls(newType, newUnit, newUCD, newUserData,
			tainted=True, stc=stc,
			ignoreTableStats=fi1.ignoreTableStats or fi2.ignoreTableStats)

	@classmethod
	def fromAddExpression(cls, opr, fi1, fi2, forceType=None):
		"""returns a new FieldInfo built from the addition-like operator opr
		and the two field infos.

		If both units are the same, the unit is kept; if exactly one operand
		has a non-empty unit, that unit is tentatively used; otherwise, the
		unit is cleared.  The ucd is kept if both ucds are the same and
		cleared otherwise.

		If forceType is not None, it becomes the type of the result;
		otherwise the type is worked out using getSubsumingType.

		NOTE(review): the result is tainted in any case, even when units
		and ucds agree; confirm whether that is intended.
		"""
		unit, ucd, taint = "", "", True
		stc = cls.combineSTC(fi1, fi2)

		if fi1.unit==fi2.unit:
			unit = fi1.unit
		else:
			# if there's no unit on one but there is one on the other, we're
			# tentatively accepting the unit given (but it's tainted either way)
			if fi1.unit and not fi2.unit:
				unit = fi1.unit
			elif fi2.unit and not fi1.unit:
				unit = fi2.unit

		if fi1.ucd==fi2.ucd:
			ucd = fi1.ucd

		if forceType is not None:
			newType = forceType
		else:
			newType = getSubsumingType([fi1.type, fi2.type])

		return cls(newType, unit, ucd, cls.combineUserData(fi1, fi2),
			taint, stc,
			ignoreTableStats=fi1.ignoreTableStats or fi2.ignoreTableStats)

	def change(self, **kwargs):
		"""returns a new FieldInfo with the constructor arguments given in
		kwargs replaced.

		The properties dictionary is (shallowly) copied to the new object.

		NOTE(review): sqlName is not carried over unless passed in kwargs;
		confirm whether that is intended.
		"""
		consArgs = {
			"type": self.type,
			"unit": self.unit,
			"ucd": self.ucd,
			"userData": self.userData,
			"tainted": self.tainted,
			"stc": self.stc,
			"ignoreTableStats": self.ignoreTableStats}
		consArgs.update(kwargs)
		res = FieldInfo(**consArgs)
		res.properties = self.properties.copy()
		return res
def _test():  # pragma: no cover
	"""runs doctest.testmod() with its default arguments.
	"""
	import doctest
	doctest.testmod()


if __name__=="__main__":  # pragma: no cover
	_test()