Source code for gavo.rsc.table

"""
Tables, base and in memory.

Basically, a table consists of a list of dictionaries (the rows) and a
table definition (rscdef.TableDef).

You should, in general, not construct the tables directly but use
the tables.TableForDef factory.  The reason is that some classes ignore
certain aspects of TableDefs (indices, uniqueForceness) or may not be
what TableDef requires at all (onDisk).  Arguably there should be
different TableDefs for all these aspects, but then I'd have a plethora
of TableDef elements, which I think is worse than a factory function.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import sys
import weakref

from gavo import base
from gavo import rscdef
from gavo.rsc import common
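

# Editorial sketch, not part of the original module: how client code
# would typically obtain a table through the factory recommended in the
# module docstring instead of instantiating the classes below directly.
# base.parseFromString and rsc.TableForDef are the usual DaCHS entry
# points; the "demo" table definition is made up for illustration.
def _exampleTableForDef():
    from gavo import rsc
    td = base.parseFromString(rscdef.TableDef,
        '<table id="demo"><column name="x" type="integer"/></table>')
    table = rsc.TableForDef(td, rows=[{"x": 1}, {"x": 2}])
    return len(table)    # -> 2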


class Error(base.Error):
    pass


class ColumnStat(object):
    """Column statistics as exposed by Limits.

    These have min, max, and values attributes, all of which can be
    None.  Otherwise, min and max are values of the column type, and
    values is a set of those.
    """
    def __init__(self):
        self.min, self.max = None, None
        self.values = None


class Limits(dict):
    """Column statistics (min/max, values) for an in-memory table.

    These are constructed with the rows attribute and one list each
    naming the columns for which you want min/max and the values
    present.

    Note that None in min/max indicates no non-None values were found.
    An empty set in values indicates that all values were None.

    This then exposes a dictionary interface.
    """
    def __init__(self, rows, minmaxColumns, enumColumns):
        dict.__init__(self)
        self._addMinmax(rows, minmaxColumns)
        self._addEnums(rows, enumColumns)

    def _addMinmax(self, rows, minmaxColumns):
        stats = [(name, ColumnStat()) for name in minmaxColumns]
        self.update(dict(stats))
        for row in rows:
            for name, stat in stats:
                val = row[name]
                if val is None:
                    continue
                if stat.min is None or stat.min>val:
                    stat.min = val
                if stat.max is None or stat.max<val:
                    stat.max = val

    def _addEnums(self, rows, enumColumns):
        # re-use a stat _addMinmax may already have created for this
        # column rather than clobbering it
        stats = [(name, self.get(name, ColumnStat()))
            for name in enumColumns]
        self.update(dict(stats))
        for _, stat in stats:
            stat.values = set()
        for row in rows:
            for name, stat in stats:
                if row[name] is not None:
                    stat.values.add(row[name])


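# Editorial sketch, not part of the original module: Limits operates on
# plain row dictionaries, so it can be tried without any TableDef
# machinery.
def _exampleLimits():
    rows = [
        {"mag": 12.5, "flag": "a"},
        {"mag": 11.0, "flag": "b"},
        {"mag": None, "flag": "a"},
    ]
    lims = Limits(rows, minmaxColumns=["mag"], enumColumns=["flag"])
    assert (lims["mag"].min, lims["mag"].max)==(11.0, 12.5)
    assert lims["flag"].values=={"a", "b"}

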
class _Feeder(object):
    """A device for getting data into a table.

    A feeder is a context manager that rejects all action from without
    the controlled section.  Within the controlled section, you can
    use:

    - add(row) -> None -- add row to table.  This may raise all kinds
      of crazy exceptions.
    - flush() -> None -- flush out all data that may be cached to the
      table (this is done automatically on a successful exit)
    - reset() -> None -- discard any data that may still wait to be
      flushed to the table

    At the end of the controlled block, the importFinished or
    importFailed methods of the parent table are called depending on
    whether all is well or an exception happened.  If importFinished
    raises an exception, it is handed on to importFailed and re-raised
    if importFailed returns False.

    The batch size constructor argument is for the benefit of DBTables.

    The flush and reset methods are necessary when you do explicit
    buffering and connection management; you will need to call flush
    before committing a transaction and reset before rolling one back.
    """
    def __init__(self, table, batchSize=1024):
        self.table = table
        self.nAffected = 0
        self.active = False

    def _assertActive(self):
        if not self.active:
            raise base.DataError("Trying to feed a dormant feeder.")

    def getAffected(self):
        return self.nAffected

    def add(self, row):
        self._assertActive()
        if self.table.validateRows:
            self.table.tableDef.validateRow(row)
        self.table.addRow(row)
        self.nAffected += 1

    def flush(self):
        self._assertActive()
        # no-op for ram feeder

    def reset(self):
        self._assertActive()
        # no-op for ram feeder

    def __enter__(self):
        self.active = True
        return self

    def __exit__(self, excType=None, excVal=None, excTb=None):
        try:
            if excType is None:  # all ok
                try:
                    self.table.importFinished(self.nAffected)
                except:
                    if not self.table.importFailed(*sys.exc_info()):
                        raise
            else:  # exception occurred in controlled block
                self.table.importFailed(excType, excVal, excTb)
        finally:
            self.active = False
        return False


def _makeFailIncomplete(name):
    def fail(self, *args, **kwargs):
        raise NotImplementedError("%s is an incomplete Table implementation."
            " No method '%s' defined."%(self.__class__.__name__, name))
    return fail


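# Editorial sketch, not part of the original module: the intended way
# to bulk-load rows through a feeder.  The with block guarantees that
# importFinished/importFailed fire on the parent table; rowSource is
# any iterable of row dictionaries.
def _exampleFeeding(table, rowSource):
    with table.getFeeder() as feeder:
        for row in rowSource:
            feeder.add(row)
    return feeder.getAffected()

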
class BaseTable(base.MetaMixin, common.ParamMixin):
    """is a container for row data.

    Tables consist of rows, where each row maps column names to their
    value for that row.  The rows are accessible at least by iterating
    over a table.

    Tables get constructed with a tableDef and keyword arguments.  For
    convenience, tables must accept any keyword argument and only pluck
    out those they want.

    Here's a list of keywords used by BaseTables or known subclasses:

    * validateRows -- have rows be validated by the tableDef before
      addition (all Tables)
    * rows -- a list of rows the table has at start (InMemoryTables;
      DbTables will raise an error on these)
    * connection -- a database connection to use for accessing DbTables
    * votCasts -- a dictionary mapping column names to dictionaries
      overriding keys of valuemappers.AnnotatedColumn
    * params -- a dictionary mapping param keys to values, where both
      python values and literals are allowed

    You can add rows using the addRow method.  For bulk additions,
    however, it may be much more efficient to call getFeeder (though
    for in-memory tables, there is no advantage).

    Initial Metadata is populated from the tableDef.

    Tables have to implement the following methods:

    - ``__iter__``
    - ``__len__``
    - ``__getitem__(n)`` -- returns the n-th row or raises an
      IndexError
    - ``removeRow(row)`` -- removes a row from the table or raises an
      IndexError if the row does not exist.  This is a slow, O(n)
      operation.
    - ``addRow(row)`` -- appends new data to the table
    - ``getRow(*args)`` -- returns a row by the primary key.  If no
      primary key is defined, a ValueError is raised; if the key is
      not present, a KeyError.  An atomic primary key is accessed
      through its value, for compound primary keys a tuple must be
      passed.
    - ``getFeeder(**kwargs)`` -> feeder object -- returns an object
      with add and exit methods.  See feeder above.
    - ``importFinished(nAffected)`` -> None -- called when a feeder
      exits successfully
    - ``importFailed(*excInfo)`` -> boolean -- called when feeding has
      failed; when returning True, the exception that has caused the
      failure is not propagated.
    - ``close()`` -- may be called by clients to signify the table
      will no longer be used and resources should be cleared (e.g.,
      for DBTables with private connections).

    Tables also have several attributes:

    - ``tableDef`` -- the TableDef that describes the table
    - ``newlyCreated`` -- true unless a pre-existing database table
      was kept (e.g., in updating DDs)
    """
    def __init__(self, tableDef, **kwargs):
        base.MetaMixin.__init__(self)
        self.tableDef = tableDef
        self.newlyCreated = True
        self.setMetaParent(self.tableDef.getMetaParent())
        self.meta_ = self.tableDef.meta_.copy()
        self.validateRows = kwargs.pop("validateRows", False)
        self.makeRunning = kwargs.pop("make", None)
        self.votCasts = kwargs.pop("votCasts", {})
        parent = kwargs.pop("parent", None)
        self.parent = parent and weakref.proxy(parent)
        self._initParams(self.tableDef, kwargs.pop("params", None))

    __iter__ = _makeFailIncomplete("__iter__")
    __len__ = _makeFailIncomplete("__len__")
    removeRow = _makeFailIncomplete("removeRow")
    addRow = _makeFailIncomplete("addRow")
    getRow = _makeFailIncomplete("getRow")
    getFeeder = _makeFailIncomplete("getFeeder")

    def getPrimaryTable(self):
        """returns the table itself.

        This is just there so that code can blindly say
        .getPrimaryTable() not worrying whether what it has is a data
        item or a table.
        """
        return self

    def addTuple(self, tupRow):
        self.addRow(self.tableDef.makeRowFromTuple(tupRow))

    def importFinished(self, nAffected):
        pass

    def importFailed(self, *excInfo):
        return False

    def close(self):
        pass

    def validateParams(self):
        """raises a ValidationError if any required parameter of this
        table is None.
        """
        for par in self.iterParams():
            if par.required and par.value is None:
                raise base.ValidationError(
                    "Value is required but was not provided", par.name)

    def expand(self, s):
        """macro-expands the string s within self's tableDef.
        """
        if "\\" in s:
            s = self.tableDef.expand(s)
        return s


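# Editorial sketch, not part of the original module: validateParams and
# expand as clients would call them on a concrete table; which macros
# are actually available depends on the tableDef, so \schema here is
# only an illustration.
def _exampleParamsAndExpand(table):
    table.validateParams()  # raises ValidationError on a missing
                            # required param
    return table.expand("\\schema")

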
class InMemoryTable(BaseTable):
    """is a table kept in memory.

    This table only keeps an index for the primary key.  All other
    indices are ignored.
    """
    def __init__(self, tableDef, **kwargs):
        BaseTable.__init__(self, tableDef, **kwargs)
        self.rows = kwargs.get("rows", [])

    def __iter__(self):
        return iter(self.rows)

    def __len__(self):
        return len(self.rows)

    def __bool__(self):
        return bool(self.rows)

    def removeRow(self, row):
        self.rows.remove(row)

    def addRow(self, row):
        if self.validateRows:
            try:
                self.tableDef.validateRow(row)
            except rscdef.IgnoreThisRow:
                return
        self.rows.append(row)

    def getRow(self, *args):
        raise ValueError("Cannot use getRow in index-less table")

    def getFeeder(self, **kwargs):
        return _Feeder(self, **kwargs)

    def getLimits(self):
        """returns a Limits instance for this table.

        This is a characterisation of the ranges of things in this
        table, pretty much what dachs info does; if you fix things
        here, you probably want to fix things there, too.
        """
        minmaxColumns, enumColumns = [], []
        for col in self.tableDef:
            if col.isEnumerated():
                enumColumns.append(col.name)
            elif col.type in base.ORDERED_TYPES or col.type.startswith("char"):
                minmaxColumns.append(col.name)
        return Limits(self.rows, minmaxColumns, enumColumns)


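# Editorial sketch, not part of the original module: a quick dump of
# what getLimits collects for an in-memory table.
def _exampleGetLimits(table):
    for colName, stat in table.getLimits().items():
        print(colName, stat.min, stat.max, stat.values)

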
class InMemoryIndexedTable(InMemoryTable):
    """is an InMemoryTable for a TableDef with a primary key.
    """
    def __init__(self, tableDef, **kwargs):
        InMemoryTable.__init__(self, tableDef, **kwargs)
        if not self.tableDef.primary:
            raise Error("No primary key given for InMemoryIndexedTable")
        self._makeRowIndex()

    def removeRow(self, row):
        # This remains slow since we do not keep the index of a row
        # in self.rows
        InMemoryTable.removeRow(self, row)
        del self.rowIndex[self.tableDef.getPrimaryIn(row)]

    def addRow(self, row):
        if self.validateRows:
            try:
                self.tableDef.validateRow(row)
            except rscdef.IgnoreThisRow:
                return
        self.rows.append(row)
        self.rowIndex[self.tableDef.getPrimaryIn(row)] = row

    def getRow(self, *args):
        return self.rowIndex[args]

    def _makeRowIndex(self):
        """recreates the index of primary keys to rows.
        """
        self.rowIndex = {}
        for r in self.rows:
            self.rowIndex[self.tableDef.getPrimaryIn(r)] = r


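# Editorial sketch, not part of the original module: rowIndex is keyed
# by the tuple tableDef.getPrimaryIn returns, so getRow takes one
# positional argument per primary key column.  The key values used
# here are made up.
def _exampleGetRow(indexedTable):
    # with primary="id":
    row = indexedTable.getRow("obj1")
    # with a compound key, e.g. primary="field,epoch":
    # row = indexedTable.getRow("f17", 2008)
    return row

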
class UniqueForcedTable(InMemoryIndexedTable):
    """is an InMemoryTable with an enforced policy on duplicate
    primary keys.

    See rscdef.TableDef for a discussion of the policies.
    """
    def __init__(self, tableDef, **kwargs):
        # hide init rows (if present) in the next line to not let
        # duplicate primaries slip in here
        rows = kwargs.pop("rows", [])
        InMemoryIndexedTable.__init__(self, tableDef, **kwargs)
        try:
            self.resolveConflict = {
                "check": self._ensureRowIdentity,
                "drop": self._dropNew,
                "overwrite": self._overwriteOld,
                "dropOld": self._overwriteOld,
            }[self.tableDef.dupePolicy]
        except KeyError as msg:
            raise base.ui.logOldExc(
                Error("Invalid conflict resolution strategy: %s"%str(msg)))
        for row in rows:
            self.addRow(row)

    def _ensureRowIdentity(self, row, key):
        """raises an exception if row is not equivalent to the row
        stored for key.

        This is one strategy for resolving primary key conflicts.
        """
        storedRow = self.rowIndex[key]
        if list(row.keys())!=list(storedRow.keys()):
            raise Error("Differing rows for primary key %s: %s vs. %s"%(
                key, self.rowIndex[key], row))
        for colName in row:
            if row[colName]!=storedRow[colName]:
                raise base.ValidationError(
                    "Differing rows for primary key %s;"
                    " %s vs. %s"%(key, row[colName], storedRow[colName]),
                    colName=colName, row=row)

    def _dropNew(self, row, key):
        """does nothing.

        This is for resolution of conflicting rows (the "drop"
        strategy).
        """
        pass

    def _overwriteOld(self, row, key):
        """overwrites the row currently stored under key with row.

        This is for resolution of conflicting rows (the "overwrite"
        strategy).

        Warning: This is typically rather slow.
        """
        storedRow = self.rowIndex[key]
        self.removeRow(storedRow)
        return self.addRow(row)

    def addRow(self, row):
        if self.validateRows:
            try:
                self.tableDef.validateRow(row)
            except rscdef.IgnoreThisRow:
                return
        key = self.tableDef.getPrimaryIn(row)
        if key in self.rowIndex:
            return self.resolveConflict(row, key)
        else:
            self.rowIndex[key] = row
        return InMemoryIndexedTable.addRow(self, row)


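# Editorial sketch, not part of the original module: assuming a
# tableDef with primary="id" and dupePolicy="overwrite", re-adding a
# key replaces the stored row; with dupePolicy="check", a differing
# duplicate would raise a ValidationError instead.
def _exampleDupePolicy(uniqueForcedTable):
    uniqueForcedTable.addRow({"id": "a", "val": 1})
    uniqueForcedTable.addRow({"id": "a", "val": 2})
    return uniqueForcedTable.getRow("a")    # -> {"id": "a", "val": 2}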