
"""
Updating table and column metadata.

While column statistics can be explicitly defined in values elements
(and there may be cases when manually defining them makes sense), the
typical case is to gather statistics from the database and keep them in
a few tables in the dc schema.

Starting with DaCHS 2.3.1 (schema version 27), there's
dc.simple_col_stats for floats and "2 sigma" statistics.

Starting with DaCHS 2.5.2 (schema version 30), there is, in addition,
dc.string_col_dist for statistics of enumerated string columns.

The actual acquisition of the statistics is currently done in user.info
(and should probably move to rscdef).
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from gavo import base
from gavo import rsc
from gavo import utils
from gavo.user import info


def iterCoverageItems(updater):
	"""yields coverage items for inclusion in RDs.

	NOTE: so far, we can only have one coverage item.  So, it's enough
	to just say "fill this into axis x of coverage".  If and when we
	have more than one coverage item, we'll have to re-think that.
	That's why there's the "reserved" value in the tuples.  We'll have
	to put something in there (presumably the index of the coverage
	element, but perhaps we'll have a better identity at some point).
	"""
	if updater is base.NotGiven:
		return

	if updater.parent.spatial is not None:
		sourceTable = updater.spaceTable or updater.sourceTable
		if sourceTable:
			cov = info.getMOCForStdTable(sourceTable, updater.mocOrder)
			if cov:
				yield "spatial", cov.asASCII()

	if updater.parent.temporal is not None:
		sourceTable = updater.timeTable or updater.sourceTable
		if sourceTable:
			res = []
			for pair in info.iterScalarLimits(
					sourceTable, info.getTimeLimitsExprs):
				res.extend(pair)
			yield "temporal", res

	if updater.parent.spectral is not None:
		sourceTable = updater.spectralTable or updater.sourceTable
		if sourceTable:
			res = []
			for pair in info.iterScalarLimits(
					sourceTable, info.getSpectralLimitsExprs):
				res.extend(pair)
			yield "spectral", res


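# For illustration only (the concrete values here are made up), the
# generator above yields pairs like
#
#	("spatial", "3/23 29 4/102")        (an ASCII MOC)
#	("temporal", [54300.0, 59000.0])    (a flat list of interval limits)
#	("spectral", [3.2e-19, 7.9e-19])    (ditto, one pair per interval)

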
def updateRDLevelMetadata(rd, conn):
	"""determines RD-level metadata (coverage, mainly) and inserts it
	into dc.rdmeta.
	"""
	conn.execute(
		"UPDATE dc.rdmeta"
		" SET spatial=NULL, temporal=NULL, spectral=NULL"
		" WHERE sourceRD=%(rdId)s",
		{"rdId": rd.sourceId})

	if rd.coverage:
		for colName, value in iterCoverageItems(rd.coverage.updater):
			conn.execute(
				"UPDATE dc.rdmeta"
				f" SET {colName}=%(value)s"
				" WHERE sourceRD=%(rdId)s",
				{"value": value, "rdId": rd.sourceId})


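# The coverage items end up in dc.rdmeta and can thus be inspected with
# plain SQL; a sketch (column names as in the UPDATE statements above,
# myres/q is a hypothetical RD id):
#
#	SELECT sourcerd, spatial, temporal, spectral
#	FROM dc.rdmeta
#	WHERE sourcerd='myres/q'

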
def updateTableLevelStats(
		td, conn, samplePercent=None, acquireColumnMeta=True):
	"""determines column metadata for the table td and inserts it into
	dc.*stats.

	samplePercent, if given, says how much of the table to look at;
	giving this on views will fail.

	If acquireColumnMeta is False, only the size of the table is
	estimated.
	"""
	tableType = base.UnmanagedQuerier(conn).getTableType(td.getQName())
	if tableType is None:
		base.ui.notifyWarning("Skipping non-existing table %s"%td.getQName())
		return
	elif tableType=="VIEW":
		samplePercent = 0

	info.annotateDBTable(td, samplePercent, acquireColumnMeta)

	for toImport in [
			"//dc_tables#import_simple_col_stats",
			"//dc_tables#import_discrete_string_values"]:
		rsc.makeData(
			base.resolveCrossId(toImport),
			forceSource=td,
			connection=conn)

	conn.execute("UPDATE dc.tablemeta SET nrows=%(nrows)s"
		" WHERE tableName=%(tableName)s", {
			"nrows": td.nrows,
			"tableName": td.getQName()})


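# A minimal usage sketch, assuming an RD myres/q with an on-disk table
# main (both hypothetical): refresh the statistics for just that table,
# looking at five percent of its rows:
#
#	from gavo import api
#
#	td = api.getReferencedElement("myres/q#main")
#	with api.getWritableAdminConn() as conn:
#		updateTableLevelStats(td, conn, samplePercent=5)
#		conn.commit()

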
def updateForRD(rd, conn, samplePercent=None, acquireColumnMeta=True):
	"""obtains RD- and table-level metadata for rd and writes it to the
	metadata tables through conn.
	"""
	base.ui.notifyInfo(f"Obtaining metadata for rd {rd.sourceId}...")
	updateRDLevelMetadata(rd, conn)

	for td in rd.tables:
		if td.onDisk:
			if td.viewStatement and not td.getProperty("forceStats", False):
				continue
			updateTableLevelStats(
				td, conn, samplePercent, acquireColumnMeta)


def _getUpdatableRdIds():
	"""returns a list of RD ids that presumably had dachs limits run on
	them before (because they have coverage or table stats).
	"""
	with base.getTableConn() as conn:
		return [r[0] for r in conn.query(
			"SELECT sourceRD FROM dc.rdmeta"
			" WHERE spatial IS NOT NULL"
			" OR temporal IS NOT NULL OR spectral IS NOT NULL"
			" UNION"
			" SELECT sourceRD FROM dc.tablemeta"
			" WHERE nrows IS NOT NULL")]


def dumpTableLevelStats(td, conn):
	"""writes limits metadata for the table td.
	"""
	if not td.onDisk:
		return

	qName = td.getQName()
	heading = f"Statistics for {qName}"
	print("\n"+heading+"\n"+"-"*len(heading))

	try:
		print("|rows| = {}\n".format(next(conn.query(
			"SELECT nrows FROM dc.tablemeta WHERE tablename=%(qName)s",
			locals()))[0] or "<Unknown>"))
	except StopIteration:
		print("No metadata (table not imported)?")
		return

	statsTD = base.resolveCrossId("//dc_tables#simple_col_stats")
	colNames = ("column_name min_value max_value percentile03"
		" median percentile97 fill_factor").split()
	stats = [
		["{:.5g}".format(v) if isinstance(v, float)
				else utils.makeEllipsis(str(v), 12, "…")
			for v in r]
		for r in conn.query(
			statsTD.getSimpleQuery(colNames, "tablename=%(tablename)s"),
			{"tablename": td.getQName()})]
	stats.sort()

	print(utils.formatSimpleTable(stats, False, colNames))


def dumpStatsForRD(rd, conn):
	"""writes metadata for rd and its tables.
	"""
	print("="*72)
	rdId = rd.sourceId
	print(f"Statistics for RD {rdId}")

	try:
		print("spatial coverage {}".format(next(conn.query(
			"SELECT spatial FROM dc.rdmeta WHERE sourcerd=%(rdId)s",
			locals()))[0] or "<Unknown>"))
	except StopIteration:
		print("No RD stats.")
	else:
		print("temporal coverage {}".format(next(conn.query(
			"SELECT temporal FROM dc.rdmeta WHERE sourcerd=%(rdId)s",
			locals()))[0] or "<Unknown>"))
		print("spectral coverage {}".format(next(conn.query(
			"SELECT spectral FROM dc.rdmeta WHERE sourcerd=%(rdId)s",
			locals()))[0] or "<Unknown>"))

	for td in rd.tables:
		dumpTableLevelStats(td, conn)


def parseCmdLine():
	from argparse import ArgumentParser
	parser = ArgumentParser(
		description="Updates existing values min/max items in a referenced"
			" table or RD.")
	parser.add_argument("-t", "--tables-only", dest="tablesOnly",
		action="store_true",
		help="Only acquire table/resource-level metadata (rather than"
			" column metadata, which usually takes a lot longer).")
	parser.add_argument("-s", "--sample-percent", type=float, default=None,
		dest="samplePercent", metavar="P",
		help="Only look at P percent of the table to determine"
			" min/max/mean.")
	parser.add_argument("-d", "--dump", action="store_true", default=False,
		dest="dumpOnly",
		help="Do not obtain statistics but rather dump the results of the"
			" last run.")
	parser.add_argument("itemId", nargs="+",
		help="Cross-RD reference of a table or RD to update, as in ds/q"
			" or ds/q#mytable; only RDs in inputsDir can be updated."
			" A single ALL will expand to all RDs that already have"
			" limits-obtained metadata.")
	return parser.parse_args()


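# Typical invocations of this parser (via the dachs limits subcommand
# this module implements; ds/q is the placeholder RD id from the help
# text above):
#
#	dachs limits ds/q             # full statistics for all tables in ds/q
#	dachs limits -t ds/q#mytable  # table/resource-level metadata only
#	dachs limits -s 5 ds/q        # look at 5 percent of each table
#	dachs limits -d ALL           # dump results of previous runs

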
def main():
	from gavo import api

	args = parseCmdLine()
	if len(args.itemId)==1 and args.itemId[0]=="ALL":
		args.itemId = _getUpdatableRdIds()

	with api.getWritableAdminConn() as conn:
		for itemRef in args.itemId:
			item = api.getReferencedElement(itemRef)

			if isinstance(item, api.TableDef):
				if args.dumpOnly:
					dumpTableLevelStats(item, conn)
				else:
					updateTableLevelStats(item, conn,
						args.samplePercent, not args.tablesOnly)

			elif isinstance(item, api.RD):
				if args.dumpOnly:
					dumpStatsForRD(item, conn)
				else:
					updateForRD(item, conn,
						args.samplePercent, not args.tablesOnly)

			else:
				raise base.ReportableError(
					"%s references neither an RD nor a table definition"%itemRef)

			conn.commit()