Source code for gavo.user.importing

"""
The user interface to importing resources into the VO.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import argparse
import sys

from gavo import api
from gavo import base
from gavo.protocols import obscore
from gavo.protocols import tap
from gavo.user import common


class RetvalWatcher(base.ObserverBase):
	"""an Observer giving appropriate program return values.

	Basically, we want to return an error signature even if we managed
	to import things if at least once there was an error notification.
	We set this "error occurred but we manage" code to 101 here.  I'm
	sure we can do better than that.
	"""
	retval = 0

	@base.listensTo("Error")
	def fixRetVal(self, msg):
		self.retval = 101
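# A minimal usage sketch (illustrative, not part of the module): since
# RetvalWatcher subscribes to the event hub it is constructed with, a
# program driving an import manually could propagate errors to the shell
# like this (runMyImport is a hypothetical stand-in for the actual work):
#
#	watcher = RetvalWatcher(api.ui)
#	runMyImport()
#	sys.exit(watcher.retval)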
class TableCollector(base.ObserverBase):
	"""collects the qualified names of tables changed.

	This is used to run vacuum analyze on the respective tables before
	the import exits; the vacuumAll method of this class can do that
	once all importing connections are closed (even if an Observer
	shouldn't do a thing like that...).
	"""
	def __init__(self, eh):
		base.ObserverBase.__init__(self, eh)
		self.tablesChanged = []

	@base.listensTo("DBTableModified")
	def addChangedTable(self, fqName):
		self.tablesChanged.append(fqName)

	def vacuumAll(self):
		from gavo import adql
		tableNameSym = adql.getSymbols()["qualifier"]

		# do not use a connection from the pool here; they might be debug
		# connections which might implicitly start transactions, which
		# in turn will break vacuuming.
		conn = base.getDBConnection("feed", autocommitted=True)
		for tableName in self.tablesChanged:
			try:
				tableNameSym.parseString(tableName, parseAll=True)
			except base.ParseException:
				# forget about odd table names for now.
				continue
			conn.execute("VACUUM ANALYZE %s"%tableName)
		conn.close()
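# A usage sketch for TableCollector (illustrative, not part of the
# module): subscribe it before any tables are touched and vacuum once
# all importing connections are closed, as process below does:
#
#	collector = TableCollector(api.ui)
#	# ...import data; each DBTableModified event records one table...
#	collector.vacuumAll()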
def process(opts, args):
	"""imports the data set described by args governed by opts.

	The first item of args is an RD id, any remaining ones are
	interpreted as DD ids within the selected RD.  If no DD ids are
	given, all DDs within the RD are processed except those for which
	auto has been set to False.

	opts is either a ParseOption instance or the object returned by
	main's parseCmdline function below.
	"""
	# process manages its dependencies itself
	retvalWatcher = RetvalWatcher(api.ui)
	opts.buildDependencies = False

	# collect tables due for vacuuming
	tableCollector = TableCollector(api.ui)

	rdId, selectedIds = args[0], args[1:]
	rd = api.getReferencedElement(rdId, forceType=api.RD)

	dds = common.getPertainingDDs(rd, selectedIds)
	connection = api.getDBConnection("admin")

	if not opts.suppressMeta:
		tap.unpublishFromTAP(rd, connection)
		tap.publishToTAP(rd, connection)

	dbCatalogChanged = False
	for dd in dds:
		if opts.metaOnly:
			api.ui.notifyInfo("Updating meta for %s"%dd.id)
			res = api.Data.create(dd, parseOptions=opts, connection=connection)
			res.updateMeta()
		else:
			api.ui.notifyInfo("Making data %s"%dd.getFullId())
			res = api.makeData(dd, parseOptions=opts, connection=connection)
			dbCatalogChanged = dbCatalogChanged or res.dbCatalogChanged()
		if hasattr(res, "nAffected"):
			api.ui.notifyInfo("Rows affected: %s"%res.nAffected)

	# Let's update the date updated independently of dachs pub; not many
	# people bother to re-publish the RD just because they did a re-import.
	if base.UnmanagedQuerier(connection).getTableType("dc.resources"):
		# skip this during bootstrap
		connection.execute("UPDATE dc.resources SET"
			" dateupdated=CURRENT_TIMESTAMP,"
			" rectimestamp=CURRENT_TIMESTAMP"
			" WHERE sourcerd=%(rdid)s", {"rdid": rd.sourceId})

	# We're committing here so that we don't lose all importing
	# work just because some dependent messes up.
	connection.commit()

	if not opts.suppressMeta:
		rd.updateMetaInDB(connection, "import")
		connection.commit()

	# re-indexing is a big deal because it may involve primary key
	# operations and, in particular, clustering, which locks the
	# table.  Let's take everything out of obscore before doing that.
	if opts.metaPlusIndex:
		with obscore.suspendFromObscore(rd, connection):
			for dd in dds:
				data = api.Data.create(dd, parseOptions=opts, connection=connection)
				for t in data:
					t.dropIndices()
					t.makeIndices()

	api.makeDependentsFor(dds, opts, connection, dbCatalogChanged)
	connection.commit()

	if not opts.suppressMeta:
		base.tryRemoteReload("__system__/dc_tables")

	tableCollector.vacuumAll()
	return retvalWatcher.retval
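# A programmatic call sketch (illustrative, not part of the module):
# process accepts any object exposing the option attributes it reads
# (suppressMeta, metaOnly, metaPlusIndex, ...); the RD id "myres/q" and
# DD id "main" used here are hypothetical:
#
#	sys.exit(process(opts, ["myres/q", "main"]))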
class HideADQLAction(argparse.Action):
	def __call__(self, parser, namespace, values, option_string=None):
		from gavo.rscdef import tabledef
		tabledef.ADQLVisibilityAttribute.trueValue = "hidden"
def main():
	"""parses the command line and imports a set of data accordingly.
	"""
	def parseCmdline():
		parser = argparse.ArgumentParser(description="imports all (or just the"
			" selected) data items from an RD into the database.")
		parser.add_argument("-n", "--updateRows", help="Use UPDATE on primary"
			" key rather than INSERT with rows inserted to DBTables.",
			action="store_true", dest="doTableUpdates", default=False)
		parser.add_argument("-d", "--dumpRows", help="Dump raw rows as they are"
			" emitted by the grammar.", dest="dumpRows", action="store_true",
			default=False)
		parser.add_argument("-D", "--dumpIngestees", help="Dump processed"
			" rows as emitted by the row makers.", dest="dumpIngestees",
			action="store_true", default=False)
		parser.add_argument("-R", "--redoIndex", help="Drop indices before"
			" updating a table and recreate them when done", dest="dropIndices",
			action="store_true", default=False)
		parser.add_argument("-m", "--meta-only", help="just update table meta"
			" (privileges, column descriptions,...).", dest="metaOnly",
			action="store_true")
		parser.add_argument("--suppress-meta", help="do not update any metadata"
			" (this is likely only useful for updating DDs that already exist).",
			dest="suppressMeta", action="store_true")
		parser.add_argument("-I", "--meta-and-index", help="do not import, but"
			" update table meta (privileges, column descriptions,...) and"
			" recreate the indices. NOTE: This will at the moment *not*"
			" re-create primary keys because that breaks when there are foreign"
			" keys on the table. We do not see a good solution of this at the"
			" moment.", dest="metaPlusIndex", action="store_true")
		parser.add_argument("-s", "--system", help="(re-)create system tables,"
			" too", dest="systemImport", action="store_true")
		parser.add_argument("-v", "--verbose", help="talk a lot while working",
			dest="verbose", action="store_true")
		parser.add_argument("-r", "--reckless", help="Do not validate rows"
			" before ingestion", dest="validateRows", action="store_false",
			default=True)
		parser.add_argument("-M", "--stop-after", help="Stop after having parsed"
			" MAX rows", action="store", dest="maxRows", type=int, default=None)
		parser.add_argument("-b", "--batch-size", help="deliver N rows at a time"
			" to the database.", dest="batchSize", action="store", type=int,
			default=5000)
		parser.add_argument("-c", "--continue-bad", help="do not bail out after"
			" an error, just skip the current source and continue with the"
			" next one.", dest="keepGoing", action="store_true", default=False)
		parser.add_argument("-L", "--commit-after-meta", help="commit the"
			" importing transaction after updating the meta tables. Use this"
			" when loading large (hence -L) data sets to avoid keeping a lock"
			" on the meta tables for the duration of the input, i.e.,"
			" potentially days. The price is that users will see empty tables"
			" during the import.", dest="commitAfterMeta", action="store_true",
			default=False)
		parser.add_argument("--hide-adql", help="Interpret adql=True on tables"
			" as adql=hidden (this is for fallback mirrors)", nargs=0,
			metavar=None, action=HideADQLAction)
		parser.add_argument("rd", type=str, help="An RD id or a file system"
			" path of an RD")
		parser.add_argument("dds", type=str, nargs="*",
			help="Optional DD ids to restrict the import to these. Non-auto"
			" DDs actually must be mentioned here.")

		args = parser.parse_args()
		if args.metaPlusIndex:
			args.metaOnly = True
		return args

	args = parseCmdline()
	sys.exit(process(args, [args.rd]+args.dds))
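# Typical invocations through the dachs command line wrapper
# (illustrative; the resource and DD ids are made up):
#
#	dachs imp myres/q              # import all auto DDs of the RD
#	dachs imp -m myres/q           # only update table metadata
#	dachs imp myres/q main extra   # import just the DDs main and extra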