Package gavo :: Package rsc :: Module data
[frames | no frames]

Source Code for Module gavo.rsc.data

  1  """ 
  2  Making data out of descriptors and sources. 
  3  """ 
  4   
  5  #c Copyright 2008-2019, the GAVO project 
  6  #c 
  7  #c This program is free software, covered by the GNU GPL.  See the 
  8  #c COPYING file in the source distribution. 
  9   
 10   
 11  from __future__ import print_function 
 12   
 13  import itertools 
 14  import operator 
 15  import sys 
 16   
 17  from gavo import base 
 18  from gavo import rscdef 
 19  from gavo import utils 
 20  from gavo.rsc import common 
 21  from gavo.rsc import table 
 22  from gavo.rsc import tables 
 23   
 24   
 25  MS = base.makeStruct 
class _DataFeeder(table._Feeder):
    """is a feeder for data (i.e., table collections).

    This is basically a collection of all feeders of the tables belonging
    to data, except it will also call the table's mappers, i.e., add
    expects source rows from data's grammars.

    Feeders can be dispatched; this only works if the grammar returns
    pairs of role and row rather than only the row.  Dispatched
    feeders only pass rows to the makes corresponding to the role.

    If you pass in a connection, the data feeder will manage it (i.e.
    commit if all went well, rollback otherwise), unless runCommit is
    false.

    After construction, instances have two callables as attributes:
    add(row) to feed a source row, and addParameters(row) to feed
    the grammar's parameters.
    """
    def __init__(self, data, batchSize=1024, dispatched=False,
            runCommit=True, connection=None, dumpIngestees=False):
        self.data, self.batchSize = data, batchSize
        self.runCommit = runCommit
        self.nAffected = 0
        self.connection = connection
        self.dumpIngestees = dumpIngestees
        if dispatched:
            makeAdders = self._makeFeedsDispatched
        else:
            makeAdders = self._makeFeedsNonDispatched

        addersDict, parAddersDict, self.feeders = self._getAdders()
        self.add, self.addParameters = makeAdders(addersDict, parAddersDict)

    def _getAdders(self):
        """returns a triple of (rowAdders, parAdders, feeds) for the data we
        feed to.

        rowAdders contains functions to add raw rows returned from a grammar,
        parAdders the same for parameters returned by the grammar, and
        feeds is a list containing all feeds the adders add to (this
        is necessary to let us exit all of them).

        Both rowAdders and parAdders are dictionaries mapping a make's
        role to a list of functions.
        """
        adders, parAdders, feeders = {}, {}, []
        for make in self.data.dd.makes:
            destTable = self.data.tables[make.table.id]
            feeder = destTable.getFeeder(batchSize=self.batchSize)
            makeRow = make.rowmaker.compileForTableDef(destTable.tableDef)

            # feeder, makeRow and table are bound as defaults so each
            # closure keeps the objects of its own loop iteration.
            def addRow(srcRow, feeder=feeder, makeRow=makeRow,
                    table=destTable):
                try:
                    procRow = makeRow(srcRow, table)
                    if self.dumpIngestees:
                        print("PROCESSED ROW:", procRow)
                    feeder.add(procRow)
                except rscdef.IgnoreThisRow:
                    # the rowmaker asked us to drop this row
                    pass

            if make.rowSource=="parameters":
                parAdders.setdefault(make.role, []).append(addRow)
            else:
                adders.setdefault(make.role, []).append(addRow)

            if make.parmaker:
                parAdders.setdefault(make.role, []).append(
                    lambda row, m=make, t=destTable: m.runParmakerFor(row, t))

            feeders.append(feeder)
        return adders, parAdders, feeders

    def _makeFeedsNonDispatched(self, addersDict, parAddersDict):
        """returns a pair of functions (add, addParameters) that pass
        what they receive to all adders regardless of their role.
        """
        # chain.from_iterable rather than reduce(operator.add, ...): it
        # avoids building intermediate lists and does not rely on the
        # builtin reduce, which is not available on python 3.
        adders = list(itertools.chain.from_iterable(addersDict.values()))
        parAdders = list(
            itertools.chain.from_iterable(parAddersDict.values()))

        def add(row):
            for adder in adders:
                adder(row)

        def addParameters(row):
            for adder in parAdders:
                adder(row)

        return add, addParameters

    def _makeFeedsDispatched(self, addersDict, parAddersDict):
        """returns a pair of functions (add, addParameters) expecting
        pairs of (role, row) and only feeding to the adders for role.

        add raises a ReportableError for roles without adders.
        """
        def add(roleRow):
            role, row = roleRow
            if role not in addersDict:
                raise base.ReportableError("Grammar tries to feed to role '%s',"
                    " but there is no corresponding make"%role)
            for adder in addersDict[role]:
                adder(row)

        # for parameters, allow broadcast
        def addParameters(roleRow):
            try:
                role, row = roleRow
            except ValueError:
                # assume we only got a row, broadcast it
                for adder in itertools.chain.from_iterable(
                        parAddersDict.values()):
                    adder(roleRow)
            else:
                for adder in parAddersDict[role]:
                    adder(row)

        return add, addParameters

    def flush(self):
        """flushes all feeders we control.
        """
        for feeder in self.feeders:
            feeder.flush()

    def reset(self):
        """resets all feeders we control.
        """
        for feeder in self.feeders:
            feeder.reset()

    def __enter__(self):
        for feeder in self.feeders:
            feeder.__enter__()
        return self

    def _breakCycles(self):
        """removes the attributes holding the feeders and the adder
        closures.

        After this, the feeder cannot be used for feeding any more.
        """
        del self.feeders
        del self.add
        del self.addParameters

    def _exitFailing(self, *excInfo):
        """calls all subordinate exit methods when there was an error in
        the controlled block.

        This ignores any additional exceptions that might come out of
        the exit methods.

        The connection is rolled back (if we manage one), and since
        this returns None, __exit__ propagates the original exception.
        """
        for feeder in self.feeders:
            try:
                feeder.__exit__(*excInfo)
            except:
                # deliberately catch-all: a failing cleanup must not
                # mask the exception we are already handling.
                base.ui.notifyError("Ignored exception while exiting data feeder"
                    " on error.")
        if self.connection and self.runCommit:
            self.connection.rollback()
        self._breakCycles()

    def _exitSuccess(self):
        """calls all subordinate exit methods when the controlled block
        exited successfully.

        If one of the exit methods fails, we run _exitFailing and re-raise
        the exception.

        If all went well and we control a connection, we commit it (unless
        clients explicitly forbid it).

        nAffected is set to the largest count reported by any of the
        feeders.
        """
        affected = []
        for feeder in self.feeders:
            try:
                feeder.__exit__(None, None, None)
            except:
                self._exitFailing(*sys.exc_info())
                raise
            affected.append(feeder.getAffected())

        if self.connection and self.runCommit:
            self.connection.commit()

        self._breakCycles()
        if affected:
            self.nAffected = max(affected)

    def __exit__(self, *excInfo):
        if excInfo and excInfo[0]:
            return self._exitFailing(*excInfo)
        else:
            self._exitSuccess()

    def getAffected(self):
        """returns the number of affected rows as computed when the
        feeder exited successfully (0 before that).
        """
        return self.nAffected
199
class Data(base.MetaMixin, common.ParamMixin):
    """A collection of tables.

    ``Data``, in essence, is the instantiation of a ``DataDescriptor``.

    It is what ``makeData`` returns.  In typical one-table situations,
    you just want to call the ``getPrimaryTable()`` method to obtain the
    table built.

    Iterating over a Data instance yields the tables belonging to the
    makes of its data descriptor.
    """
    def __init__(self, dd, tables, parseOptions=common.parseNonValidating):
        # we're not a structure, so the mixin has to be set up manually
        base.MetaMixin.__init__(self)
        self.dd = dd
        self.parseOptions = parseOptions
        self.tables = tables
        self.setMetaParent(self.dd)
        self._initParams(self.dd)

    def __iter__(self):
        for mk in self.dd.makes:
            yield self.tables[mk.table.id]

    @classmethod
    def create(cls, dd, parseOptions=common.parseNonValidating,
            connection=None):
        """returns a new data instance for dd.

        Existing tables on the database are not touched.  To actually
        re-create them, call recreateTables.
        """
        tablesById = {}
        # the instance is made first since the tables want it as parent;
        # it shares tablesById, which is filled in below.
        newData = cls(dd, tablesById, parseOptions)
        for mk in dd.makes:
            tablesById[mk.table.id] = mk.create(
                connection, parseOptions, tables.TableForDef,
                parent=newData)
        return newData

    @classmethod
    def drop(cls, dd, parseOptions=common.parseNonValidating, connection=None):
        """drops all tables made by dd if necessary.

        This returns the data instance used for dropping.
        """
        tablesById = {}
        for mk in dd.makes:
            doomed = tables.TableForDef(
                mk.table, create=False, connection=connection)
            tablesById[mk.table.id] = doomed
            # Overriding _runScripts is necessary to have the table's
            # beforeDrop scripts executed -- this is all far too ugly to
            # be the right way.  Presumably beforeDrop really is a property
            # of the table rather than of the make, and makes and tables
            # should have different runners...
            doomed._runScripts = mk.getRunner()
        toDrop = cls(dd, tablesById, parseOptions)
        toDrop.dropTables(parseOptions)
        return toDrop

    def validateParams(self):
        """raises a ValidationError if any required parameters within
        this data's tables are still None.
        """
        for member in self:
            member.validateParams()

    def dropTables(self, parseOptions):
        """drops all on-disk tables of this data.

        System tables are left alone unless parseOptions.systemImport
        is true.
        """
        for member in self:
            if not member.tableDef.onDisk:
                continue
            if not parseOptions.systemImport and member.tableDef.system:
                continue
            member.drop()

    def updateMeta(self, updateIndices=False):
        """updates meta information kept in the DB on the contained tables.

        Tables without an updateMeta method are skipped.  With
        updateIndices, the indices of the remaining tables are dropped
        and re-made.

        This returns the data instance itself.
        """
        for member in self:
            if not hasattr(member, "updateMeta"):
                continue
            member.updateMeta()
            if updateIndices:
                member.dropIndices()
                member.makeIndices()
        return self

    def recreateTables(self, connection):
        """drops and recreates all tables that are onDisk.

        System tables are only recreated when the systemImport parseOption
        is true.

        For updating DDs, nothing is recreated; at most, the indices
        of the on-disk tables are dropped (if the dropIndices parseOption
        is set).
        """
        opts = self.parseOptions
        if self.dd.updating:
            if opts.dropIndices:
                for member in self:
                    if member.tableDef.onDisk:
                        member.dropIndices()
            return

        for member in self:
            if member.tableDef.system and not self.parseOptions.systemImport:
                continue
            if member.tableDef.onDisk:
                member.runScripts("preImport")
                member.recreate()

    def getPrimaryTable(self):
        """returns the table contained if there is only one, or the one
        with the role primary.

        If no matching table can be found, raise a DataError.
        """
        try:
            primary = self.tables[self.dd.getPrimary().id]
        except (KeyError, base.StructureError):
            raise base.DataError(
                "No primary table in this data")
        return primary

    def getTableWithRole(self, role):
        """returns the table made for role.

        If there is no such table, this raises a DataError.
        """
        try:
            found = self.tables[self.dd.getTableDefWithRole(role).id]
        except (KeyError, base.StructureError):
            raise base.DataError(
                "No table with role %s known here"%repr(role))
        return found

    def getFeeder(self, **kwargs):
        """returns a _DataFeeder for this data; kwargs are passed on
        to its constructor.
        """
        return _DataFeeder(self, **kwargs)

    def runScripts(self, phase, **kwargs):
        """runs the scripts for phase of all our makes on their
        respective tables; kwargs are passed on to the runners.
        """
        for mk in self.dd.makes:
            runner = mk.getRunner()
            runner(self.tables[mk.table.id], phase, **kwargs)
324
class _EnoughRows(base.ExecutiveAction):
    """is an internal exception that allows processSource to tell makeData
    to stop handling more sources.

    It is raised by _pipeRows when opts.maxRows is set and
    base.ui.totalRead has reached that limit; makeData catches it,
    emits a warning and stops iterating over the sources.
    """
330
def _pipeRows(srcIter, feeder, opts):
    """feeds the parameters and the rows coming out of srcIter to feeder.

    common.FLUSH sentinels in the row stream make the feeder flush and
    are not fed themselves.

    If opts.maxRows is set and base.ui.totalRead has reached it after
    adding a row, _EnoughRows is raised.
    """
    params = srcIter.getParameters()
    if opts.dumpIngestees:
        print("PROCESSED PARAMS:", params)
    feeder.addParameters(params)

    for rawRow in srcIter:
        if rawRow is common.FLUSH:
            feeder.flush()
        else:
            if srcIter.notify:
                base.ui.notifyIncomingRow(rawRow)
            if opts.dumpRows:
                print(rawRow)

            feeder.add(rawRow)
            if opts.maxRows and base.ui.totalRead>=opts.maxRows:
                raise _EnoughRows
353
def _processSourceReal(data, source, feeder, opts):
    """helps processSource.

    This runs data's newSource scripts, lets the grammar parse source,
    pipes the result into feeder, and finally runs the sourceDone scripts.

    Exceptions that are neither base.Error nor base.ExecutiveAction
    are turned into SourceParseErrors.  A ReportableError is raised
    if data's DD has no grammar.
    """
    if data.dd.grammar is None:
        raise base.ReportableError("The data descriptor %s cannot be used"
            " to make data since it has no defined grammar."%data.dd.id)

    data.runScripts("newSource", sourceToken=source)
    srcIter = data.dd.grammar.parse(source, data)

    if not hasattr(srcIter, "getParameters"):
        # magic grammars (like those of boosters) return a callable
        srcIter(data)
    else:
        # is a "normal" grammar
        try:
            _pipeRows(srcIter, feeder, opts)
        except (base.Error, base.ExecutiveAction):
            raise
        except Exception as msg:
            parseError = base.SourceParseError(repr(msg),
                source=utils.makeLeftEllipsis(repr(source), 80),
                location=srcIter.getLocator())
            raise base.ui.logOldExc(parseError)

    data.runScripts("sourceDone", sourceToken=source, feeder=feeder)
376
def processSource(data, source, feeder, opts, connection=None):
    """ingests source into the Data instance data.

    If this builds database tables, you must pass in a connection object.

    If opts.keepGoing is True, the system will continue importing
    even if a particular source has caused an error.  In that case,
    everything contributed by the bad source is rolled back (this will
    only work when filling database tables, and a ReportableError is
    raised when there is no connection).
    """
    if not opts.keepGoing:
        # simple shortcut if we don't want to recover from bad sources
        _processSourceReal(data, source, feeder, opts)
        return

    # recover from bad sources, be more careful
    if connection is None:
        raise base.ReportableError("Can only ignore source errors"
            " when filling database tables.",
            hint="The -c flag on dachs imp and its friends builds on database"
            " savepoints. You can thus only meaninfully use it when your"
            " table has onDisk='True'.")

    try:
        with connection.savepoint():
            _processSourceReal(data, source, feeder, opts)
            feeder.flush()
    except Exception as ex:
        # throw away whatever the bad source left in the feeder
        feeder.reset()
        # NOTE(review): executive actions caught here are dropped
        # without a message and not re-raised -- confirm that this is
        # intended for everything deriving from base.ExecutiveAction.
        if not isinstance(ex, base.ExecutiveAction):
            base.ui.notifyError("Error while importing source %s; changes from"
                " this source will be rolled back, processing will continue."
                " (%s)"%(
                utils.makeSourceEllipsis(source),
                utils.safe_str(ex)))
411
class _TableCornucopeia(object):
    """a scaffolding class that can stand in for a table dictionary:
    whatever key its instances are asked for, they have an answer.

    Right now, that answer always is None; it might become something
    table-like eventually.
    """
    def __getitem__(self, key):
        # every key "exists"; there is just nothing behind it
        return None
419
def makeData(dd, parseOptions=common.parseNonValidating,
        forceSource=None, connection=None, data=None, runCommit=True):
    """returns a data instance built from ``dd``.

    It will arrange for the parsing of all tables generated from dd's grammar.

    If database tables are being made, you *must* pass in a connection.
    The entire operation will then run within a single transaction within
    this connection (except for building dependents; they will be built
    in separate transactions).

    The connection will be rolled back or committed depending on the
    success of the operation (unless you pass ``runCommit=False``, in
    which case even a successful import will not be committed).

    You can pass in a data instance created by yourself in data.  This
    makes sense if you want to, e.g., add some meta information up front.

    With ``forceSource``, only that source is processed rather than
    what dd's ``iterSources`` returns.
    """
    # Some proc setup does expensive things like actually building data.
    # We don't want that when validating and return some empty data thing.
    if getattr(base, "VALIDATING", False):
        return Data(dd, _TableCornucopeia())

    res = data
    if res is None:
        res = Data.create(dd, parseOptions, connection=connection)
    res.recreateTables(connection)

    feederOpts = dict(
        batchSize=parseOptions.batchSize,
        runCommit=runCommit,
        dumpIngestees=parseOptions.dumpIngestees)
    if dd.grammar and dd.grammar.isDispatching:
        feederOpts["dispatched"] = True

    with res.getFeeder(connection=connection, **feederOpts) as feeder:
        if forceSource is not None:
            sources = [forceSource]
        else:
            sources = dd.iterSources(connection)

        for source in sources:
            try:
                processSource(res, source, feeder, parseOptions, connection)
            except _EnoughRows:
                base.ui.notifyWarning("Source hit import limit, import aborted.")
                break
            except base.SkipThis:
                # just go on with the next source
                continue

        # NOTE(review): validation is assumed to happen while the feeder
        # is still open, so that a failure makes the feeder exit failing
        # -- confirm against the original indentation.
        res.validateParams()
    # the feeder only knows the number of affected rows after it has
    # exited successfully
    res.nAffected = feeder.getAffected()

    if parseOptions.buildDependencies:
        makeDependentsFor([dd], parseOptions, connection)

    return res
477
class DDDependencyGraph(object):
    """a graph giving the dependency structure between DDs.

    This is constructed with a list of DDs.

    From it, you can get a build sequence (least-depending things first)
    or a destroy sequence (most-depending things first).

    By default, dependents are followed across RDs.  If you pass
    spanRDs=False, only dependents residing within the first DD's RD are
    considered.
    """
    def __init__(self, dds, spanRDs=True):
        # when not None, dependents from other RDs are ignored
        self.limitToRD = None
        if not spanRDs and dds:
            self.limitToRD = dds[0].rd

        # _edges has pairs (dd, dependent dd); _seen has the dependents
        # we have already descended into.
        self._edges, self._seen = set(), set()
        self._gather(dds)

    def _gatherOne(self, dd):
        """adds the edges to dd's dependents and, recursively, the
        edges originating from them.

        Dependents that cannot be resolved are ignored with a warning.
        """
        for dependentId in dd.dependents:
            try:
                dependentDD = base.resolveId(dd.rd, dependentId)
                if self.limitToRD and self.limitToRD!=dependentDD.rd:
                    continue

                self._edges.add((dd, dependentDD))
                if dependentDD not in self._seen:
                    # mark before recursing so cyclic declarations
                    # cannot make us recurse forever
                    self._seen.add(dependentDD)
                    self._gatherOne(dependentDD)
            except (base.StructureError, base.NotFoundError) as msg:
                # utils.safe_str rather than the python-2-only unicode
                # builtin; this is what the rest of this module uses to
                # stringify exceptions.
                base.ui.notifyWarning("Ignoring dependent %s of %s (%s)"%(
                    dependentId, dd.getFullId(), utils.safe_str(msg)))

    def _gather(self, dds):
        """collects the dependency edges for all of dds.
        """
        for dd in dds:
            self._gatherOne(dd)

    def getBuildSequence(self):
        """returns the result of topologically sorting the dependency
        edges.
        """
        return utils.topoSort(self._edges)

    def getDestroySequence(self):
        """returns the result of topologically sorting the inverted
        dependency edges.
        """
        inverted = [(b,a) for a, b in self._edges]
        return utils.topoSort(inverted)
524
def makeDependentsFor(dds, parseOptions, connection):
    """rebuilds all data dependent on one of the DDs in the dds sequence.

    With parseOptions.metaOnly, nothing is rebuilt; instead, a warning
    naming the DDs that may need re-making is issued.

    If the dependent DDs cannot be sorted topologically, this raises
    a ReportableError.
    """
    if parseOptions.buildDependencies:
        # makeData must not start another round of dependency building
        parseOptions = parseOptions.change(buildDependencies=False)

    try:
        buildSequence = DDDependencyGraph(dds).getBuildSequence()
    except ValueError as ex:
        raise utils.logOldExc(base.ReportableError("Could not sort"
            " dependent DDs topologically (use --hints to learn more).",
            hint="This is most likely because there's a cyclic dependency."
            " Please check your dependency structure. The original message"
            " is: %s"%utils.safe_str(ex)))

    # remove DDs passed in from the build sequence, as long as nothing
    # is built in between (which might necessitate a re-build of something
    # we already did)
    nLeading = 0
    for dd in buildSequence:
        if dd not in dds:
            break
        nLeading += 1
    del buildSequence[:nLeading]

    if not parseOptions.metaOnly:
        for dd in buildSequence:
            base.ui.notifyInfo("Making dependent %s"%dd.getFullId())
            makeData(dd, parseOptions=parseOptions, connection=connection)
    elif buildSequence:
        base.ui.notifyWarning("Only importing metadata, not rebuilding"
            " dependencies. Depending on your changes, it may be"
            " necessary to manually re-make one of these: %s"%
            ", ".join(dd.getFullId() for dd in buildSequence))
560
def makeDataById(ddId, parseOptions=common.parseNonValidating,
        connection=None, inRD=None):
    """returns the data set built from the DD with ddId.

    ddId must be fully qualified; it is resolved through base.resolveId
    with inRD as the first argument, and the resulting DD is handed to
    makeData.
    """
    return makeData(
        base.resolveId(inRD, ddId),
        parseOptions=parseOptions,
        connection=connection)
569
def wrapTable(table, rdSource=None):
    """returns a Data instance containing only table (or table if it's already
    a data instance).

    If table has no rd, you must pass rdSource, which must be an object having
    an rd attribute (rds, tabledefs, etc, work); a TypeError is raised
    for rdSources without one.

    This will grab info meta from the table.
    """
    if hasattr(table, "dd"):
        # we trust it's already a Data instance (don't want to use isinstance
        # here since people may pass in fakes).
        return table

    if rdSource is None:
        rd = table.tableDef.rd
    elif not hasattr(rdSource, "rd"):
        raise TypeError("Invalid RD source: %s"%rdSource)
    else:
        rd = rdSource.rd

    onlyMake = MS(rscdef.Make, table=table.tableDef, rowmaker=None)
    newDD = MS(rscdef.DataDescriptor, makes=[onlyMake], parent_=rd)
    if rdSource:
        newDD.adopt(table.tableDef)

    newDD.setMetaParent(table.tableDef.rd)

    res = Data(newDD, tables={table.tableDef.id: table})

    # copy over the meta items we want to see on the wrapping data
    for infoMeta in table.iterMeta("info"):
        res.addMeta("info", infoMeta)
    for rootAttr in table.iterMeta("_votableRootAttributes", propagate=False):
        res.addMeta("_votableRootAttributes", rootAttr)

    return res
606
def makeCombinedData(baseDD, tablesForRoles):
    """returns a Data instance containing all of tablesForRoles.

    A DD is being generated based on baseDD; if baseDD has any tables, they are
    discarded.

    tablesForRoles is a mapping from strings (one of which should be "primary")
    to tables; the strings end up as roles.
    """
    # items() rather than iteritems(): it behaves the same here, matches
    # the values() call below and also exists on python 3 dictionaries.
    newDD = baseDD.change(
        makes=[MS(rscdef.Make, table=t.tableDef, rowmaker=None, role=role)
            for role, t in tablesForRoles.items()])
    # the new DD gets its own copy of baseDD's meta
    newDD.meta_ = baseDD._metaAttr.getCopy(baseDD, newDD, None)
    return Data(newDD, tables=dict((t.tableDef.id, t)
        for t in tablesForRoles.values()))
623