
Source Code for Module gavo.user.importing

  1  """ 
  2  The user interface to importing resources into the VO. 
  3  """ 
  4   
  5  #c Copyright 2008-2019, the GAVO project 
  6  #c 
  7  #c This program is free software, covered by the GNU GPL.  See the 
  8  #c COPYING file in the source distribution. 
  9   
 10   
 11  import sys 
 12  from optparse import OptionParser 
 13   
 14  from gavo import api 
 15  from gavo import base 
 16  from gavo.protocols import tap 
 17  from gavo.rscdef import scripting 
 18  from gavo.user import common 


class RetvalWatcher(base.ObserverBase):
    """an Observer giving appropriate program return values.

    Basically, we want to return an error signature if there was at least
    one error notification, even if we managed to import things.

    We set this "error occurred but we manage" code to 101 here.  I'm
    sure we can do better than that.
    """
    retval = 0

    @base.listensTo("Error")
    def fixRetVal(self, msg):
        self.retval = 101
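
# A minimal usage sketch (this mirrors what process() below does): create
# the watcher on the global event dispatcher api.ui before importing, then
# use its retval attribute as the program's exit status:
#
#   watcher = RetvalWatcher(api.ui)
#   ...  # run imports; any Error notification flips retval to 101
#   sys.exit(watcher.retval)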


class TableCollector(base.ObserverBase):
    """collects the qualified names of tables changed.

    This is used to run vacuum analyze on the respective tables before
    the import exits; the vacuumAll method of this class can do that
    once all importing connections are closed (even if an Observer
    shouldn't do a thing like that...)
    """
    def __init__(self, eh):
        base.ObserverBase.__init__(self, eh)
        self.tablesChanged = []

    @base.listensTo("DBTableModified")
    def addChangedTable(self, fqName):
        self.tablesChanged.append(fqName)

    def vacuumAll(self):
        from gavo import adql
        tableNameSym = adql.getSymbols()["qualifier"]

        # do not use a connection from the pool here; they might be debug
        # connections, which might implicitly start transactions, which
        # in turn will break vacuuming.
        conn = base.getDBConnection("feed", autocommitted=True)
        for tableName in self.tablesChanged:
            try:
                tableNameSym.parseString(tableName, parseAll=True)
            except base.ParseException:
                # skip odd table names for now; this parse is what keeps
                # us from interpolating junk into the VACUUM statement.
                continue
            conn.execute("VACUUM ANALYZE %s"%tableName)
        conn.close()
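
# TableCollector is used the same way (cf. process() below): instantiate
# it before any tables are written so it sees all DBTableModified events,
# and call vacuumAll() only once the importing connections have been
# committed, since VACUUM will not run inside an open transaction:
#
#   tableCollector = TableCollector(api.ui)
#   ...  # import and commit
#   tableCollector.vacuumAll()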


def process(opts, args):
    """imports the data sets described by args, governed by opts.

    The first item of args is an RD id; any remaining ones are interpreted
    as DD ids within the selected RD.  If no DD ids are given, all DDs
    within the RD are processed except those for which auto has been set
    to False.

    opts is either a ParseOption instance or the object returned by
    main's parseCmdline function below.
    """
    # process manages its dependencies itself
    retvalWatcher = RetvalWatcher(api.ui)
    opts.buildDependencies = False

    # collect tables due for vacuuming
    tableCollector = TableCollector(api.ui)

    rdId, selectedIds = args[0], args[1:]
    rd = api.getReferencedElement(rdId, forceType=api.RD)

    dds = common.getPertainingDDs(rd, selectedIds)
    connection = api.getDBConnection("admin")
    tap.unpublishFromTAP(rd, connection)
    tap.publishToTAP(rd, connection)

    for dd in dds:
        if opts.metaOnly:
            api.ui.notifyInfo("Updating meta for %s"%dd.id)
            res = api.Data.create(dd, parseOptions=opts, connection=connection
                ).updateMeta(opts.metaPlusIndex)

            # Hack: if there's an obscore mixin active, re-run the obscore
            # script.  Is there a less special-cased way to do this?
            for make in dd.makes:
                for script in make.scripts:
                    if script.id=='addTableToObscoreSources':
                        scripting.PythonScriptRunner(script).run(
                            res.tables[make.table.id])
        else:
            api.ui.notifyInfo("Making data %s"%dd.getFullId())
            res = api.makeData(dd, parseOptions=opts, connection=connection)
        if hasattr(res, "nAffected"):
            api.ui.notifyInfo("Rows affected: %s"%res.nAffected)
        # We're committing here so that we don't lose all importing
        # work just because some dependent messes up.
        connection.commit()

    api.makeDependentsFor(dds, opts, connection)
    connection.commit()
    rd.touchTimestamp()
    base.tryRemoteReload("__system__/dc_tables")

    tableCollector.vacuumAll()

    return retvalWatcher.retval
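
# Calling process() directly is also possible (a sketch; the RD id
# "myres/q" and DD id "main" are made-up examples, and opts must provide
# the attributes read above, e.g. metaOnly and buildDependencies -- per
# the docstring, a ParseOption instance works):
#
#   retval = process(opts, ["myres/q"])          # all auto DDs of the RD
#   retval = process(opts, ["myres/q", "main"])  # just the DD with id "main"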


def main():
    """parses the command line and imports a set of data accordingly.
    """
    def parseCmdline():
        parser = OptionParser(usage="%prog [options] <rd-name> {<data-id>}",
            description="imports all (or just the selected) data from an RD"
            " into the database.")
        parser.add_option("-n", "--updateRows", help="Use UPDATE on primary"
            " key rather than INSERT for rows delivered to DBTables.",
            action="store_true", dest="doTableUpdates", default=False)
        parser.add_option("-d", "--dumpRows", help="Dump raw rows as they are"
            " emitted by the grammar.", dest="dumpRows", action="store_true",
            default=False)
        parser.add_option("-D", "--dumpIngestees", help="Dump processed"
            " rows as emitted by the row makers.", dest="dumpIngestees",
            action="store_true", default=False)
        parser.add_option("-R", "--redoIndex", help="Drop indices before"
            " updating a table and recreate them when done.",
            dest="dropIndices", action="store_true", default=False)
        parser.add_option("-m", "--meta-only", help="just update table meta"
            " (privileges, column descriptions, ...).", dest="metaOnly",
            action="store_true")
        parser.add_option("-I", "--meta-and-index", help="do not import, but"
            " update table meta (privileges, column descriptions, ...) and"
            " recreate the indices.  NOTE: This will at the moment *not*"
            " re-create primary keys because that breaks when there are"
            " foreign keys on the table.  We do not see a good solution to"
            " this at the moment.", dest="metaPlusIndex", action="store_true")
        parser.add_option("-s", "--system", help="(re-)create system tables,"
            " too.", dest="systemImport", action="store_true")
        parser.add_option("-v", "--verbose", help="talk a lot while working.",
            dest="verbose", action="store_true")
        parser.add_option("-r", "--reckless", help="Do not validate rows"
            " before ingestion.", dest="validateRows", action="store_false",
            default=True)
        parser.add_option("-M", "--stop-after", help="Stop after having"
            " parsed MAX rows.", metavar="MAX", action="store",
            dest="maxRows", type="int", default=None)
        parser.add_option("-b", "--batch-size", help="deliver N rows at a"
            " time to the database.", dest="batchSize", action="store",
            type="int", default=5000, metavar="N")
        parser.add_option("-c", "--continue-bad", help="do not bail out after"
            " an error, just skip the current source and continue with the"
            " next one.", dest="keepGoing", action="store_true",
            default=False)
        parser.add_option("-L", "--commit-after-meta", help="commit the"
            " importing transaction after updating the meta tables.  Use"
            " this when loading large (hence -L) data sets to avoid keeping"
            " a lock on the meta tables for the duration of the input,"
            " i.e., potentially days.  The price is that users will see"
            " empty tables during the import.", dest="commitAfterMeta",
            action="store_true", default=False)

        (opts, args) = parser.parse_args()

        if opts.metaPlusIndex:
            opts.metaOnly = True
        if not args:
            parser.print_help(file=sys.stderr)
            sys.exit(1)
        return opts, args

    opts, args = parseCmdline()
    sys.exit(process(opts, args))


if __name__=="__main__":
    main()
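

# Example invocations (a hedged sketch: this assumes the module is wired
# up as the import subcommand of the dachs command line, and "myres/q" is
# a made-up RD id; the flags are the ones defined in parseCmdline above):
#
#   dachs imp myres/q              # import all auto data descriptors
#   dachs imp -m myres/q           # only update table metadata
#   dachs imp -M 100 myres/q main  # parse at most 100 rows of DD "main"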