Source code for gavo.rscdesc

"""
Structure definition of resource descriptors.

The stuff they are describing is not a resource in the VO sense (whatever
that is) or in the Dublin Core sense, but simply stuff held together
by common metadata.  If it's got the same creator, the same base title,
the same keywords, etc., it's described by one RD.

In DaCHS, a resource descriptor typically sets up a schema in
the database.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import datetime
import os
import pkg_resources
import time
import threading
import weakref

from gavo import base
from gavo import registry
from gavo import rsc
from gavo import rscdef
from gavo import svcs
from gavo import utils
from gavo.rscdef import common
from gavo.rscdef import rdinj
from gavo.rscdef import regtest
from gavo.rscdef import scripting
from gavo.rscdef import executing


class ResdirAttribute(base.UnicodeAttribute):
	"""An attribute representing an RD's resdir.

	This is for resource's resdir attribute, which will by default get the
	inputsDir prepended. There is the special value "." that means "the
	directory containing the RD", which is what people should do now. This
	will only work for RDs.

	Note that without that, the resdir defaults to <inputsDir>/<schemaName>.
	"""

	def feedObject(self, instance, parsedValue):
		setattr(self, f"_original_resdir_{self.name_}", parsedValue)
		if parsedValue==".":
			parsedValue = "/".join(instance.sourceId.split("/")[:-1])
		parsedValue = os.path.join(
			base.getConfig("inputsDir"), parsedValue)
		super().feedObject(instance, parsedValue)
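
# Usage sketch (hypothetical RD id and paths): for an RD stored as
# <inputsDir>/myres/q.rd whose root element reads
#
#   <resource schema="myres" resdir=".">
#
# feedObject above sees parsedValue="." and instance.sourceId="myres/q",
# so the stored resdir ends up as os.path.join(inputsDir, "myres").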


class RD(base.Structure, base.ComputedMetaMixin, scripting.ScriptingMixin,
		base.StandardMacroMixin, common.PrivilegesMixin):
	"""A resource descriptor.

	RDs collect all information about how to parse a particular source (like
	a collection of FITS images, a catalogue, or whatever), about the
	database tables the data ends up in, and the services used to access
	them.

	In DaCHS' RD XML serialisation, they correspond to the root element.
	"""
	name_ = "resource"

	# this is set somewhere below once parsing has proceeded far enough
	# such that caching the RD makes sense
	cacheable = False

	# we used to accept scripts in RDs and probably will do so again,
	# but for now the semantics are not well-defined and we reject all
	# scripts for a while
	acceptedScriptTypes = {}

	_resdir = ResdirAttribute("resdir", default=None,
		description="Base directory for source files and everything else"
			" belonging to the resource. Use a single dot (.) to say 'the"
			" directory the RD resides in', which is recommended in modern"
			" DaCHS.",
		copyable=True)

	_schema = base.UnicodeAttribute("schema", default=base.Undefined,
		description="Database schema for tables defined here. Follow the rule"
			" 'one schema, one RD' if at all possible. If two RDs share the same"
			" schema, they must generate exactly the same permissions for that"
			" schema; this means, in particular, that if one has an ADQL-published"
			" table, so must the other. In a nutshell: one schema, one RD.",
		copyable=True,
		callbacks=["_inferResdir"])

	_dds = base.StructListAttribute("dds",
		childFactory=rscdef.DataDescriptor,
		description="Descriptors for the data generated and/or published"
			" within this resource.",
		copyable=True,
		before="outputTables")

	_tables = base.StructListAttribute("tables",
		childFactory=rscdef.TableDef,
		description="A table used or created by this resource",
		copyable=True,
		before="dds")

	_outputTables = base.StructListAttribute("outputTables",
		childFactory=svcs.OutputTableDef,
		description="Canned output tables for later reference.",
		copyable=True)

	_rowmakers = base.StructListAttribute("rowmakers",
		childFactory=rscdef.RowmakerDef,
		description="Transformations for going from grammars to tables."
			" If specified in the RD, they must be referenced from make"
			" elements to become active.",
		copyable=True,
		before="dds")

	_procDefs = base.StructListAttribute("procDefs",
		childFactory=rscdef.ProcDef,
		description="Procedure definitions (rowgens, rowmaker apply-s)",
		copyable=True,
		before="rowmakers")

	_condDescs = base.StructListAttribute("condDescs",
		childFactory=svcs.CondDesc,
		description="Global condition descriptors for later reference",
		copyable=True,
		before="cores")

	_resRecs = base.StructListAttribute("resRecs",
		childFactory=registry.ResRec,
		description="Non-service resources for the IVOA registry. They will"
			" be published when gavo publish is run on the RD.")

	_services = base.StructListAttribute("services",
		childFactory=svcs.Service,
		description="Services exposing data from this resource.",
		copyable=True)

	_macDefs = base.MacDefAttribute(before="tables",
		description="User-defined macros available on this RD")

	_mixinDefs = base.StructListAttribute("mixdefs",
		childFactory=rscdef.MixinDef,
		description="Mixin definitions (usually not for users)")

	_require = base.ActionAttribute("require", methodName="importModule",
		description="Import the named gavo module (for when you need something"
			" registered)")

	_cores = base.MultiStructListAttribute("cores",
		childFactory=svcs.getCore,
		childNames=list(svcs.CORE_REGISTRY.keys()),
		description="Cores available in this resource.",
		copyable=True,
		before="services")

	_jobs = base.StructListAttribute("jobs",
		childFactory=executing.Execute,
		description="Jobs to be run while this RD is active.")

	_tests = base.StructListAttribute("tests",
		childFactory=regtest.RegTestSuite,
		description="Suites of regression tests connected to this RD.")

	_coverage = base.StructAttribute("coverage",
		childFactory=rscdef.Coverage,
		default=None,
		description="STC coverage of this resource.",
		copyable=True)

	_properties = base.PropertyAttribute()

	def __init__(self, srcId, **kwargs):
		# RDs never have parents, so contrary to all other structures they
		# are constructed with a srcId instead of a parent. You
		# *can* have that None, but such RDs cannot be used to create
		# non-temporary tables, services, etc, since the srcId is used
		# in the construction of identifiers and such.
		self.sourceId = srcId or "temporary"
		base.Structure.__init__(self, None, **kwargs)

		# The rd attribute is a weakref on self. Always. So, this is the
		# class that roots common.RDAttributes
		self.rd = weakref.proxy(self)

		# real dateUpdated is set by getRD, this is just for RDs created
		# on the fly.
		self.dateUpdated = datetime.datetime.utcnow()

		# if an RD is parsed from a disk file, this gets set to its path
		# by getRD below
		self.srcPath = None

		# this is for modified-since and friends.
		self.loadedAt = time.time()

		# keep track of RDs depending on us for the registry code
		# (only read this)
		self.rdDependencies = set()

	def __iter__(self):
		return iter(self.dds)

	def __repr__(self):
		return "<resource descriptor for %s>"%self.sourceId

	def validate(self):
		if not utils.identifierPattern.match(self.schema):
			raise base.StructureError("DaCHS schema attributes must be valid"
				" python identifiers")

	def isDirty(self):
		"""returns true if the RD on disk has a timestamp newer than loadedAt.
		"""
		if isinstance(self.srcPath, PkgResourcePath):
			# stuff from the resource package should not change underneath us.
			return False

		try:
			if self.srcPath is not None:
				return os.path.getmtime(self.srcPath)>self.loadedAt
		except os.error:
			# this will usually mean the file went away
			return True

		return False

	def importModule(self, ctx):
		# this is a callback for the require attribute
		utils.loadInternalObject(self.require, "__doc__")

	def onElementComplete(self):
		for table in self.tables:
			self.readProfiles = self.readProfiles | table.readProfiles
			table.setMetaParent(self)

		self.serviceIndex = {}
		for svc in self.services:
			self.serviceIndex[svc.id] = svc
			svc.setMetaParent(self)

		for dd in self.dds:
			dd.setMetaParent(self)

		if self.resdir and not os.path.isdir(self.resdir):
			base.ui.notifyWarning("RD %s: resource directory '%s' does not exist"%(
				self.sourceId, self.resdir))

		super().onElementComplete()

	def _inferResdir(self, value):
		# a callback for the schema attribute
		if self.resdir is None:
			self._resdir.feedObject(self, value)

	def iterDDs(self):
		return iter(self.dds)

	def getService(self, id):
		return self.serviceIndex.get(id, None)

	def getTableDefById(self, id):
		return self.getById(id, rscdef.TableDef)

	def getDataDescById(self, id):
		return self.getById(id, rscdef.DataDescriptor)

	def getById(self, id, forceType=None):
		try:
			res = self.idmap[id]
		except KeyError:
			raise base.NotFoundError(
				id, "Element with id", "RD %s"%(self.sourceId))
		if forceType:
			base.assertType(id, res, forceType)
		return res

	def getRelResdir(self):
		"""returns the inputsDir-relative resource directory path.

		This never has either a leading or a trailing path separator.
		"""
		return utils.getRelativePath(self.resdir, base.getConfig("inputsDir"))

	def getAbsPath(self, relPath):
		"""returns the absolute path for a resdir-relative relPath.
		"""
		return os.path.join(self.resdir, relPath)

	def openRes(self, relPath, mode="rb"):
		"""returns a file object for relPath within self's resdir.

		Deprecated. This is going to go away, use getAbsPath and a context
		manager.
		"""
		return open(self.getAbsPath(relPath), mode)

	def _computeIdmap(self):
		res = {}
		for child in self.iterChildren():
			if hasattr(child, "id"):
				res[child.id] = child
		return res

	def addDependency(self, rd, prereq):
		"""declares that rd needs the RD prereq to properly work.

		This is used in the generation of resource records to ensure that,
		e.g., registered data have added their served-bys to the service
		resources.
		"""
		if rd.sourceId!=prereq.sourceId:
			self.rdDependencies.add((rd.sourceId, prereq.sourceId))

	def copy(self, parent):
		base.ui.notifyWarning("Copying an RD -- this may not be a good idea")
		new = base.Structure.copy(self, parent)
		new.idmap = new._computeIdmap()
		new.sourceId = self.sourceId
		return new

	def updateMetaInDB(self, connection, reason):
		"""updates this RD's metadata in the database.

		See the grammar in //rds for what you can give for reason.
		"""
		rsc.makeData(
			base.resolveCrossId("//rds#update-for-rd"),
			connection=connection,
			forceSource=(reason, self))

	def invalidate(self):
		"""make the RD fail on every attribute read.

		See rscdesc._loadRDIntoCache for why we want this.
		"""
		errMsg = ("Loading of %s failed in another thread; this RD cannot"
			" be used here")%self.sourceId

		class BrokenClass(object):
			"""A class that reacts to all attribute requests with some
			exception.
			"""
			def __getattribute__(self, attributeName):
				if attributeName=="__class__":
					return BrokenClass
				raise base.ReportableError(errMsg)

		self.__class__ = BrokenClass

	def _meta__metadataUpdated(self):
		"""falls back to utcnow in case we don't know the metadata update time.

		Since _metadataUpdated falls back to the file creation time, this is
		only reached for file-less RDs. For these, just about now would be a
		good guess as to their creation time.

		In reality, this shouldn't be reached outside of test code; there,
		however, it's important because this is used in required attributes
		in OAI-PMH and VOResource.
		"""
		return utils.formatISODT(datetime.datetime.utcnow())

	def macro_RSTccbysa(self, stuffDesignation):
		"""expands to a declaration that stuffDesignation is available under
		CC-BY-SA.

		This only works in reStructured text (though it's still almost
		readable as source).

		You'll probably want to use the `//procs#license-cc-by-sa`_ stream
		instead of this, as that also sets the rights URI.
		"""
		return ("%s is licensed under the `Creative Commons Attribution"
			" Share-Alike 4.0"
			" License <http://creativecommons.org/licenses/by-sa/4.0/>`_\n\n"
			".. image:: /static/img/ccbysa.png\n :alt: [CC-BY-SA]\n"
			)%stuffDesignation

	def macro_RSTccby(self, stuffDesignation):
		"""expands to a declaration that stuffDesignation is available under
		CC-BY.

		This only works in reStructured text (though it's still almost
		readable as source).

		You'll probably want to use the `//procs#license-cc-by`_ stream
		instead of this, as that also sets the rights URI.
		"""
		return ("%s is licensed under the `Creative Commons Attribution 4.0"
			" License <http://creativecommons.org/licenses/by/4.0/>`_\n\n"
			".. image:: /static/img/ccby.png\n :alt: [CC-BY]\n\n"
			)%stuffDesignation

	def macro_RSTcc0(self, stuffDesignation):
		"""expands to a declaration that stuffDesignation is available under
		CC-0.

		This only works in reStructured text (though it's still almost
		readable as source).

		You'll probably want to use the `//procs#license-cc0`_ stream
		instead of this, as that also sets the rights URI.
		"""
		return ("To the extent possible under law, the publisher has"
			" waived all copyright and related or neighboring rights to %s."
			" For details, see the `Creative Commons CC0 1.0"
			" Public Domain dedication"
			" <http://creativecommons.org/publicdomain/zero/1.0/>`_. Of course,"
			" you should still give proper credit when using this data as"
			" required by good scientific practice.\n\n"
			".. image:: /static/img/cc0.png\n :alt: [CC0]\n\n"
			)%stuffDesignation


class RDParseContext(base.ParseContext):
	"""is a parse context for RDs.

	It defines a couple of attributes that structures can ask for (however,
	it's good practice not to rely on their presence in case someone wants
	to parse XML snippets with a standard parse context, so use
	getattr(ctx, "doQueries", True) or somesuch).
	"""
	def __init__(self, doQueries=True, restricted=False, forRD=None):
		self.doQueries = doQueries
		base.ParseContext.__init__(self, restricted, forRD)

	@classmethod
	def fromContext(cls, ctx, forRD=None):
		"""a constructor that makes a context with the parameters taken from
		the RDParseContext ctx.
		"""
		return cls(doQueries=ctx.doQueries, restricted=ctx.restricted,
			forRD=forRD)

	@property
	def failuresAreCacheable(self):
		"""returns true if failures produced with this context should be
		cached.

		This is not the case with restricted parses.
		"""
		return not self.restricted


class PkgResourcePath(str):
	"""A sentinel class used to mark an RD as coming from pkg_resources.
	"""
	def __str__(self):
		return self


def canonicalizeRDId(srcId):
	"""returns a standard rd id for srcId.

	srcId may be a file system path, or it may be an "id". The canonical id
	basically is "inputs-relative path without .rd extension". Everything
	that's not within inputs or doesn't end with .rd is handed through.
	// is expanded to __system__/.

	The path to built-in RDs, /resources/inputs, is treated analogous to
	inputsDir.
	"""
	if srcId.startswith("//"):
		srcId = "__system__"+srcId[1:]

	# This may see un-normalised paths; let's try and follow unix path
	# semantics
	cleanedPath = []
	for segment in srcId.split("/"):
		if segment==".":
			pass
		elif segment=="..":
			if cleanedPath:
				cleanedPath.pop()
			else:
				raise ValueError("Too many .. in relative rd id")
		else:
			cleanedPath.append(segment)
	srcId = "/".join(cleanedPath)

	for inputsDir in (base.getConfig("inputsDir"), "/resources/inputs"):
		if srcId.startswith(inputsDir):
			srcId = srcId[len(inputsDir):].lstrip("/")

	if srcId.endswith(".rd"):
		srcId = srcId[:-3]
	return srcId
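
# A quick sketch of what this yields (assuming inputsDir is /var/gavo/inputs;
# the RD names are made up):
#
#   canonicalizeRDId("//adql")                      -> "__system__/adql"
#   canonicalizeRDId("myres/q.rd")                  -> "myres/q"
#   canonicalizeRDId("/var/gavo/inputs/myres/q.rd") -> "myres/q"
#
# All spellings of one RD thus collapse to a single canonical id, which is
# what the RD cache below keys on.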


def _getFilenamesForId(srcId):
	"""helps getRDInputStream by iterating over possible files for srcId.
	"""
	if srcId.startswith("/"):
		yield srcId+".rd"
		yield srcId
	else:
		inputsDir = base.getConfig("inputsDir")
		yield os.path.join(inputsDir, srcId)+".rd"
		yield os.path.join(inputsDir, srcId)
		yield "/resources/inputs/%s.rd"%srcId
		yield "/resources/inputs/%s"%srcId


def getRDInputStream(srcId):
	"""returns a read-open stream for the XML source of the resource
	descriptor with srcId.

	srcId is already normalized; that means that absolute paths must point
	to a file (sans possibly .rd), relative paths are relative to inputsDir
	or pkg_resources(/resources/inputs).

	This function prefers files with .rd to those without, and inputsDir to
	pkg_resources (the latter allowing the user to override built-in system
	RDs).
	"""
	for fName in _getFilenamesForId(srcId):
		if os.path.isfile(fName):
			# We don't want RDs from outside of inputs and config, as
			# these make referencing really messy.
			filePath = os.path.abspath(fName)
			if not (filePath.startswith(base.getConfig("inputsDir"))
					or filePath.startswith(base.getConfig("configDir"))):
				raise base.ReportableError("%s: Only RDs below inputsDir (%s) are"
					" allowed."%(fName, base.getConfig("inputsDir")))

			return fName, open(fName, "rb")

		if (pkg_resources.resource_exists('gavo', fName)
				and not pkg_resources.resource_isdir('gavo', fName)):
			return (PkgResourcePath(fName),
				pkg_resources.resource_stream('gavo', fName))

	raise base.RDNotFound(srcId)
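
# Resolution sketch (made-up id "myres/q"): _getFilenamesForId yields, in
# order,
#
#   <inputsDir>/myres/q.rd, <inputsDir>/myres/q,
#   /resources/inputs/myres/q.rd, /resources/inputs/myres/q
#
# and getRDInputStream returns the first of these that actually exists, so a
# file below inputsDir shadows an RD of the same name shipped inside the
# gavo package.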


def setRDDateTimes(ctx, rd, inputFile):
	"""sets the _dataUpdated and _metadataUpdated meta items on rd.
	"""
	rd.setMeta("_metadataUpdated",
		datetime.datetime.utcfromtimestamp(
			utils.fgetmtime(inputFile)))

	try:
		rd.setMeta("_dataUpdated", ctx.getInjected("_dataUpdated"))
	except KeyError:
		# no data imported yet, probably
		pass


USERCONFIG_RD_PATH = os.path.join(base.getConfig("configDir"), "userconfig")


class _UserConfigFakeRD(object):
	"""A fake object that's in the RD cache as "%".

	This is used by the id resolvers in parsecontext; this certainly is of
	no use as an RD otherwise.
	"""
	def __init__(self):
		pass

	def getRealRD(self):
		return base.caches.getRD(USERCONFIG_RD_PATH)

	def getMeta(self, *args, **kwargs):
		return base.caches.getRD(USERCONFIG_RD_PATH).getMeta(*args, **kwargs)

	def getById(self, id, forceType=None):
		"""returns an item from userconfig.

		This first tries to resolve id in gavo/etc/userconfig.rd, then in
		the fallback //userconfig.rd.
		"""
		try:
			try:
				return base.caches.getRD(
					os.path.join(base.getConfig("configDir"), "userconfig.rd")
					).getById(id, forceType=forceType)
			except base.NotFoundError:
				pass
			except Exception as msg:
				base.ui.notifyError("Bad userconfig: (%s), ignoring it. Run"
					" 'dachs val %%' to see actual errors."%repr(msg))

			return base.caches.getRD("//userconfig"
				).getById(id, forceType=forceType)
		except base.NotFoundError:
			raise base.NotFoundError(id, "Element with id", "etc/userconfig.rd")


def refuseBlacklisted(srcId, rdInputPath):
	"""raises an exception if rdInputPath ends with, or srcId is equal to,
	one of the strings in [general]rdblacklist.
	"""
	for blacklistedName in base.getConfig("rdblacklist"):
		if (rdInputPath.endswith(blacklistedName)
				or blacklistedName==srcId):
			raise utils.Error(
				f"RD {srcId} is blacklisted in gavo.rc. Not loading.")
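
# Configuration sketch (made-up RD name; the exact list syntax depends on
# your gavo.rc): the blacklist consulted above is the [general]rdblacklist
# config item, roughly
#
#   [general]
#   rdblacklist: brokenres/q
#
# after which any attempt to load an RD whose id is, or whose path ends in,
# "brokenres/q" fails before parsing even starts.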


def getRD(srcId, doQueries=True, restricted=False, useRD=None):
	"""returns a ResourceDescriptor for srcId.

	srcId is something like an inputs-relative path; you'll generally omit
	the extension (unless it's not the standard .rd).

	getRD furnishes the resulting RD with an idmap attribute containing the
	mapping from id to object collected by the parse context.

	The useRD parameter is for _loadRDIntoCache exclusively and is used by
	it internally. It is strictly an ugly implementation detail.
	"""
	if srcId=='%':
		return _UserConfigFakeRD()

	if useRD is None:
		rd = RD(canonicalizeRDId(srcId))
	else:
		rd = useRD

	srcPath, inputFile = getRDInputStream(rd.sourceId)
	try:
		refuseBlacklisted(srcId, srcPath)
	except utils.Error:
		inputFile.close()
		raise

	# look for a context upstack and get the default parameters from there,
	# overriding the parameters passed in.
	try:
		getRD_context = RDParseContext.fromContext(
			utils.stealVar("getRD_context"), forRD=rd.sourceId)
	except ValueError:
		# no getRD_context variable in the stack
		getRD_context = RDParseContext(doQueries=doQueries,
			restricted=restricted, forRD=rd.sourceId)

	rdinj.injectIntoContext(getRD_context, rd.sourceId)

	if not isinstance(srcPath, PkgResourcePath):
		srcPath = os.path.abspath(srcPath)
	rd.srcPath = getRD_context.srcPath = srcPath
	rd.idmap = getRD_context.idmap

	try:
		rd = base.parseFromStream(rd, inputFile, context=getRD_context)
		setRDDateTimes(getRD_context, rd, inputFile)
	except Exception as ex:
		ex.inFile = srcPath
		ex.cacheable = getRD_context.failuresAreCacheable
		raise
	finally:
		inputFile.close()

	return rd
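
# Usage sketch (hypothetical RD and element ids): parse an RD directly,
# bypassing the cache installed further down; element access then goes
# through the idmap filled in by the parse context:
#
#   rd = getRD("myres/q")
#   table = rd.getTableDefById("main")
#   svc = rd.getService("cone")
#
# In normal code you would go through base.caches.getRD() instead (see
# below), so that repeated requests share one parsed RD.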


# in _CURRENTLY_PARSING, getRD keeps track of what RDs are currently being
# parsed. The keys are the canonical sourceIds, the values are pairs of
# an unfinished RD and RLocks protecting it.
_CURRENTLY_PARSING_LOCK = threading.Lock()
_CURRENTLY_PARSING = {}


class CachedException(object):
	"""An exception that occurred while parsing an RD.

	This will remain in the cache until the underlying RD is changed.
	"""
	def __init__(self, exception, sourcePath):
		self.exception = exception.with_traceback(None)
		self.sourcePath = sourcePath

		# this can race a bit in that we won't catch saves done between
		# when we started parsing and when we came up with the exception,
		# but these are easy to fix by saving again, so we won't bother.
		try:
			self.loadedAt = os.path.getmtime(self.sourcePath)
		except (TypeError, os.error):
			# If the file doesn't exist, that state is "as of now"
			self.loadedAt = time.time()

	def isDirty(self):
		if self.sourcePath is None:
			# this can have various reasons, but most likely it's because
			# the RD hasn't been there. Since we can't tell if the
			# file has appeared in the mean time, we'll have to re-check
			return False

		if not os.path.exists(self.sourcePath):
			# someone has removed the file, kill cache
			return True

		return os.path.getmtime(self.sourcePath)>self.loadedAt

	def raiseAgain(self):
		# XXX TODO: do we want to fix the traceback here?
		raise self.exception.with_traceback(None)


def _loadRDIntoCache(canonicalRDId, cacheDict):
	"""helps _makeRDCache.

	This function contains the locking logic that makes sure multiple
	threads can load RDs.
	"""
	with _CURRENTLY_PARSING_LOCK:
		if canonicalRDId in _CURRENTLY_PARSING:
			lock, rd = _CURRENTLY_PARSING[canonicalRDId]
			justWait = True
		else:
			lock, rd = threading.RLock(), RD(canonicalRDId)
			_CURRENTLY_PARSING[canonicalRDId] = lock, rd
			lock.acquire()
			justWait = False

	if justWait:
		# Someone else is already parsing. If it's the current thread,
		# go on (lock is an RLock!) so we can resolve self-references
		# (as long as they are backward references). All other threads
		# just wait for the parsing thread to finish
		lock.acquire()
		lock.release()
		return rd

	try:
		try:
			cacheDict[canonicalRDId] = getRD(canonicalRDId, useRD=rd)
		except Exception as ex:
			# Importing failed, invalidate the RD (in case other threads still
			# see it from _CURRENTLY_PARSING)
			if getattr(ex, "cacheable", False):
				cacheDict[canonicalRDId] = CachedException(ex,
					getattr(rd, "srcPath", None))
			rd.invalidate()
			raise
	finally:
		del _CURRENTLY_PARSING[canonicalRDId]
		lock.release()

	return cacheDict[canonicalRDId]


def _makeRDCache():
	"""installs the cache for RDs.

	One trick here is to handle "aliasing", i.e. making sure that you get
	identical objects regardless of whether you request __system__/adql.rd,
	__system__/adql, or //adql.

	Then, we're checking for "dirty" RDs (i.e., those that should be
	reloaded).

	The messiest part is the support for getting RDs in the presence of
	threads while still supporting recursive references, though.
	"""
	# TODO: Maybe unify this again with caches._makeCache? That stuff could
	# do with a facility to invalidate cached entries, too.
	# But care is necessary to not cache any RD parsed in a nonstandard
	# fashion (e.g., in restricted mode). CAREFUL: since getRD indulges
	# in variable stealing, explicit checks are necessary.
	rdCache = {}
	currentlyValidating = set()

	def clearRDIfDirty(srcId):
		"""clears a cached RD for srcId if it's dirty and it can be loaded
		from disk.

		If it can't be loaded from disk, the disk file's time stamp is being
		set as the change date on the rd, so it's counting as clean until
		it's being changed again. This will avoid re-parsing RDs all the
		time when they're broken for a while.
		"""
		if (srcId in rdCache
				and getattr(rdCache[srcId], "isDirty", lambda: False)()):
			try:
				# limit re-check rate to 10 seconds to avoid getRD storms
				# on often-used but currently broken RDs
				if time.time()-rdCache[srcId].loadedAt<10:
					return
				if srcId in currentlyValidating:
					return

				currentlyValidating.add(srcId)
				base.ui.notifyWarning("RD dirty, attempting reload of %s"%srcId)
				try:
					tempCache = {}
					_loadRDIntoCache(srcId, tempCache)
				finally:
					currentlyValidating.remove(srcId)

			except Exception as ex:
				base.ui.notifyError("Tried to reload RD %s but found it broken."
					" Will retry loading when it has been edited."%srcId)
				# the source path of the cached exception isn't necessarily
				# right (for system and userconfig RDs); but it ought
				# to be good enough for what users typically are confronted with.
				if isinstance(rdCache[srcId], CachedException):
					rdCache[srcId] = CachedException(ex,
						os.path.join(base.getConfig("inputsDir")+srcId+".rd"))
				else:
					rdCache[srcId].loadedAt = time.time()
				return

			base.caches.clearForName(srcId)

	def getRDCached(srcId, **kwargs):
		if kwargs:
			return getRD(srcId, **kwargs)
		srcId = canonicalizeRDId(srcId)

		# Since the "validate before purge last" change it's possible
		# that we're currently parsing and in the cache. During the
		# validation run, we need to return from the thing that
		# currently parses. Hence, we need to re-do quite a bit of
		# the logic of _loadRDIntoCache here. Gnwm.
		if srcId in _CURRENTLY_PARSING:
			lock, rd = _CURRENTLY_PARSING[srcId]
			lock.acquire()
			lock.release()
			return rd

		clearRDIfDirty(srcId)

		if srcId in rdCache:
			cachedOb = rdCache[srcId]
			if isinstance(cachedOb, CachedException):
				cachedOb.raiseAgain()
			else:
				return cachedOb
		else:
			return _loadRDIntoCache(srcId, rdCache)

	getRDCached.cacheCopy = rdCache
	base.caches.registerCache("getRD", rdCache, getRDCached)

_makeRDCache()
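
# Usage sketch: after _makeRDCache() has run, the cached accessor is
# reachable as base.caches.getRD, and all spellings of an RD id resolve to
# the same object:
#
#   from gavo import base
#   rd1 = base.caches.getRD("//adql")
#   rd2 = base.caches.getRD("__system__/adql")
#   assert rd1 is rd2
#
# Passing keyword arguments (e.g. restricted=True) bypasses the cache and
# falls through to the plain getRD above.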