Source code for gavo.rscdef.rdinj

"""
RD injections: This code fetches metadata on RDs (identified by their
normalised source ID) from the database and injects it into parse contexts.

In the Structure's completeElement method, the elements can then
look for injected values using ctx.getInjected(key[, default]).
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib

from gavo import base
from gavo.base import sqlsupport


[docs]@contextlib.contextmanager def ignoringExpectedErrors(): """a context manager that swallows exceptions the database throws when tables don't exist. We want that here as the tables we're querying here (for any RD that's being loaded) will not exist while bootstrapping. """ try: yield except sqlsupport.ProgrammingError as ex: if ex.pgcode=="42P01": # table not found for postgres return [] raise except base.QueryCanceledError: # Don't fail just because there's a deadlock... return []
def _getRecordsFor(rdId, fromTable, conn): """returns (dict) records from fromTable coming from rdId. """ with ignoringExpectedErrors(): # the timeout is for when someone holds a lock (which should be rare # but happens; we don't want to fail for such a triviality). return list( conn.queryToDicts( "select * from %s where sourceRD=%%(rdId)s"%fromTable, locals(), timeout=0.25)) return []
[docs]def injectIntoContext(ctx, rdId): """fills context's injected metadata from material from the various database tables for the RD rdId. """ with base.getTableConn() as conn: for row in _getRecordsFor(rdId, "dc.rdmeta", conn): if row["data_updated"]: ctx.inject("_dataUpdated", row["data_updated"]) if row.get("spatial"): ctx.inject("spatial_coverage", row["spatial"]) if row.get("temporal"): ctx.inject("temporal_coverage", row["temporal"]) if row.get("spectral"): ctx.inject("spectral_coverage", row["spectral"]) for row in _getRecordsFor(rdId, "dc.resources", conn): key = "resprop:%s#%s"%(row["sourcerd"], row["resid"]) ctx.inject(key, row) tablesInRD = [] for row in _getRecordsFor(rdId, "dc.tablemeta", conn): tablesInRD.append(row["tablename"]) tableId = row["tablename"].split(".")[-1] ctx.inject(f"table:{tableId}", row) for tableName in tablesInRD: tableId = tableName.split(".")[-1] with ignoringExpectedErrors(): for row in conn.queryToDicts("SELECT * FROM dc.simple_col_stats" " WHERE tableName=%(tableName)s", locals(), timeout=0.25): key = f"colstats:{tableId}:{row['column_name']}" ctx.inject(key, row) for row in conn.queryToDicts("SELECT * FROM dc.discrete_string_values" " WHERE tableName=%(tableName)s", locals(), timeout=0.25): key = f"discrete-strings:{tableId}:{row['column_name']}" ctx.inject(key, dict(zip(row["vals"], row["freqs"])))