Source code for gavo.rscdef.common

"""
Common items used by resource definition objects.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import os
import re
import urllib.parse

from gavo import base
from gavo import utils


# The following is a flag for initdachs (and initdachs exclusively).
# It is meant to keep resource metadata from being read from database
# tables while dachs init is running.  Nothing in the code visible here
# reads it; it is only provided for other modules to inspect.
_BOOTSTRAPPING = False


class RDAttribute(base.AttributeDef):
    """an attribute giving access to the resource descriptor (rd) an
    element belongs to.

    The attribute is always called rd.  It has no default; instead, the
    first access walks up the chain of parents, and the first non-None
    rd found on an ancestor is remembered and returned.  If there is
    no such ancestor, rd is None.  There currently is no way to reset the
    rd once it has been found.

    These attributes cannot (yet) be fed, so rd="xxx" won't work.
    If we need this, the literal would probably be an id.
    """
    computed_ = True
    typeDesc_ = "reference to a resource descriptor"

    def __init__(self):
        description = ("The parent resource descriptor; never set this"
            " manually, the value will be filled in by the software.")
        base.AttributeDef.__init__(self, "rd", None, description)

    def iterParentMethods(self):
        """yields (name, object) pairs for the rd property and the
        getFullId method to be installed on the parent structure.
        """
        # NOTE: the functions below refer to self.__rd; as they are
        # defined lexically within this class, that name is mangled
        # to _RDAttribute__rd on the instances they end up on.
        def _getRD(self):
            if getattr(self, "parent", None) is None:
                # not yet adopted, we may want to try again later
                return None

            try:
                return self.__rd
            except AttributeError:
                # nothing cached yet, go looking in the ancestors
                pass

            ancestor = self.parent
            while ancestor is not None:
                if hasattr(ancestor, "rd") and ancestor.rd is not None:
                    self.__rd = ancestor.rd
                    return self.__rd
                ancestor = ancestor.parent

            # a parent hasn't been adopted yet, try again later.
            return None

        yield ("rd", property(_getRD))

        def getFullId(self):
            rd = self.rd
            if rd is None:
                return self.id
            return "%s#%s"%(rd.sourceId, self.id)

        yield ("getFullId", getFullId)

    def makeUserDoc(self):
        # don't mention it in docs -- users can't and mustn't set it.
        return None
class ResdirRelativeAttribute(base.FunctionRelativePathAttribute):
    """a path attribute the values of which are interpreted relative
    to the resdir of the current RD.

    The parent needs an RDAttribute.
    """
    def __init__(self, name, default=None, description="Undocumented",
            **kwargs):
        base.FunctionRelativePathAttribute.__init__(
            self,
            name,
            baseFunction=self.getResdir,
            default=default,
            description=description,
            **kwargs)

    def getResdir(self, instance):
        """returns the resdir of instance's RD, or None if instance
        does not have an RD (yet).
        """
        rd = instance.rd
        if rd is not None:
            return rd.resdir

        # we don't have a parent yet, but someone wants to see our
        # value.  This can happen if an element is validated before
        # it is adopted (which we probably should forbid).  Here, we
        # hack around it and hope nobody trips over it
        return None
class ProfileListAttribute(base.AtomicAttribute):
    """An attribute containing a comma separated list of profile names.

    The special name "defaults" stands for whatever default this
    profile list was constructed with.
    """
    typeDesc_ = "Comma separated list of profile names"

    def __init__(self, name, default, description, defaultSource):
        base.AtomicAttribute.__init__(self, name,
            base.Computed, description)
        self.realDefault = default
        self.defaultSource_ = defaultSource

    @property
    def default_(self):
        # hand out a copy so nobody changes the default itself
        return self.realDefault.copy()

    def parse(self, value):
        """returns the set of profile names given in the literal value.

        Empty items are ignored, and "defaults" is replaced by the
        members of the default profile list.
        """
        profiles = set()
        for token in (raw.strip() for raw in value.split(",")):
            if token=="defaults":
                profiles |= self.default_
            elif token:
                profiles.add(token)
        return profiles

    def unparse(self, value):
        """returns a comma separated literal for the profile names in
        value.
        """
        # It would be nice to reconstruct "defaults" here, but right now it's
        # certainly not worth the effort.
        return ", ".join(value)

    def makeUserDoc(self):
        return "**{}** ({}, defaults to {} from gavorc) -- {}".format(
            self.name_,
            self.typeDesc_,
            self.defaultSource_,
            self.description_)
class PrivilegesMixin(base.StructCallbacks):
    """A mixin for structures declaring access to database objects
    (tables, schemas).

    Access is managed on the level of database profiles.  Thus, the names
    given here are not directly role names in the database.

    There are two types of privileges: "All" means at least read and
    write, and "Read" means at least read and lookup.
    """
    # the defaults are taken from the configuration when this class
    # is defined
    _readProfiles = ProfileListAttribute(
        "readProfiles",
        default=base.getConfig("db", "queryProfiles"),
        description=("A (comma separated) list of profile names through"
            " which the object can be read."),
        defaultSource="[db]queryProfiles")

    _allProfiles = ProfileListAttribute(
        "allProfiles",
        default=base.getConfig("db", "maintainers"),
        description=("A (comma separated) list of profile names through"
            " which the object can be written or administred."),
        defaultSource="[db]maintainers")
class IVOMetaMixin(base.StructCallbacks):
    """A mixin for resources aspiring to have IVO ids.

    All those need to have an RDAttribute.  In return, we're filling in
    sensible defaults for the referenceURL, identifier, and status meta
    items (which, as usual for the computed metas, are overridable by
    normal setMetas).
    """
    def _meta_referenceURL(self):
        if self.rd is None:
            return None
        makeReferenceURL = base.META_CLASSES_FOR_KEYS["referenceURL"]
        return makeReferenceURL(self.getURL("info"), title="Service info")

    def _meta_identifier(self):
        # if we're called without an RD, that's probably while we're
        # copied.  Code there knows what to do when we return None
        if self.rd is None:
            return None
        authority = base.getConfig("ivoa", "authority")
        rdPart = urllib.parse.quote(self.rd.sourceId)
        return "ivo://%s/%s/%s"%(authority, rdPart, self.id)

    def _meta_published_identifier(self):
        """returns identifier if we believe we are dealing with a
        published resource, None otherwise.
        """
        isPublished = (getattr(self, "publications", None)
            or getattr(self, "registration", None))
        if not isPublished:
            return None
        return self._meta_identifier()

    def _meta_status(self):
        return "active"
class Registration(base.Structure, base.MetaMixin):
    """A request for registration of a data or table item.

    This is much like publish for services, just for data and tables;
    since they have no renderers, you can only have one register element
    per such element.

    Data registrations may refer to published services that make their
    data available.
    """
    name_ = "publish"
    docName_ = "publish (data)"
    aliases = ["register"]

    _sets = base.StringSetAttribute(
        "sets",
        default=frozenset(["ivo_managed"]),
        description=("A comma-separated list of sets this data will be"
            " published in. To publish data to the VO registry, just"
            " say ivo_managed here. Other sets probably don't make much"
            " sense right now. ivo_managed also is the default."))

    _servedThrough = base.ReferenceListAttribute(
        "services",
        description=("A DC-internal reference to a service that lets users"
            " query that within the data collection; tables with adql=True"
            " are automatically declared as isServiceFor the TAP service."))

    # the following attribute is for compatibility with service.Publication
    # in case someone manages to pass such a publication to the capability
    # builder.
    auxiliary = True

    def _completeMetadataFromResRow(self, resRow):
        """sets the parent's _metadataUpdated meta from the rectimestamp
        of a dc.resources row for the parent table or data item, if
        that is present and non-empty.

        (see rscdef.rdjinj for where this comes from).
        """
        recTimestamp = resRow.get("rectimestamp")
        if recTimestamp:
            self.parent.setMeta("_metadataUpdated", resRow["rectimestamp"])

        # we ignore dateupdated here, assuming that the info coming from
        # the RD is more current.

    def completeElement(self, ctx):
        canInject = (ctx is not None
            and self.id
            and self.parent.rd is not None)
        if canInject:
            injectionKey = "resprop:%s#%s"%(
                self.parent.rd.sourceId, self.parent.id)
            self._completeMetadataFromResRow(
                ctx.getInjected(injectionKey, {}))
        super().completeElement(ctx)

    def publishedForADQL(self):
        """returns true if at least one table published is available for
        TAP/ADQL.
        """
        if getattr(self.parent, "adql", False):
            # single table
            return True

        # data item with multiple tables; parents without iterTableDefs
        # simply contribute no tables.
        iterTableDefs = getattr(self.parent, "iterTableDefs", list)
        return any(tableDef.adql for tableDef in iterTableDefs())

    def register(self):
        """adds isServiceFor and isServedBy metadata to data, service pairs
        in this registration.
        """
        if self.publishedForADQL():
            tapService = base.caches.getRD("//tap").getById("run")
            if tapService not in self.services:
                self.services.append(tapService)

        for service in self.services:
            service.declareServes(self.parent)
class ColumnList(list):
    """A list of column.Columns (or derived classes) that takes care that
    no duplicates (in name) occur.

    If you add a field with the same dest to a ColumnList, the previous
    instance will be overwritten.  The idea is that you can override
    ColumnList in, e.g., interfaces later on.

    Also, two ColumnLists are considered equal if they contain the
    same names.

    After construction, you should set the withinId attribute to
    something that will help make sense of error messages.

    The name and id indexes are kept current by append, extend, replace,
    and remove.  After any other in-place manipulation (slicing, insert,
    pop, ...), you need to call redoIndex yourself.
    """
    # names of columns that are disregarded when comparing ColumnLists.
    # This used to be undefined, which made == between two ColumnLists
    # fail with an AttributeError.
    internallyUsedFields = frozenset()

    def __init__(self, *args):
        list.__init__(self, *args)
        self.redoIndex()
        self.withinId = "unnamed table"

    def __contains__(self, fieldName):
        # membership is by column *name*, not by column object
        return fieldName in self.nameIndex

    def __eq__(self, other):
        if isinstance(other, ColumnList):
            ignored = self.internallyUsedFields
            myFields = {f.name for f in self if f.name not in ignored}
            otherFields = {f.name for f in other if f.name not in ignored}
            return myFields==otherFields
        elif other==[] and len(self)==0:
            return True
        return False

    def __ne__(self, other):
        # list brings its own __ne__ that compares element by element;
        # without this override, != would contradict our name-based ==.
        return not self==other

    def _invalidateIdIndex(self):
        """forgets the cached id index so getIdIndex rebuilds it on its
        next call.
        """
        try:
            del self.__idIndex
        except AttributeError:
            # nothing cached so far
            pass

    def redoIndex(self):
        """creates a mapping of names to list indexes and drops the
        cached id index.

        You must call this when you dare to munge this manually (which
        you shouldn't).
        """
        self.nameIndex = {c.name: ct for ct, c in enumerate(self)}
        self._invalidateIdIndex()

    def deepcopy(self, newParent):
        """returns a deep copy of self.

        This means that all child structures are being copied.  In that
        process, they receive a new parent, which is why you need to
        pass one in.
        """
        return self.__class__([c.copy(newParent) for c in self])

    def getIdIndex(self):
        """returns a mapping from ids to columns for all columns that
        have a (non-None) id.

        The mapping is cached until the list is changed through one of
        our mutating methods.
        """
        try:
            return self.__idIndex
        except AttributeError:
            self.__idIndex = {c.id: c for c in self if c.id is not None}
            return self.__idIndex

    def append(self, item):
        """adds the Column item to the data field list.

        It will overwrite a Column of the same name if such a thing is
        already in the list.  Indices are updated.
        """
        key = item.name
        if key in self.nameIndex:
            nameInd = self.nameIndex[key]
            assert self[nameInd].name==key, \
                "Someone tampered with ColumnList"
            self[nameInd] = item
        else:
            self.nameIndex[key] = len(self)
            list.append(self, item)
        self._invalidateIdIndex()

    def replace(self, oldCol, newCol):
        """puts newCol into the position of the first column equal to
        oldCol.

        An IndexError is raised if no column equal to oldCol is in self.
        """
        for ind, col in enumerate(self):
            if col==oldCol:
                break
        else:
            raise IndexError("No column %s to replace in %s"%(
                getattr(oldCol, "name", oldCol), self.withinId))

        self[ind] = newCol
        del self.nameIndex[oldCol.name]
        self.nameIndex[newCol.name] = ind
        self._invalidateIdIndex()

    def remove(self, col):
        """removes col from the list.

        A KeyError is raised if there is no column with col's name,
        a ValueError if there is one but col itself is not in the list.
        """
        # raises KeyError for unknown names, as it always did
        del self.nameIndex[col.name]
        try:
            list.remove(self, col)
        finally:
            # all columns behind col have moved up by one, so the
            # positions in nameIndex must be recomputed; doing this
            # in finally also restores the index if col was not found.
            self.redoIndex()

    def extend(self, seq):
        """appends all items in seq, overwriting columns with identical
        names as in append.
        """
        for item in seq:
            self.append(item)

    def getColumnByName(self, name):
        """returns the column with name.

        If there is no column called name, name is tried again as a
        quoted name.  It will raise a NotFoundError if no such column
        exists.
        """
        try:
            return self[self.nameIndex[name]]
        except KeyError:
            try:
                return self[self.nameIndex[utils.QuotedName(name)]]
            except KeyError:
                raise base.NotFoundError(name, what="column",
                    within=self.withinId)

    def getColumnById(self, id):
        """returns the column with id.

        It will raise a NotFoundError if no such column exists.
        """
        try:
            return self.getIdIndex()[id]
        except KeyError:
            raise base.NotFoundError(id, what="column",
                within=self.withinId)

    def getColumnByUtype(self, utype):
        """returns the column having utype (compared case-insensitively).

        This should be unique, but this method does not check for
        uniqueness; the first match is returned.  A NotFoundError is
        raised if no column has utype.
        """
        utype = utype.lower()
        for item in self:
            if item.utype and item.utype.lower()==utype:
                return item
        raise base.NotFoundError(utype, what="column with utype",
            within=self.withinId)

    def getColumnsByUCD(self, ucd):
        """returns all columns having ucd.
        """
        return [item for item in self if item.ucd==ucd]

    def getColumnByUCD(self, ucd):
        """returns the single, unique column having ucd.

        It raises a StructureError if there is no such column or more
        than one.
        """
        cols = self.getColumnsByUCD(ucd)
        if len(cols)==1:
            return cols[0]
        elif cols:
            raise base.StructureError("More than one column for %s"%ucd)
        else:
            raise base.StructureError("No column for %s"%ucd)

    def getColumnByUCDs(self, *ucds):
        """returns the single, unique column having one of ucds.

        This method has a confusing interface.  It sole function is to
        help when there are multiple possible UCDs that may be interesting
        (like pos.eq.ra;meta.main and POS_EQ_RA_MAIN).  It should only be
        used for such cases.

        A StructureError is raised if none of ucds yields a unique column.
        """
        for ucd in ucds:
            try:
                return self.getColumnByUCD(ucd)
            except base.StructureError:
                # probably just no column with this UCD, try next
                pass
        raise base.StructureError(
            "No unique column for any of %s"%", ".join(ucds))
class ColumnListAttribute(base.StructListAttribute):
    """An adapter making a ColumnList usable as a structure attribute.
    """
    @property
    def default_(self):
        # a fresh, empty list for every access
        return ColumnList()

    def getCopy(self, instance, newParent, ctx):
        """returns the copy made by StructListAttribute, wrapped into
        a new ColumnList.
        """
        copiedColumns = base.StructListAttribute.getCopy(
            self, instance, newParent, ctx)
        return ColumnList(copiedColumns)

    def replace(self, instance, oldStruct, newStruct):
        """replaces oldStruct with newStruct in instance's column list.

        Both need to have the same name (a StructureError is raised
        otherwise); the replacing as such is then left to
        ColumnList.append.
        """
        if oldStruct.name!=newStruct.name:
            raise base.StructureError("Can only replace fields of the same"
                " name in a ColumnList")
        columns = getattr(instance, self.name_)
        columns.append(newStruct)

    def feedObject(self, instance, obj):
        """feeds obj (or, if obj is a list, each of its members) to
        instance.

        Objects already having a parent different from instance are
        copied to instance first.
        """
        if isinstance(obj, list):
            for member in obj:
                self.feedObject(instance, member)
            return

        # we really want columns and params to have the proper parents
        if obj.parent and obj.parent is not instance:
            obj = obj.copy(instance)
        super().feedObject(instance, obj)
class NamePathAttribute(base.AtomicAttribute):
    """defines an attribute NamePath used for resolution of "original"
    attributes.

    The NamePathAttribute provides a resolveName method as expected
    by base.OriginalAttribute.
    """
    typeDesc_ = "id reference"

    def __init__(self, **kwargs):
        # only provide a description if the caller did not bring one
        kwargs.setdefault("description",
            "Reference to an element tried to"
            " satisfy requests for names in id references of this"
            " element's children.")
        base.AtomicAttribute.__init__(self, name="namePath", **kwargs)

    def iterParentMethods(self):
        """yields the resolveName method to be installed on the parent.
        """
        def resolveName(instance, context, id):
            # first choice: names within a parent table
            if hasattr(instance, "parentTable"):
                try:
                    return base.resolveNameBased(instance.parentTable, id)
                except base.NotFoundError:
                    # try on real name path
                    pass

            # second choice: whatever instance itself knows by name
            if hasattr(instance, "getByName"):
                try:
                    return instance.getByName(id)
                except base.NotFoundError:
                    pass

            # last resort: the name path of instance or of its parent
            namePath = instance.namePath
            if namePath is None and instance.parent:
                namePath = getattr(instance.parent, "namePath", None)
            if namePath is None:
                raise base.NotFoundError(id, "Element with name", repr(self),
                    hint="No namePath here")

            return context.resolveId(namePath+"."+id)

        yield "resolveName", resolveName

    def parse(self, value):
        # the literal is the value
        return value

    def unparse(self, value):
        # the value is the literal
        return value
# matches @<identifier>; the identifier part is utils.identifierPattern
# without its last character (presumably a trailing anchor -- confirm in
# utils), wrapped into a group so it can be used in replacements.
_identifierSource = utils.identifierPattern.pattern[:-1]
_atPattern = re.compile("@(%s)"%_identifierSource)


def replaceProcDefAt(src, dictName="vars"):
    """returns src with each @<identifier> replaced by
    <dictName>["<identifier>"].

    We do this to support this shortcut in the vicinity of rowmakers
    (i.e., there and in procApps).
    """
    replacement = r'%s["\1"]'%dictName
    return _atPattern.sub(replacement, src)
# this is mainly here for lack of a better place.  I don't want it in
# base.parsecontext as it needs config, and I don't want it
# in user.common as it might be useful for non-UI stuff.
def getReferencedElement(refString, forceType=None, **kwargs):
    """returns the element for the DaCHS reference ``refString``.

    ``refString`` has the form ``rdId[#subRef]``; ``rdId`` can be
    filesystem-relative, but the RD referenced must be below ``inputsDir``
    anyway.

    You can pass a structure class into ``forceType``, and a
    ``StructureError`` will be raised if what's pointed to by the id isn't
    of that type.

    You should usually use ``base.resolveCrossId`` instead of this from
    *within* DaCHS.  This is intended for code handling RD ids from users.

    This supports further keyword arguments to getRD; they are passed on
    both when resolving ``refString`` as given and when retrying it
    relative to the current directory.

    If the RD cannot be found either way, the original ``base.RDNotFound``
    is re-raised.
    """
    # get the inputs postfix now so we don't pollute the current exception
    # later
    try:
        cwdInInputs = utils.getRelativePath(os.getcwd(),
            base.getConfig("inputsDir"), liberalChars=True)
    except ValueError:
        # not in inputs
        cwdInInputs = None

    try:
        return base.resolveCrossId(refString, forceType=forceType, **kwargs)
    except base.RDNotFound:
        if cwdInInputs:
            # retry relative to where we are below inputsDir; kwargs
            # need to go along here, too, or the retry would behave
            # differently from the first attempt.
            return base.resolveCrossId("%s/%s"%(cwdInInputs, refString),
                forceType=forceType, **kwargs)
        raise
@utils.document
def getStandardPubDID(path):
    """returns the standard DaCHS PubDID for ``path``.

    The publisher dataset identifier (PubDID) is important in protocols like
    SSAP and obscore.  If you use this function, the PubDID will be your
    authority, the path component ~, and the inputs-relative path of
    the input file as the parameter.

    ``path`` can be relative, in which case it is interpreted relative to
    the DaCHS ``inputsDir.``

    You *can* define your PubDIDs in a different way, but you'd then need
    to provide a custom descriptorGenerator to datalink services (and
    might need other tricks).  If your data comes from plain files, use
    this function.

    In a rowmaker, you'll usually use the \\standardPubDID macro.
    """
    # Why add inputsDir first and remove it again?  Well, I want to keep
    # getInputsRelativePath in the loop since it does some validation
    # and may, at some point, do more.
    isRelative = path[0]!="/"
    if isRelative:
        path = os.path.join(base.getConfig("inputsDir"), path)

    authority = base.getConfig("ivoa", "authority")
    if authority=="x-unregistred":
        raise base.ReportableError(
            "You must configure your IVOA authority"
            " before creating standard PubDIDs.",
            hint="Read up on 'Choosing your authority' in the tutorial"
            " to see how to fix this.")

    inputsRelative = getInputsRelativePath(path, liberalChars=True)
    return "ivo://%s/~?%s"%(authority, inputsRelative)
@utils.document
def getAccrefFromStandardPubDID(pubdid,
        authBase="ivo://%s/~?"%base.getConfig("ivoa", "authority")):
    """returns an accref from a standard DaCHS PubDID.

    This is basically the inverse of getStandardPubDID.  It will raise
    NotFound if pubdid "looks like a URI" (implementation detail: has a colon
    in the first 10 characters) and does not start with
    ivo://<authority>/~?.  If it's not a URI, we assume it's a local accref
    and just return it.

    The function does not check if the remaining characters are a valid
    accref, much less whether it can be resolved.

    authBase's default will reflect you system's settings on your
    installation, which probably is not what's given in this documentation.
    """
    looksLikeURI = ":" in pubdid[:10]
    if not looksLikeURI:
        # presumably a plain local accref
        return pubdid

    if pubdid.startswith(authBase):
        return pubdid[len(authBase):]

    raise base.NotFoundError(pubdid,
        "The authority in the dataset identifier",
        "the authorities managed here")
@utils.document
def getInputsRelativePath(absPath, liberalChars=True):
    """returns absPath relative to the DaCHS inputsDir.

    If ``absPath`` is not below ``inputsDir``, a ``ValueError`` results.
    On ``liberalChars``, see the `function getRelativePath`_.

    In rowmakers and rowfilters, you'll usually use the macro
    ``\\inputRelativePath`` that inserts the appropriate code.
    """
    # NOTE: the backslash in the docstring above must stay doubled;
    # a lone \i is an invalid escape sequence in a non-raw string.
    return utils.getRelativePath(absPath,
        base.getConfig("inputsDir"), liberalChars=liberalChars)