1 """
2 Making data out of descriptors and sources.
3 """
4
5
6
7
8
9
10
11 from __future__ import print_function
12
13 import itertools
14 import operator
15 import sys
16
17 from gavo import base
18 from gavo import rscdef
19 from gavo import utils
20 from gavo.rsc import common
21 from gavo.rsc import table
22 from gavo.rsc import tables
23
24
25 MS = base.makeStruct
29 """is a feeder for data (i.e., table collections).
30
31 This is basically a collection of all feeders of the tables belonging
32 to data, except it will also call the table's mappers, i.e., add
33 expects source rows from data's grammars.
34
35 Feeders can be dispatched; this only works if the grammar returns
36 pairs of role and row rather than only the row. Dispatched
37 feeders only pass rows to the makes corresponding to the role.
38
39 If you pass in a connection, the data feeder will manage it (i.e.
40 commit if all went well, rollback otherwise).
41 """
    def __init__(self, data, batchSize=1024, dispatched=False,
            runCommit=True, connection=None, dumpIngestees=False):
        self.data, self.batchSize = data, batchSize
        self.runCommit = runCommit
        self.nAffected = 0
        self.connection = connection
        self.dumpIngestees = dumpIngestees
        if dispatched:
            makeAdders = self._makeFeedsDispatched
        else:
            makeAdders = self._makeFeedsNonDispatched

        addersDict, parAddersDict, self.feeders = self._getAdders()
        self.add, self.addParameters = makeAdders(addersDict, parAddersDict)
58 """returns a triple of (rowAdders, parAdders, feeds) for the data we
59 feed to.
60
61 rowAdders contains functions to add raw rows returned from a grammar,
62 parAdders the same for parameters returned by the grammar, and
63 feeds is a list containing all feeds the adders add to (this
64 is necessary to let us exit all of them.
65 """
66 adders, parAdders, feeders = {}, {}, []
67 for make in self.data.dd.makes:
68 table = self.data.tables[make.table.id]
69 feeder = table.getFeeder(batchSize=self.batchSize)
70 makeRow = make.rowmaker.compileForTableDef(table.tableDef)
71
72 def addRow(srcRow, feeder=feeder, makeRow=makeRow, table=table):
73 try:
74 procRow = makeRow(srcRow, table)
75 if self.dumpIngestees:
76 print("PROCESSED ROW:", procRow)
77 feeder.add(procRow)
78 except rscdef.IgnoreThisRow:
79 pass
80
81 if make.rowSource=="parameters":
82 parAdders.setdefault(make.role, []).append(addRow)
83 else:
84 adders.setdefault(make.role, []).append(addRow)
85
86 if make.parmaker:
87 parAdders.setdefault(make.role, []).append(
88 lambda row, m=make, t=table: m.runParmakerFor(row, t))
89
90 feeders.append(feeder)
91 return adders, parAdders, feeders

    def _makeFeedsNonDispatched(self, addersDict, parAddersDict):
        adders = reduce(operator.add, addersDict.values(), [])
        parAdders = reduce(operator.add, parAddersDict.values(), [])
        def add(row):
            for adder in adders:
                adder(row)
        def addParameters(row):
            for adder in parAdders:
                adder(row)
        return add, addParameters

    def _makeFeedsDispatched(self, addersDict, parAddersDict):
        def add(roleRow):
            role, row = roleRow
            if role not in addersDict:
                raise base.ReportableError("Grammar tries to feed to role '%s',"
                    " but there is no corresponding make"%role)
            for adder in addersDict[role]:
                adder(row)

        def addParameters(roleRow):
            try:
                role, row = roleRow
            except ValueError:
                # no role given: feed these parameters to all parameter adders
                for adder in itertools.chain(*parAddersDict.values()):
                    adder(roleRow)
            else:
                for adder in parAddersDict[role]:
                    adder(row)

        return add, addParameters

    def flush(self):
        for feeder in self.feeders:
            feeder.flush()

    def reset(self):
        for feeder in self.feeders:
            feeder.reset()

    def __enter__(self):
        for feeder in self.feeders:
            feeder.__enter__()
        return self

    def _breakCycles(self):
        del self.feeders
        del self.add
        del self.addParameters

    def _exitFailing(self, *excInfo):
        """calls all subordinate exit methods when there was an error in
        the controlled block.

        This ignores any additional exceptions that might come out of
        the exit methods.

        The connection is rolled back, and we unconditionally propagate
        the exception.
        """
        for feeder in self.feeders:
            try:
                feeder.__exit__(*excInfo)
            except:
                base.ui.notifyError("Ignored exception while exiting data feeder"
                    " on error.")
        if self.connection and self.runCommit:
            self.connection.rollback()
        self._breakCycles()
166 """calls all subordinate exit methods when the controlled block
167 exited successfully.
168
169 If one of the exit methods fails, we run _exitFailing and re-raise
170 the exception.
171
172 If all went well and we control a connection, we commit it (unless
173 clients explicitely forbid it).
174 """
175 affected = []
176 for feeder in self.feeders:
177 try:
178 feeder.__exit__(None, None, None)
179 except:
180 self._exitFailing(*sys.exc_info())
181 raise
182 affected.append(feeder.getAffected())
183
184 if self.connection and self.runCommit:
185 self.connection.commit()
186
187 self._breakCycles()
188 if affected:
189 self.nAffected = max(affected)
190
192 if excInfo and excInfo[0]:
193 return self._exitFailing(*excInfo)
194 else:
195 self._exitSuccess()
196
198 return self.nAffected
199
200
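

# How dispatched feeding looks from the feeding side, as a sketch (the
# role name and the row literal are made up): a dispatching grammar
# yields (role, row) pairs, and the feeder passes each row only to the
# makes registered for that role.
def _sketchDispatchedFeed(data):
    # hypothetical helper, not part of the module's API
    with data.getFeeder(dispatched=True) as feeder:
        feeder.add(("primary", {"ra": 10.0, "dec": -5.0}))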


class Data(base.MetaMixin, common.ParamMixin):
    """A collection of tables.

    ``Data``, in essence, is the instantiation of a ``DataDescriptor``.

    It is what ``makeData`` returns.  In typical one-table situations,
    you just want to call the ``getPrimaryTable()`` method to obtain the
    table built.
    """
    def __init__(self, dd, tables, parseOptions=common.parseNonValidating):
        base.MetaMixin.__init__(self)
        self.dd, self.parseOptions = dd, parseOptions
        self.tables = tables
        self.setMetaParent(self.dd)
        self._initParams(self.dd)

    def __iter__(self):
        for make in self.dd.makes:
            yield self.tables[make.table.id]

    @classmethod
    def create(cls, dd, parseOptions=common.parseNonValidating,
            connection=None):
        """returns a new data instance for dd.

        Existing tables on the database are not touched. To actually
        re-create them, call recreateTables.
        """
        controlledTables = {}
        res = cls(dd, controlledTables, parseOptions)
        for make in dd.makes:
            controlledTables[make.table.id
                ] = make.create(connection, parseOptions, tables.TableForDef,
                    parent=res)
        return res

    def validateParams(self):
        """raises a ValidationError if any required parameters within
        this data's tables are still None.
        """
        for t in self:
            t.validateParams()

    def dropTables(self, parseOptions):
        for t in self:
            if t.tableDef.onDisk:
                if not parseOptions.systemImport and t.tableDef.system:
                    continue
                t.drop()

    def recreateTables(self, connection):
        """drops and recreates all tables that are onDisk.

        System tables are only recreated when the systemImport parseOption
        is true.
        """
        if self.dd.updating:
            if self.parseOptions.dropIndices:
                for t in self:
                    if t.tableDef.onDisk:
                        t.dropIndices()
            return

        for t in self:
            if t.tableDef.system and not self.parseOptions.systemImport:
                continue
            if t.tableDef.onDisk:
                t.runScripts("preImport")
                t.recreate()

    def getPrimaryTable(self):
        """returns the table contained if there is only one, or the one
        with the role primary.

        If no matching table can be found, this raises a DataError.
        """
        try:
            return self.tables[self.dd.getPrimary().id]
        except (KeyError, base.StructureError):
            raise base.DataError(
                "No primary table in this data")

    def getFeeder(self, **kwargs):
        return _DataFeeder(self, **kwargs)
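

# Typical life cycle of a Data instance, as a sketch (dd and connection
# are assumed to come from the caller; the row literal is made up).
def _sketchDataLifeCycle(dd, connection):
    # hypothetical helper, not part of the module's API
    data = Data.create(dd, connection=connection)
    data.recreateTables(connection)
    with data.getFeeder(connection=connection) as feeder:
        feeder.add({"ra": 10.0, "dec": -5.0})
    return data.getPrimaryTable()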


class _EnoughRows(base.ExecutiveAction):
    """is an internal exception that allows processSource to tell makeData
    to stop handling more sources.
    """


def _pipeRows(srcIter, feeder, opts):
    pars = srcIter.getParameters()
    if opts.dumpIngestees:
        print("PROCESSED PARAMS:", pars)
    feeder.addParameters(pars)

    for srcRow in srcIter:

        if srcRow is common.FLUSH:
            feeder.flush()
            continue

        if srcIter.notify:
            base.ui.notifyIncomingRow(srcRow)
        if opts.dumpRows:
            print(srcRow)

        feeder.add(srcRow)
        if opts.maxRows:
            if base.ui.totalRead>=opts.maxRows:
                raise _EnoughRows
356 """helps processSource.
357 """
358 if data.dd.grammar is None:
359 raise base.ReportableError("The data descriptor %s cannot be used"
360 " to make data since it has no defined grammar."%data.dd.id)
361 data.runScripts("newSource", sourceToken=source)
362 srcIter = data.dd.grammar.parse(source, data)
363 if hasattr(srcIter, "getParameters"):
364 try:
365 _pipeRows(srcIter, feeder, opts)
366 except (base.Error,base.ExecutiveAction):
367 raise
368 except Exception as msg:
369 raise base.ui.logOldExc(
370 base.SourceParseError(repr(msg),
371 source=utils.makeLeftEllipsis(repr(source), 80),
372 location=srcIter.getLocator()))
373 else:
374 srcIter(data)
375 data.runScripts("sourceDone", sourceToken=source, feeder=feeder)
376


def processSource(data, source, feeder, opts, connection=None):
    """ingests source into the Data instance data.

    If this builds database tables, you must pass in a connection object.

    If opts.keepGoing is True, the system will continue importing
    even if a particular source has caused an error.  In that case,
    everything contributed by the bad source is rolled back (this will
    only work when filling database tables).
    """
    if not opts.keepGoing:
        # keepGoing is off: just let exceptions escape and abort the import.
        _processSourceReal(data, source, feeder, opts)

    else:
        if connection is None:
            raise base.ReportableError("Can only ignore source errors"
                " when filling database tables.",
                hint="The -c flag on dachs imp and its friends builds on database"
                " savepoints. You can thus only meaningfully use it when your"
                " table has onDisk='True'.")
        try:
            with connection.savepoint():
                _processSourceReal(data, source, feeder, opts)
                feeder.flush()
        except Exception as ex:
            feeder.reset()
            if not isinstance(ex, base.ExecutiveAction):
                base.ui.notifyError("Error while importing source %s; changes from"
                    " this source will be rolled back, processing will continue."
                    " (%s)"%(
                        utils.makeSourceEllipsis(source),
                        utils.safe_str(ex)))
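

# What makeData does per source, as a sketch (opts is a parse-options
# object as used above): with opts.keepGoing set and a connection passed,
# each source runs in its own savepoint, so a failing source only loses
# its own rows.
def _sketchKeepGoingImport(data, feeder, opts, connection):
    # hypothetical helper, not part of the module's API
    for source in data.dd.iterSources(connection):
        processSource(data, source, feeder, opts, connection=connection)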


class _TableCornucopeia(object):
    """a scaffolding class, instances of which return something (eventually
    table-like) for all keys they are asked for.
    """
    def __getitem__(self, key):
        return None
423 """returns a data instance built from ``dd``.
424
425 It will arrange for the parsing of all tables generated from dd's grammar.
426
427 If database tables are being made, you *must* pass in a connection.
428 The entire operation will then run within a single transaction within
429 this connection (except for building dependents; they will be built
430 in separate transactions).
431
432 The connection will be rolled back or committed depending on the
433 success of the operation (unless you pass ``runCommit=False``, in
434 which case even a successful import will not be committed)..
435
436 You can pass in a data instance created by yourself in data. This
437 makes sense if you want to, e.g., add some meta information up front.
438 """
    if getattr(base, "VALIDATING", False):
        # when just validating an RD, return a Data with stand-in tables
        # rather than actually parsing anything.
        return Data(dd, _TableCornucopeia())

    if data is None:
        res = Data.create(dd, parseOptions, connection=connection)
    else:
        res = data
    res.recreateTables(connection)

    feederOpts = {"batchSize": parseOptions.batchSize, "runCommit": runCommit,
        "dumpIngestees": parseOptions.dumpIngestees}
    if dd.grammar and dd.grammar.isDispatching:
        feederOpts["dispatched"] = True

    with res.getFeeder(connection=connection, **feederOpts) as feeder:
        if forceSource is None:
            sources = dd.iterSources(connection)
        else:
            sources = [forceSource]

        for source in sources:
            try:
                processSource(res, source, feeder, parseOptions, connection)
            except _EnoughRows:
                base.ui.notifyWarning("Source hit import limit, import aborted.")
                break
            except base.SkipThis:
                continue

    res.validateParams()
    res.nAffected = feeder.getAffected()

    if parseOptions.buildDependencies:
        makeDependentsFor([dd], parseOptions, connection)

    return res
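

# Typical use, as a sketch (the RD and DD ids are made up; getRD and
# getById are standard DaCHS accessors).
def _sketchMakeData(connection):
    # hypothetical helper, not part of the module's API
    rd = base.caches.getRD("myres/q")
    data = makeData(rd.getById("import"), connection=connection)
    return data.nAffected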


class DDDependencyGraph(object):
    """a graph giving the dependency structure between DDs.

    This is constructed with a list of DDs.

    From it, you can get a build sequence (least-depending things built
    first) or a destroy sequence (most-depending things first).

    Unless you pass spanRDs=True, only DDs residing within the first DD's
    RD are considered.
    """
    def __init__(self, dds, spanRDs=False):
        self.limitToRD = None
        if not spanRDs and dds:
            self.limitToRD = dds[0].rd

        self._edges, self._seen = set(), set()
        self._gather(dds)

    def _gatherOne(self, dd):
        for dependentId in dd.dependents:
            try:
                dependentDD = base.resolveId(dd.rd, dependentId)
                if self.limitToRD and self.limitToRD!=dependentDD.rd:
                    continue

                self._edges.add((dd, dependentDD))
                if dependentDD not in self._seen:
                    self._seen.add(dependentDD)
                    self._gatherOne(dependentDD)
            except (base.StructureError, base.NotFoundError) as msg:
                base.ui.notifyWarning("Ignoring dependent %s of %s (%s)"%(
                    dependentId, dd.getFullId(), unicode(msg)))

    def _gather(self, dds):
        for dd in dds:
            self._gatherOne(dd)

    def getBuildSequence(self):
        return utils.topoSort(self._edges)

    def getDestroySequence(self):
        inverted = [(b, a) for a, b in self._edges]
        return utils.topoSort(inverted)
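

# Sketch of the two orderings: with edges {(a, b), (b, c)} (b depends on
# a, and c depends on b), getBuildSequence returns [a, b, c], while
# getDestroySequence returns [c, b, a].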
527 """rebuilds all data dependent on one of the DDs in the dds sequence.
528 """
529 if parseOptions.buildDependencies:
530 parseOptions = parseOptions.change(buildDependencies=False)
531
532 try:
533 buildSequence = DDDependencyGraph(dds).getBuildSequence()
534 except ValueError as ex:
535 raise utils.logOldExc(base.ReportableError("Could not sort"
536 " dependent DDs topologically (use --hints to learn more).",
537 hint="This is most likely because there's a cyclic dependency."
538 " Please check your dependency structure. The original message"
539 " is: %s"%utils.safe_str(ex)))
540
541
542
543
544 for dd in buildSequence[:]:
545 if dd in dds:
546 buildSequence.pop(0)
547 else:
548 break

    if parseOptions.metaOnly:
        if buildSequence:
            base.ui.notifyWarning("Only importing metadata, not rebuilding"
                " dependencies. Depending on your changes, it may be"
                " necessary to manually re-make one of these: %s"%
                ", ".join(dd.getFullId() for dd in buildSequence))
    else:
        for dd in buildSequence:
            base.ui.notifyInfo("Making dependent %s"%dd.getFullId())
            makeData(dd, parseOptions=parseOptions, connection=connection)


def makeDataById(ddId, parseOptions=common.parseNonValidating,
        connection=None, inRD=None):
    """returns the data set built from the DD with ddId (which must be
    fully qualified).
    """
    dd = base.resolveId(inRD, ddId)
    return makeData(dd, parseOptions=parseOptions, connection=connection)
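

# Sketch (the fully qualified DD id is made up):
#
#   data = makeDataById("myres/q#import", connection=connection)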


def wrapTable(table, rdSource=None):
    """returns a Data instance containing only table (or table if it's
    already a data instance).

    If table has no rd, you must pass rdSource, which must be an object
    having an rd attribute (RDs, tableDefs, etc. work).

    This will grab info meta from the table.
    """
    if hasattr(table, "dd"):
        # table already is a Data instance; return it unchanged.
        return table

    if rdSource is None:
        rd = table.tableDef.rd
    elif hasattr(rdSource, "rd"):
        rd = rdSource.rd
    else:
        raise TypeError("Invalid RD source: %s"%rdSource)

    newDD = MS(rscdef.DataDescriptor, makes=[
        MS(rscdef.Make, table=table.tableDef, rowmaker=None)], parent_=rd)
    if rdSource:
        newDD.adopt(table.tableDef)

    newDD.setMetaParent(table.tableDef.rd)

    res = Data(newDD, tables={table.tableDef.id: table})

    for infoMeta in table.iterMeta("info"):
        res.addMeta("info", infoMeta)
    for mi in table.iterMeta("_votableRootAttributes", propagate=False):
        res.addMeta("_votableRootAttributes", mi)

    return res
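

# Wrapping an in-memory table so code expecting a Data instance can use
# it, as a sketch (td and rows are assumed to come from the caller).
def _sketchWrapTable(td, rows):
    # hypothetical helper, not part of the module's API
    t = tables.TableForDef(td, rows=rows)
    return wrapTable(t)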


def makeCombinedData(baseDD, tablesForRoles):
    """returns a Data instance containing all of tablesForRoles.

    A DD is generated based on baseDD; if baseDD has any tables, they are
    discarded.

    tablesForRoles is a mapping from strings (one of which should be
    "primary") to tables; the strings end up as roles.
    """
    newDD = baseDD.change(
        makes=[MS(rscdef.Make, table=t.tableDef, rowmaker=None, role=role)
            for role, t in tablesForRoles.iteritems()])
    newDD.meta_ = baseDD._metaAttr.getCopy(baseDD, newDD, None)
    return Data(newDD, tables=dict((t.tableDef.id, t)
        for t in tablesForRoles.values()))
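

# Sketch (baseDD and the two table instances are assumed to exist; the
# role names are made up, but one of them should be "primary"):
#
#   data = makeCombinedData(baseDD, {
#       "primary": resultsTable,
#       "aux": metaTable})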