Package gavo :: Package rsc :: Module table
[frames] | [no frames]

Source Code for Module gavo.rsc.table

  1  """ 
  2  Tables, base and in memory. 
  3   
  4  Basically, a table consists of a list of dictionaries (the rows) and a 
  5  table definition (resdef.TableDef). 
  6   
  7  You should, in general, not construct the tables directly but use 
  8  the tables.TableForDef factory.  The reason is that some classes ignore 
  9  certain aspects of TableDefs (indices, uniqueForceness) or may not be 
 10  what TableDef requires at all (onDisk).  Arguably there should be 
 11  different TableDefs for all these aspects, but then I'd have a plethora 
 12  of TableDef elements, which I think is worse than a factory function. 
 13  """ 
 14   
 15  #c Copyright 2008-2019, the GAVO project 
 16  #c 
 17  #c This program is free software, covered by the GNU GPL.  See the 
 18  #c COPYING file in the source distribution. 
 19   
 20   
 21  import sys 
 22  import weakref 
 23   
 24  from gavo import base 
 25  from gavo import rscdef 
 26  from gavo.rsc import common 
 27   
 28   
class Error(base.Error):
    """The base class for exceptions raised by this module.

    It derives from base.Error, so it participates in the package-wide
    exception hierarchy.
    """
    pass
31 32
class ColumnStat(object):
    """Column statistics as exposed by Limits.

    Instances have min, max, and values attributes, all of which can be
    None.  Otherwise, min and max are values of the column type, and
    values is a set of such values.
    """
    def __init__(self):
        # no data seen yet: bounds and value set are unknown
        self.min, self.max = None, None
        self.values = None


class Limits(dict):
    """Column statistics (min/max, values) for an in-memory table.

    These are constructed with the rows attribute and a list each for
    columns for which you want min/max and the values present.  Note
    that None in min/max indicates no non-None values were found.  An
    empty set in values indicates that all values were None.

    This then exposes a dictionary interface mapping column names to
    ColumnStat instances.
    """
    def __init__(self, rows, minmaxColumns, enumColumns):
        dict.__init__(self)
        self._addMinmax(rows, minmaxColumns)
        self._addEnums(rows, enumColumns)

    def _addMinmax(self, rows, minmaxColumns):
        """fills in min/max stats for minmaxColumns from rows."""
        stats = [(name, ColumnStat()) for name in minmaxColumns]
        self.update(dict(stats))

        for row in rows:
            for name, stat in stats:
                val = row[name]
                if val is None:
                    # None never participates in min/max
                    continue
                if stat.min is None or stat.min>val:
                    stat.min = val
                if stat.max is None or stat.max<val:
                    stat.max = val

    def _addEnums(self, rows, enumColumns):
        """fills in the sets of values present for enumColumns from rows."""
        # Bug fix: look up an existing stat under the column's *name*
        # (not the literal string "name"), so that a column appearing in
        # both minmaxColumns and enumColumns keeps its min/max when its
        # stat is re-entered into self below.
        stats = [(name, self.get(name, ColumnStat())) for name in enumColumns]
        self.update(dict(stats))
        for _, stat in stats:
            stat.values = set()

        for row in rows:
            for name, stat in stats:
                if row[name] is not None:
                    stat.values.add(row[name])
84 85
class _Feeder(object):
    """A device for getting data into a table.

    A feeder is a context manager that rejects all action from outside
    the controlled section.  Within the controlled section, you can use:

    - add(row) -> None -- add row to table.  This may raise all kinds
      of crazy exceptions.
    - flush() -> None -- flush out all data that may be cached to the table
      (this is done automatically on a successful exit)
    - reset() -> None -- discard any data that may still wait to be
      flushed to the table

    At the end of the controlled block, the importFinished or importFailed
    methods of the parent table are called depending on whether all went
    well or an exception happened.  If importFinished raises an
    exception, it is handed on to importFailed and re-raised if
    importFailed returns False.

    The batch size constructor argument is for the benefit of DBTables;
    an in-memory feeder does no batching and ignores it.

    The flush and reset methods are necessary when you do explicit
    buffering and connection management; call flush before committing a
    transaction and reset before rolling one back.
    """
    def __init__(self, table, batchSize=1024):
        # batchSize is accepted for interface compatibility only
        self.table = table
        self.nAffected = 0
        self.active = False

    def _assertActive(self):
        # guard: feeding is only legal inside the with-block
        if not self.active:
            raise base.DataError("Trying to feed a dormant feeder.")

    def getAffected(self):
        """returns the number of rows fed so far."""
        return self.nAffected

    def add(self, row):
        """feeds row to the parent table, validating it if requested."""
        self._assertActive()
        destTable = self.table
        if destTable.validateRows:
            destTable.tableDef.validateRow(row)
        destTable.addRow(row)
        self.nAffected += 1

    def flush(self):
        """is a no-op for the RAM feeder (rows go to the table directly)."""
        self._assertActive()

    def reset(self):
        """is a no-op for the RAM feeder (there is nothing buffered)."""
        self._assertActive()

    def __enter__(self):
        self.active = True
        return self

    def __exit__(self, excType=None, excVal=None, excTb=None):
        try:
            if excType is not None:
                # the controlled block failed; let the table react
                self.table.importFailed(excType, excVal, excTb)
            else:
                try:
                    self.table.importFinished()
                except:
                    # hand the failure to importFailed; re-raise unless
                    # it claims to have handled things
                    if not self.table.importFailed(*sys.exc_info()):
                        raise
        finally:
            self.active = False
        return False
155 156
def _makeFailIncomplete(name):
    """returns a stand-in method that raises NotImplementedError.

    This is used to mark the methods a concrete Table implementation
    must override.
    """
    def raiseIncomplete(self, *args, **kwargs):
        raise NotImplementedError(
            "%s is an incomplete Table implementation. No method '%s' defined."%(
                self.__class__.__name__, name))
    return raiseIncomplete
class BaseTable(base.MetaMixin, common.ParamMixin):
    """is a container for row data.

    A table consists of rows, each of which maps column names to the
    column's value for that row.  At the very least, the rows can be
    obtained by iterating over the table.

    Tables are constructed with a tableDef and keyword arguments.  For
    convenience, tables must accept any keyword argument and only pluck
    those out they want.

    Here's a list of keywords used by BaseTables or known subclasses:

    - validateRows -- have rows be validated by the tableDef before addition
      (all Tables)
    - rows -- a list of rows the table has at start (InMemoryTables; DbTables
      will raise an error on these).
    - connection -- a database connection to use for accessing DbTables.
    - votCasts -- a dictionary mapping column names to dictionaries overriding
      keys of valuemappers.AnnontatedColumn.
    - params -- a dictionary mapping param keys to values, where python
      values and literals allowed.

    You can add rows using the addRow method.  For bulk additions,
    however, it may be much more efficient to call getFeeder (though for
    in-memory tables, there is no advantage).

    Tables can run "scripts" if someone furnishes them with a _runScripts
    method.  This currently is only done for DBTables.  See Scripting_.

    Initial Metadata is populated from the tableDef.

    Tables have to implement the following methods:

    - __iter__
    - __len__
    - __getitem__(n) -- returns the n-th row or raises an IndexError
    - removeRow(row) -- removes a row from the table or raises an
      IndexError if the row does not exist.  This is a slow, O(n) operation.
    - addRow(row) -- appends new data to the table
    - getRow(*args) -- returns a row by the primary key.  If no primary key
      is defined, a ValueError is raised, if the key is not present, a
      KeyError.  An atomic primary key is accessed through its value,
      for compound primary keys a tuple must be passed.
    - getFeeder(**kwargs) -> feeder object -- returns an object with add and
      exit methods.  See feeder above.
    - importFinished() -> None -- called when a feeder exits successfully
    - importFailed(*excInfo) -> boolean -- called when feeding has failed;
      when returning True, the exception that has caused the failure
      is not propagated.
    - close() -> may be called by clients to signify the table will no
      longer be used and resources should be cleared (e.g., for DBTables
      with private connections).
    """
    # set by data/make machinery when this table can run scripts
    _runScripts = None

    def __init__(self, tableDef, **kwargs):
        base.MetaMixin.__init__(self)
        self.tableDef = tableDef
        # metadata: hook into the definition's meta parent and start
        # from a copy of its metadata
        self.setMetaParent(self.tableDef.getMetaParent())
        self.meta_ = self.tableDef.meta_.copy()
        self.validateRows = kwargs.get("validateRows", False)
        self.votCasts = kwargs.get("votCasts", {})
        parentTable = kwargs.get("parent")
        # only keep a weak reference to the parent to avoid cycles
        self.parent = parentTable and weakref.proxy(parentTable)
        self._initParams(self.tableDef, kwargs.pop("params", None))

    # mandatory methods concrete Table implementations must override
    __iter__ = _makeFailIncomplete("__iter__")
    __len__ = _makeFailIncomplete("__len__")
    removeRow = _makeFailIncomplete("removeRow")
    addRow = _makeFailIncomplete("addRow")
    getRow = _makeFailIncomplete("getRow")
    getFeeder = _makeFailIncomplete("getFeeder")

    def addTuple(self, tupRow):
        """adds a row given as a tuple (as interpreted by
        tableDef.makeRowFromTuple)."""
        self.addRow(self.tableDef.makeRowFromTuple(tupRow))

    def importFinished(self):
        """is called by feeders on a successful exit; a no-op here."""
        pass

    def importFailed(self, *excInfo):
        """is called by feeders on failure; returning False means the
        exception is propagated."""
        return False

    def close(self):
        """releases resources held by this table; a no-op here."""
        pass

    def runScripts(self, phase, **kwargs):
        """runs any scripts configured for phase.

        Without a _runScripts method (set by data and make), this does
        nothing.
        """
        if self._runScripts:  # if defined, it was set by data and make.
            self._runScripts(self, phase, **kwargs)

    def validateParams(self):
        """raises a ValidationError if any required parameters of this
        tables are None.
        """
        for param in self.iterParams():
            if param.required and param.value is None:
                raise base.ValidationError(
                    "Value is required but was not provided", param.name)
262 263
class InMemoryTable(BaseTable):
    """is a table kept in memory.

    This table only keeps an index for the primary key.  All other
    indices are ignored.
    """
    def __init__(self, tableDef, **kwargs):
        BaseTable.__init__(self, tableDef, **kwargs)
        self.rows = kwargs.get("rows", [])

    def __iter__(self):
        return iter(self.rows)

    def __len__(self):
        return len(self.rows)

    def __nonzero__(self):
        return bool(self.rows)

    def removeRow(self, row):
        # O(n): rows are kept in a plain list
        self.rows.remove(row)

    def addRow(self, row):
        try:
            if self.validateRows:
                self.tableDef.validateRow(row)
        except rscdef.IgnoreThisRow:
            # validation asked us to silently skip this row
            return
        self.rows.append(row)

    def getRow(self, *args):
        raise ValueError("Cannot use getRow in index-less table")

    def getFeeder(self, **kwargs):
        return _Feeder(self, **kwargs)

    def getLimits(self):
        """returns a limits instance for this table.

        This is a characterisation of the ranges of things in this
        table, pretty much as what dachs info does; if you fix things
        here, you probably want to fix things there, too.
        """
        enumColumns, minmaxColumns = [], []
        for column in self.tableDef:
            if column.isEnumerated():
                enumColumns.append(column.name)
            elif (column.type in base.ORDERED_TYPES
                    or column.type.startswith("char")):
                minmaxColumns.append(column.name)
        return Limits(self.rows, minmaxColumns, enumColumns)
314 315
class InMemoryIndexedTable(InMemoryTable):
    """is an InMemoryTable for a TableDef with a primary key.
    """
    def __init__(self, tableDef, **kwargs):
        InMemoryTable.__init__(self, tableDef, **kwargs)
        if not self.tableDef.primary:
            raise Error("No primary key given for InMemoryIndexedTable")
        self._makeRowIndex()

    def removeRow(self, row):
        # this remains slow (O(n)) since we do not keep the position of
        # a row within self.rows
        InMemoryTable.removeRow(self, row)
        del self.rowIndex[self.tableDef.getPrimaryIn(row)]

    def addRow(self, row):
        try:
            if self.validateRows:
                self.tableDef.validateRow(row)
        except rscdef.IgnoreThisRow:
            # validation asked us to silently skip this row
            return
        self.rows.append(row)
        self.rowIndex[self.tableDef.getPrimaryIn(row)] = row

    def getRow(self, *args):
        # args is the (possibly compound) primary key as a tuple
        return self.rowIndex[args]

    def _makeRowIndex(self):
        """recreates the index of primary keys to rows.
        """
        self.rowIndex = dict(
            (self.tableDef.getPrimaryIn(r), r) for r in self.rows)
class UniqueForcedTable(InMemoryIndexedTable):
    """is an InMemoryTable with an enforced policy on duplicate
    primary keys.

    See resdef.TableDef for a discussion of the policies.
    """
    def __init__(self, tableDef, **kwargs):
        # hide init rows (if present) in the next line to not let
        # duplicate primaries slip in here.
        rows = kwargs.pop("rows", [])
        InMemoryIndexedTable.__init__(self, tableDef, **kwargs)
        try:
            self.resolveConflict = {
                "check": self._ensureRowIdentity,
                "drop": self._dropNew,
                "overwrite": self._overwriteOld,
                "dropOld": self._overwriteOld,
            }[self.tableDef.dupePolicy]
        except KeyError as msg:
            raise base.ui.logOldExc(
                Error("Invalid conflict resolution strategy: %s"%str(msg)))
        # feed the initial rows through addRow so the policy applies
        for row in rows:
            self.addRow(row)

    def _ensureRowIdentity(self, row, key):
        """raises an exception if row is not equivalent to the row stored
        for key.

        This is one strategy for resolving primary key conflicts.
        """
        storedRow = self.rowIndex[key]
        # Bug fix: compare key *sets*.  Under python 2, dict.keys()
        # returns lists whose order depends on the dicts' histories, so
        # comparing them directly could wrongly reject identical rows.
        if set(row.keys())!=set(storedRow.keys()):
            raise Error("Differing rows for primary key %s: %s vs. %s"%(
                key, self.rowIndex[key], row))
        for colName in row:
            if row[colName] is None or storedRow[colName] is None:
                # None (missing value) is compatible with anything
                continue
            if row[colName]!=storedRow[colName]:
                raise base.ValidationError(
                    "Differing rows for primary key %s;"
                    " %s vs. %s"%(key, row[colName],
                        storedRow[colName]), colName=colName, row=row)

    def _dropNew(self, row, key):
        """does nothing.

        This is for resolution of conflicting rows (the "drop" strategy).
        """
        pass

    def _overwriteOld(self, row, key):
        """overwrites the row stored for key with row.

        This is for resolution of conflicting rows (the "overwrite"
        strategy).

        Warning: This is typically rather slow (removeRow is O(n)).
        """
        storedRow = self.rowIndex[key]
        self.removeRow(storedRow)
        return self.addRow(row)

    def addRow(self, row):
        if self.validateRows:
            try:
                self.tableDef.validateRow(row)
            except rscdef.IgnoreThisRow:
                return
        key = self.tableDef.getPrimaryIn(row)
        if key in self.rowIndex:
            return self.resolveConflict(row, key)
        else:
            self.rowIndex[key] = row
        # NOTE: the parent addRow re-validates the row; that is
        # redundant but harmless for deterministic validators.
        return InMemoryIndexedTable.addRow(self, row)
424