# (Extracted from a Sphinx "Source code for ..." page; the module name was
# lost in extraction.)

"""
Making data out of descriptors and sources.
"""

#c Copyright 2008-2022, the GAVO project <>
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.

import itertools
import operator
import sys

from gavo import base
from gavo import rscdef
from gavo import utils
from gavo.rsc import common
from gavo.rsc import dbtable
from gavo.rsc import table
from gavo.rsc import tables
from functools import reduce

MS = base.makeStruct

class _DataFeeder(table._Feeder):
	"""is a feeder for data (i.e., table collections).

	This is basically a collection of all feeders of the tables belonging
	to data, except it will also call the table's mappers, i.e., add
	expects source rows from data's grammars.

	Feeders can be dispatched; this only works if the grammar returns
	pairs of role and row rather than only the row.  Dispatched
	feeders only pass rows to the makes corresponding to the role.

	If you pass in a connection, the data feeder will manage it (i.e.
	commit if all went well, rollback otherwise).
	"""
	# NOTE(review): this class was reconstructed from a damaged extraction
	# (missing try:/else:/loop bodies).  Restored statements are marked
	# below where they could not be read off the surviving text.

	def __init__(self, data, batchSize=1024, dispatched=False,
			runCommit=True, connection=None, dumpIngestees=False):
		self.data, self.batchSize = data, batchSize
		self.runCommit = runCommit
		self.nAffected = 0
		self.connection = connection
		self.dumpIngestees = dumpIngestees
		if dispatched:
			makeAdders = self._makeFeedsDispatched
		else:
			makeAdders = self._makeFeedsNonDispatched

		addersDict, parAddersDict, self.feeders = self._getAdders()
		self.add, self.addParameters = makeAdders(addersDict, parAddersDict)

	def _getAdders(self):
		"""returns a triple of (rowAdders, parAdders, feeds) for the data we
		feed to.

		rowAdders contains functions to add raw rows returned from a grammar,
		parAdders the same for parameters returned by the grammar, and
		feeds is a list containing all feeds the adders add to (this
		is necessary to let us exit all of them).

		rowAdders and parAdders are dictionaries mapping make roles to
		lists of such functions.
		"""
		adders, parAdders, feeders = {}, {}, []

		# NOTE(review): iteration source and table lookup were lost in
		# extraction; restored from Data.makes/Data.tables, which are keyed
		# by table definition id elsewhere in this module.
		for make in self.data.makes:
			table = self.data.tables[make.table.id]
			feeder = table.getFeeder(batchSize=self.batchSize)
			makeRow = make.rowmaker.compileForTableDef(table.tableDef)

			# feeder, makeRow, and table are bound as defaults to avoid
			# late binding of the loop variables in the closure.
			def addRow(srcRow, feeder=feeder, makeRow=makeRow, table=table):
				try:
					procRow = makeRow(srcRow, table)
					if self.dumpIngestees:
						print("PROCESSED ROW:", procRow)
					# NOTE(review): restored; without it nothing would be fed.
					feeder.add(procRow)
				except rscdef.IgnoreThisRow:
					pass

			if make.rowSource=="parameters":
				parAdders.setdefault(make.role, []).append(addRow)
			else:
				adders.setdefault(make.role, []).append(addRow)

			if make.parmaker:
				parAdders.setdefault(make.role, []).append(
					lambda row, m=make, t=table: m.runParmakerFor(row, t))

			# NOTE(review): restored; feeders is returned and later iterated
			# by flush/reset/__enter__/__exit__.
			feeders.append(feeder)

		return adders, parAdders, feeders

	def _makeFeedsNonDispatched(self, addersDict, parAddersDict):
		"""returns a pair (add, addParameters) of functions passing a row
		to all row adders or all parameter adders, respectively, regardless
		of role.
		"""
		adders = reduce(operator.add, list(addersDict.values()), [])
		parAdders = reduce(operator.add, list(parAddersDict.values()), [])

		def add(row):
			for adder in adders:
				adder(row)

		def addParameters(row):
			for adder in parAdders:
				adder(row)

		return add, addParameters

	def _makeFeedsDispatched(self, addersDict, parAddersDict):
		"""returns a pair (add, addParameters) of functions expecting
		(role, row) pairs and passing row to the adders registered for role.

		add raises a ReportableError for roles without a make.
		"""
		def add(roleRow):
			role, row = roleRow
			if role not in addersDict:
				raise base.ReportableError("Grammar tries to feed to role '%s',"
					" but there is no corresponding make"%role)
			for adder in addersDict[role]:
				adder(row)

		# for parameters, allow broadcast
		def addParameters(roleRow):
			try:
				role, row = roleRow
			except ValueError:
				# assume we only got a row, broadcast it
				for adder in itertools.chain(*list(parAddersDict.values())):
					adder(roleRow)
			else:
				for adder in parAddersDict[role]:
					adder(row)

		return add, addParameters

	def flush(self):
		"""flushes all subordinate feeders.
		"""
		for feeder in self.feeders:
			feeder.flush()

	def reset(self):
		"""resets all subordinate feeders.
		"""
		for feeder in self.feeders:
			feeder.reset()

	def __enter__(self):
		for feeder in self.feeders:
			feeder.__enter__()
		return self

	def _breakCycles(self):
		# add and addParameters are closures referencing our adders;
		# drop them together with the feeders once we are done.
		del self.feeders
		del self.add
		del self.addParameters

	def _exitFailing(self, *excInfo):
		"""calls all subordinate exit methods when there was an error in
		the controlled block.

		This ignores any additional exceptions that might come out of
		the exit methods.

		The connection is rolled back, and we unconditionally propagate
		the exception.
		"""
		for feeder in self.feeders:
			try:
				feeder.__exit__(*excInfo)
			except Exception:
				base.ui.notifyError("Ignored exception while exiting data feeder"
					" on error.")
		if self.connection and self.runCommit:
			self.connection.rollback()
		# NOTE(review): placement of _breakCycles was lost in extraction.
		self._breakCycles()

	def _exitSuccess(self):
		"""calls all subordinate exit methods when the controlled block
		exited successfully.

		If one of the exit methods fails, we run _exitFailing and re-raise
		the exception.

		If all went well and we control a connection, we commit it (unless
		clients explicitly forbid it).
		"""
		affected = []
		for feeder in self.feeders:
			try:
				feeder.__exit__(None, None, None)
			except BaseException:
				# clean up whatever the failure was, then propagate it
				self._exitFailing(*sys.exc_info())
				raise
			affected.append(feeder.getAffected())

		if self.connection and self.runCommit:
			self.connection.commit()

		# NOTE(review): placement of _breakCycles was lost in extraction.
		self._breakCycles()

		if affected:
			self.nAffected = max(affected)

	def __exit__(self, *excInfo):
		if excInfo and excInfo[0]:
			return self._exitFailing(*excInfo)
		else:
			return self._exitSuccess()

	def getAffected(self):
		"""returns the maximum of the affected-row counts reported by the
		subordinate feeders (0 before a successful exit).
		"""
		return self.nAffected

class Data(base.MetaMixin, common.ParamMixin):
	"""A collection of tables.

	``Data``, in essence, is the instantiation of a ``DataDescriptor``.

	It is what ``makeData`` returns.  In typical one-table situations,
	you just want to call the ``getPrimaryTable()`` method to obtain the
	table built.

	The ``tables`` attribute maps table definition ids to tables.
	"""
	# NOTE(review): the keys of self.tables were lost in extraction in
	# several places; they were restored as the table definition's id,
	# which is what getPrimaryTable and getTableWithRole look up.

	def __init__(self, dd, tables, parseOptions=common.parseNonValidating,
			overrideMakes=None):
		base.MetaMixin.__init__(self)  # we're not a structure
		self.dd, self.parseOptions = dd, parseOptions
		self.makes = overrideMakes or self.dd.makes
		self.tables = tables
		self.setMetaParent(self.dd)
		self._initParams(self.dd)

	def __iter__(self):
		"""iterates over the tables belonging to our makes, in make order.
		"""
		for make in self.makes:
			yield self.tables[make.table.id]

	@classmethod
	def create(cls, dd, parseOptions=common.parseNonValidating,
			connection=None):
		"""returns a new data instance for dd.

		Existing tables on the database are not touched.  To actually
		re-create them, call recreateTables.
		"""
		controlledTables = {}
		# res holds a reference to controlledTables, which is filled below
		# because the tables need res as their parent.
		res = cls(dd, controlledTables, parseOptions)
		for make in dd.makes:
			controlledTables[make.table.id] = make.create(connection,
				parseOptions, tables.TableForDef, parent=res)
		return res

	@classmethod
	def drop(cls, dd, parseOptions=common.parseNonValidating,
			connection=None):
		"""drops all tables made by dd if necessary.

		This returns the data instance used for dropping.
		"""
		controlledTables = {}
		for make in dd.makes:
			controlledTables[make.table.id] = tables.TableForDef(
				make.table, create=False, connection=connection, make=make)
		data = cls(dd, controlledTables, parseOptions)
		data.dropTables(parseOptions)
		return data

	@classmethod
	def createWithTable(cls, dd, tableDef,
			parseOptions=common.parseNonValidating):
		"""builds a table for tableDef with this data item.

		This is for when there are many rather similar table structures
		that can all be built with the same data item.

		This can only work if dd only has one make; for other dds or
		for onDisk tableDefs, a ReportableError is raised.
		"""
		if len(dd.makes)!=1:
			raise base.ReportableError("Data.create_table is only"
				" allowed on one-make DDs")
		if tableDef.onDisk:
			raise base.ReportableError("Data.create_table so far"
				" doesn't allow onDisk tables (this is because nobody"
				" has thought about it; it would probably be harmless).")

		controlledTables = {
			tableDef.id: tables.TableForDef(tableDef)}
		return cls(dd, controlledTables, parseOptions,
			overrideMakes=[dd.makes[0].change(table=tableDef)])

	def dbCatalogChanged(self):
		"""returns true if a database table has been newly created by
		this class.
		"""
		return any(
			isinstance(t, dbtable.DBTable) and t.newlyCreated
			for t in self.tables.values())

	def validateParams(self):
		"""raises a ValidationError if any required parameters within
		this data's tables are still None.
		"""
		for t in self:
			t.validateParams()

	def dropTables(self, parseOptions):
		"""drops all onDisk tables of this data.

		System tables are skipped unless parseOptions.systemImport is true.
		"""
		for t in self:
			if t.tableDef.onDisk:
				if not parseOptions.systemImport and t.tableDef.system:
					continue
				t.drop()

	def updateMeta(self):
		"""calls updateMeta on all tables that have such a method.
		"""
		for t in self:
			if hasattr(t, "updateMeta"):
				t.updateMeta()

	def recreateTables(self, connection):
		"""drops and recreates all table that are onDisk.

		System tables are only recreated when the systemImport parseOption
		is true.

		For updating DDs, nothing is recreated; indices are dropped if
		the dropIndices parseOption says so.
		"""
		# NOTE(review): block nesting in this method was lost in extraction
		# and has been reconstructed; please check against the repository.
		if self.dd.updating:
			if self.parseOptions.dropIndices:
				for t in self:
					if t.tableDef.onDisk:
						t.dropIndices()

			for t in self:
				if t.newlyCreated:
					t.runScripts("preImport")
			return

		for t in self:
			if t.tableDef.system and not self.parseOptions.systemImport:
				continue
			if t.tableDef.onDisk and not t.newlyCreated:
				t.recreate()
			t.runScripts("preImport")

	def getParam(self, paramName, default=base.NotGiven):
		"""returns self's parameter of paramName, or, failing that,
		paramName from self's primaryTable.
		"""
		try:
			return common.ParamMixin.getParam(self, paramName, default)
		except base.NotFoundError:
			try:
				return self.getPrimaryTable().getParam(paramName, default)
			except (base.DataError, base.NotFoundError):
				# param not in primary table, or no primary table:
				# raise first exception
				pass
			# the inner exception has been handled at this point, so this
			# re-raises the NotFoundError from our own getParam.
			raise

	def getPrimaryTable(self):
		"""returns the table contained if there is only one, or the one
		with the role primary.

		If no matching table can be found, raise a DataError.
		"""
		if len(self.tables)==1:
			return list(self.tables.values())[0]
		try:
			return self.tables[self.dd.getPrimary().id]
		except (KeyError, base.StructureError):
			raise base.DataError(
				"No primary table in this data")

	def getTableWithRole(self, role):
		"""returns the table made with role.

		If no such table is known, a DataError is raised.
		"""
		try:
			return self.tables[self.dd.getTableDefWithRole(role).id]
		except (KeyError, base.StructureError):
			raise base.DataError(
				"No table with role %s known here"%repr(role))

	def getFeeder(self, **kwargs):
		"""returns a _DataFeeder for self; kwargs are passed on to its
		constructor.
		"""
		return _DataFeeder(self, **kwargs)

	def runScripts(self, phase, **kwargs):
		"""runs the scripts for phase of all our makes on the respective
		tables.
		"""
		for make in self.makes:
			make.getRunner()(self.tables[make.table.id], phase, **kwargs)
class _EnoughRows(base.ExecutiveAction):
	"""is an internal exception that allows processSource to tell makeData
	to stop handling more sources.
	"""


def _pipeRows(srcIter, feeder, opts):
	"""feeds the parameters and then all rows from srcIter to feeder.

	common.FLUSH items coming out of srcIter flush the feeder rather than
	being fed.  _EnoughRows is raised once opts.maxRows is set and
	base.ui.totalRead has reached it.
	"""
	pars = srcIter.getParameters()
	if opts.dumpIngestees:
		print("PROCESSED PARAMS:", pars)
	feeder.addParameters(pars)

	for srcRow in srcIter:
		if srcRow is common.FLUSH:
			feeder.flush()
			continue

		if srcIter.notify:
			base.ui.notifyIncomingRow(srcRow)
		if opts.dumpRows:
			print(srcRow)

		feeder.add(srcRow)
		if opts.maxRows:
			if base.ui.totalRead>=opts.maxRows:
				raise _EnoughRows


def _processSourceReal(data, source, feeder, opts):
	"""helps processSource.

	This runs data's newSource scripts, parses source with data.dd's
	grammar, feeds what comes out to feeder, and finally runs the
	sourceDone scripts.

	Exceptions other than base.Error and base.ExecutiveAction raised
	while feeding are turned into base.SourceParseErrors.
	"""
	if data.dd.grammar is None:
		# NOTE(review): the fill-in for %s was lost in extraction;
		# data.dd.id is a reconstruction.
		raise base.ReportableError("The data descriptor %s cannot be used"
			" to make data since it has no defined grammar."%data.dd.id)

	data.runScripts("newSource", sourceToken=source)
	srcIter = data.dd.grammar.parse(source, data)

	if hasattr(srcIter, "getParameters"):  # is a "normal" grammar
		try:
			_pipeRows(srcIter, feeder, opts)
		except (base.Error,base.ExecutiveAction):
			raise
		except Exception as msg:
			raise base.ui.logOldExc(
				base.SourceParseError(repr(msg),
					source=utils.makeLeftEllipsis(repr(source), 80),
					location=srcIter.getLocator()))
	else:  # magic grammars (like those of boosters) return a callable
		srcIter(data)

	data.runScripts("sourceDone", sourceToken=source, feeder=feeder)
def processSource(data, source, feeder, opts, connection=None):
	"""ingests source into the Data instance data.

	If this builds database tables, you must pass in a connection object.

	If opts.keepGoing is True,the system will continue importing
	even if a particular source has caused an error.  In that case,
	everything contributed by the bad source is rolled back (this will
	only work when filling database tables).

	With opts.keepGoing and without a connection, a ReportableError
	is raised.
	"""
	if not opts.keepGoing:
		# simple shortcut if we don't want to recover from bad sources
		_processSourceReal(data, source, feeder, opts)

	else:  # recover from bad sources, be more careful
		if connection is None:
			raise base.ReportableError("Can only ignore source errors"
				" when filling database tables.",
				hint="The -c flag on dachs imp and its friends builds on database"
				" savepoints.  You can thus only meaningfully use it when your"
				" table has onDisk='True'.")
		try:
			# flush within the savepoint so rows still batched in the feeder
			# are covered by it, too.
			with connection.savepoint():
				_processSourceReal(data, source, feeder, opts)
				feeder.flush()
		except base.ExecutiveAction:
			raise
		except Exception as ex:
			feeder.reset()
			base.ui.notifyError("Error while importing source %s; changes from"
				" this source will be rolled back, processing will continue."
				" (%s)"%(
					utils.makeSourceEllipsis(source),
					utils.safe_str(ex)))
class _TableCornucopeia(object): """a scaffolding class instances of which return something (eventually table-like) for all keys it is asked for. """ def __getitem__(self, key): return None def __len__(self): # is is mainly a signal to getPrimaryTable to try a bit harder # when we stand in for data.tables. return 0
class MultiForcedSources:
	"""This lets you pass in arbitrary sequences as forceSource in makeData.

	Without this, the list will be interpreted as a single source.
	"""
	def __init__(self, seq):
		self.seq = seq

	def iterSources(self, connection):
		"""returns an iterator over the wrapped sequence; connection is
		ignored.
		"""
		return iter(self.seq)
def makeData(dd, parseOptions=common.parseNonValidating,
		forceSource=None, connection=None, data=None, runCommit=True):
	"""returns a data instance built from ``dd``.

	It will arrange for the parsing of all tables generated from dd's grammar.

	If database tables are being made, you *must* pass in a connection.
	The entire operation will then run within a single transaction within
	this connection (except for building dependents; they will be built
	in separate transactions).

	The connection will be rolled back or committed depending on the
	success of the operation (unless you pass ``runCommit=False``, in
	which case even a successful import will not be committed).

	You can pass in a data instance created by yourself in data.  This
	makes sense if you want to, e.g., add some meta information up front.

	makeData will usually iterate over the sources given in dd.  You can
	override this with forceSource, which can contain a single source
	passed to a grammar.  If you need to pass in multiple sources, use
	a MultiForcedSources object (or anything that has an
	iterSources(dbConnection) method).
	"""
	# Some proc setup does expensive things like actually building data.
	# We don't want that when validating and return some empty data thing.
	if getattr(base, "VALIDATING", False):
		return Data(dd, _TableCornucopeia())

	if data is None:
		res = Data.create(dd, parseOptions, connection=connection)
	else:
		res = data
	res.recreateTables(connection)

	feederOpts = {"batchSize": parseOptions.batchSize,
		"runCommit": runCommit,
		"dumpIngestees": parseOptions.dumpIngestees}
	if dd.grammar and dd.grammar.isDispatching:
		feederOpts["dispatched"] = True

	with res.getFeeder(connection=connection, **feederOpts) as feeder:
		if forceSource is None:
			sources = dd.iterSources(connection)
		elif hasattr(forceSource, "iterSources"):
			sources = forceSource.iterSources(connection)
		else:
			sources = [forceSource]

		for source in sources:
			try:
				processSource(res, source, feeder, parseOptions, connection)
			except _EnoughRows:
				base.ui.notifyWarning("Source hit import limit, import aborted.")
				break
			except base.SkipThis:
				continue

	# NOTE(review): nesting was lost in extraction.  The feeder only
	# computes its affected count on exit, so getAffected must be read
	# after the with block; validateParams is assumed to sit here, too.
	res.validateParams()
	res.nAffected = feeder.getAffected()

	if parseOptions.buildDependencies:
		makeDependentsFor(
			[dd], parseOptions, connection, res.dbCatalogChanged())

	return res
class DDDependencyGraph(object):
	"""a graph giving the dependency structure between DDs.

	This is constructed with a list of DDs.

	From it, you can get a build sequence (least-depending thing build first)
	or a destroy sequence (most-depending things first).

	If you pass spanRDs=False, only DDs residing within the first DD's RD are
	considered.
	"""
	def __init__(self, dds, spanRDs=True):
		self.limitToRD = None
		if not spanRDs and dds:
			self.limitToRD = dds[0].rd

		# _edges contains pairs (dd, dependent dd); _seen the dependents
		# already followed, so cyclic declarations don't recurse forever.
		self._edges, self._seen = set(), set()
		self._gather(dds)

	def _gatherOne(self, dd):
		"""adds edges for dd's dependents, recursing into dependents
		not yet seen.

		Dependents that cannot be resolved are ignored with a warning.
		"""
		for dependentId in dd.dependents:
			try:
				dependentDD = base.resolveId(dd.rd, dependentId)
				if self.limitToRD and self.limitToRD!=dependentDD.rd:
					continue

				self._edges.add((dd, dependentDD))
				if dependentDD not in self._seen:
					self._seen.add(dependentDD)
					self._gatherOne(dependentDD)
			except (base.StructureError, base.NotFoundError) as msg:
				base.ui.notifyWarning("Ignoring dependent %s of %s (%s)"%(
					dependentId, dd.getFullId(), str(msg)))

	def _gather(self, dds):
		for dd in dds:
			self._gatherOne(dd)

	def getBuildSequence(self):
		"""returns the result of topologically sorting the
		(dd, dependent) edges.
		"""
		return utils.topoSort(self._edges)

	def getDestroySequence(self):
		"""returns the result of topologically sorting the inverted
		edges, i.e., (dependent, dd).
		"""
		inverted = [(b,a) for a, b in self._edges]
		return utils.topoSort(inverted)
def makeDependentsFor(dds, parseOptions, connection, sysCatChanged):
	"""rebuilds all data dependent on one of the DDs in the dds sequence.

	Dependents are only remade if sysCatChanged is true or they have
	remakeOnDataChange set.  With parseOptions.metaOnly, nothing is
	rebuilt; instead, a warning listing the dependents is emitted.

	A ReportableError is raised if the dependents cannot be sorted
	topologically.
	"""
	# the makeData calls below must not build dependencies themselves.
	if parseOptions.buildDependencies:
		parseOptions = parseOptions.change(buildDependencies=False)

	try:
		buildSequence = DDDependencyGraph(dds).getBuildSequence()
	except ValueError as ex:
		raise utils.logOldExc(base.ReportableError("Could not sort"
			" dependent DDs topologically (use  --hints to learn more).",
			hint="This is most likely because there's a cyclic dependency."
			" Please check your dependency structure.  The original message"
			" is: %s"%utils.safe_str(ex)))

	# remove DDs passed in from the build sequence, as long as nothing
	# is built in between (which might necessitate a re-build of something
	# we already did)
	for dd in buildSequence[:]:
		if dd in dds:
			buildSequence.pop(0)
		else:
			break

	if parseOptions.metaOnly:
		# TODO: Is metaOnly just a special case of sysCatChange=False?
		if buildSequence:
			base.ui.notifyWarning("Only importing metadata, not rebuilding"
				" dependencies.  Depending on your changes, it may be"
				" necessary to manually re-make one of these: %s"%
				", ".join(dd.getFullId() for dd in buildSequence))
	else:
		for dd in buildSequence:
			if sysCatChanged or dd.remakeOnDataChange:
				base.ui.notifyInfo("Making dependent %s"%dd.getFullId())
				makeData(dd, parseOptions=parseOptions, connection=connection)
def makeDataById(ddId, parseOptions=common.parseNonValidating,
		connection=None, inRD=None):
	"""returns the data set built from the DD with ddId (which must be
	fully qualified).

	The id is resolved through base.resolveId with inRD as context;
	parseOptions and connection are passed on to makeData.
	"""
	dd = base.resolveId(inRD, ddId)
	return makeData(dd, parseOptions=parseOptions, connection=connection)
def wrapTable(table, rdSource=None):
	"""returns a Data instance containing only table (or table if it's already
	a data instance).

	If table has no rd, you must pass rdSource, which must be an object having
	and rd attribute (rds, tabledefs, etc, work).  For rdSources without
	an rd attribute, a TypeError is raised.

	This will grab info meta from the table.
	"""
	if hasattr(table, "dd"):
		# we trust it's already a Data instance (don't want to use isinstance
		# here since people may pass in fakes).
		return table

	if rdSource is None:
		rd = table.tableDef.rd
	elif hasattr(rdSource, "rd"):
		rd = rdSource.rd
	else:
		raise TypeError("Invalid RD source: %s"%rdSource)

	newDD = MS(rscdef.DataDescriptor, makes=[
		MS(rscdef.Make, table=table.tableDef, rowmaker=None)], parent_=rd)
	# NOTE(review): nesting was lost in extraction; setMetaParent is
	# assumed to run regardless of rdSource.
	if rdSource:
		newDD.adopt(table.tableDef)
	newDD.setMetaParent(table.tableDef.rd)

	# NOTE(review): the dictionary key was lost in extraction; restored
	# as the table definition's id, as Data.getPrimaryTable looks up.
	res = Data(newDD, tables={table.tableDef.id: table})
	for infoMeta in table.iterMeta("info"):
		res.addMeta("info", infoMeta)
	for mi in table.iterMeta("_votableRootAttributes", propagate=False):
		res.addMeta("_votableRootAttributes", mi)
	return res
def makeCombinedData(baseDD, tablesForRoles):
	"""returns a Data instance containing all of tablesForRoles.

	A DD is being generated based on baseDD; if baseDD has any tables, they are
	discarded.

	tablesForRoles is a mapping from strings (one of which should be "primary")
	to tables; the strings end up as roles.
	"""
	newDD = baseDD.change(
		makes=[MS(rscdef.Make, table=t.tableDef, rowmaker=None, role=role)
			for role, t in tablesForRoles.items()])
	newDD.meta_ = baseDD._metaAttr.getCopy(baseDD, newDD, None)

	# NOTE(review): the dictionary key was lost in extraction; restored
	# as the table definition's id, as Data.getPrimaryTable looks up.
	return Data(newDD, tables={
		t.tableDef.id: t for t in tablesForRoles.values()})