Package gavo :: Package rsc :: Module dbtable
[frames] | [no frames]

Source Code for Module gavo.rsc.dbtable

  1  """ 
  2  Tables on disk 
  3  """ 
  4   
  5  #c Copyright 2008-2019, the GAVO project 
  6  #c 
  7  #c This program is free software, covered by the GNU GPL.  See the 
  8  #c COPYING file in the source distribution. 
  9   
 10   
 11  import sys 
 12   
 13  from gavo import base 
 14  from gavo import rscdef 
 15  from gavo import utils 
 16  from gavo.base import sqlsupport 
 17  from gavo.rsc import common 
 18  from gavo.rsc import table 
 19   
 20   
class _Feeder(table._Feeder):
    """A context manager for feeding data into a table.

    This feeder hands through batchSize items at a time to the database.

    After an exit, the instances have an nAffected attribute that says
    how many rows were processed by the database through this feeder.

    A feeder is constructed with a parent table (that also provides
    the connection), an insert command, and potentially some options.

    Note that the table feeder does *not* do any connection management.
    You have to commit or rollback yourself (or do it properly and go
    through data, which can do connection management).
    """
    def __init__(self, parent, insertCommand, batchSize=2000, notify=True):
        # nAffected accumulates the database-reported rowcounts over all
        # batches shipped through this feeder.
        self.nAffected, self.notify = 0, notify
        table._Feeder.__init__(self, parent)
        self.feedCommand, self.batchSize = insertCommand, batchSize
        # rows are collected here until batchSize of them are available
        self.batchCache = []

    def shipout(self):
        """sends the currently cached rows to the database.

        This is a no-op for an empty batch cache.  Database errors are
        reported through the UI event system and then re-raised.
        """
        if self.batchCache:
            try:
                self.cursor.executemany(self.feedCommand, self.batchCache)
            except sqlsupport.IntegrityError:
                base.ui.notifyInfo("One or more of the following rows clashed: "+
                    str(self.batchCache))
                raise
            except sqlsupport.DataError:
                base.ui.notifyInfo("Bad input. Run with -b1 to pin down offending"
                    " record. First rec: %s"%self.batchCache[0])
                raise
            except sqlsupport.ProgrammingError:
                raise
            if self.cursor.rowcount>=0:
                self.nAffected += self.cursor.rowcount
            else: # can't guess how much was affected, let's assume all rows
                self.nAffected += len(self.batchCache) # did something.
            if self.notify:
                base.ui.notifyShipout(len(self.batchCache))
            self.batchCache = []

    def add(self, data):
        """queues the row dictionary data for feeding; a full batch is
        shipped out as a side effect when batchSize rows have accumulated.
        """
        self._assertActive()
        if self.table.validateRows:
            try:
                self.table.tableDef.validateRow(data)
            except rscdef.IgnoreThisRow:
                # the row validator explicitly asked for this row to be skipped
                return
        self.batchCache.append(data)
        if len(self.batchCache)>=self.batchSize:
            self.shipout()

    def flush(self):
        """ships out a partially filled batch without closing the feeder."""
        self._assertActive()
        self.shipout()

    def reset(self):
        """discards any rows queued but not yet shipped out."""
        self._assertActive()
        self.batchCache = []

    def __enter__(self):
        # the cursor lives for the duration of the with block; it is
        # disposed of in __exit__.
        self.cursor = self.table.connection.cursor()
        return table._Feeder.__enter__(self)

    def __exit__(self, *args):
        if not args or args[0] is None: # regular exit, ship out
            try:
                self.shipout()
                # The following sucks, but rowcount seems to be always 1 on insert operations.
                # However, we at least want a chance to catch update operations matching
                # nothing. So, if rowcount is 0, it's a sign something went wrong, and
                # we want to override our initial guess.
                if self.cursor.rowcount==0:
                    self.nAffected = 0
                self.cursor.close()
            except:
                # shipping out the final batch failed: drop the cursor, let
                # the parent see the actual exception info, then re-raise.
                del self.cursor
                table._Feeder.__exit__(self, *sys.exc_info())
                raise
        if hasattr(self, "cursor"):
            del self.cursor
        table._Feeder.__exit__(self, *args)
        return False

    def getAffected(self):
        """returns the number of rows processed (meaningful after exit)."""
        return self.nAffected
109 110
class _RaisingFeeder(_Feeder):
    """A feeder that refuses to accept any data.

    It is the feeder handed out by tables that cannot be written to,
    most notably views.
    """
    def add(self, data):
        raise base.DataError("Attempt to feed to a read-only table")
118 119
class MetaTableMixin(object):
    """A mixin furnishing a table with methods to maintain its record
    in the dc_tables metadata tables.

    Users of this mixin must provide a tableDef attribute and must also
    mix in QuerierMixin.
    """
    __metaRDId = "__system__/dc_tables"

    def _cleanFromSourceTable(self):
        """deletes the record for self.tableDef from the tablemeta table."""
        self.connection.execute(
            "DELETE FROM dc.tablemeta WHERE tableName=%(tableName)s",
            {"tableName": self.tableDef.getQName()})

    def _addToSourceTable(self):
        """inserts a record describing self.tableDef into the tablemeta table."""
        tableDef = self.tableDef
        row = {
            "tableName": tableDef.getQName(),
            "sourceRD": tableDef.rd.sourceId,
            "adql": tableDef.adql,
            "tableDesc": base.getMetaText(tableDef, "description"),
            "resDesc": base.getMetaText(tableDef.rd, "description"),
        }
        metaTable = DBTable(
            base.caches.getRD(self.__metaRDId).getTableDefById("tablemeta"),
            connection=self.connection)
        metaTable.addRow(row)

    def addToMeta(self):
        """enters this table into dc_tables, replacing any previous record."""
        self.cleanFromMeta() # Don't force people to clean first on meta updates
        self._addToSourceTable()

    def cleanFromMeta(self):
        """removes this table from dc_tables."""
        self._cleanFromSourceTable()
153 154
class DBMethodsMixin(sqlsupport.QuerierMixin):
    """A mixin for on-disk tables.

    The parent must have tableDef, tableName (from tabledef.getQName()),
    and connection attributes.
    """

    scripts = None # set by data on import, defined by make

    def _definePrimaryKey(self):
        """adds the primary key from self.tableDef to the table on disk
        unless one is already present.

        Raises a DBTableError if postgres rejects the key (e.g., because
        of duplicate values).
        """
        if self.tableDef.primary and not self.hasIndex(self.tableName,
                self.getPrimaryIndexName(self.tableDef.id)):
            if not self.tableDef.system:
                base.ui.notifyIndexCreation("Primary key on %s"%self.tableName)
            try:
                self.connection.execute("ALTER TABLE %s ADD PRIMARY KEY (%s)"%(
                    self.tableName, ", ".join(self.tableDef.primary)))
            except sqlsupport.DBError as msg:
                raise base.ui.logOldExc(
                    common.DBTableError("Primary key %s could not be added (%s)"%(
                        self.tableDef.primary, repr(str(msg))), self.tableName,
                        hint="The howDoI documentation text may contain help on"
                        " how to find the offending elements."))

    def _dropPrimaryKey(self):
        """drops a primary key if it exists.

        *** Postgres specific ***
        """
        constraintName = str(self.getPrimaryIndexName(self.tableDef.id))
        if self.tableDef.primary and self.hasIndex(
                self.tableName, constraintName):
            self.connection.execute("ALTER TABLE %s DROP CONSTRAINT %s"%(
                self.tableName, constraintName))

    def _addForeignKeys(self):
        """adds foreign key constraints if necessary.
        """
        for fk in self.tableDef.foreignKeys:
            if not self.tableDef.system:
                base.ui.notifyIndexCreation(
                    self.tableDef.expand(fk.getDescription()))
            fk.create(self)

    def _dropForeignKeys(self):
        """drops foreign key constraints if necessary.
        """
        for fk in self.tableDef.foreignKeys:
            fk.delete(self)

    def dropIndices(self):
        """drops all indices and foreign keys on this table (but, for now,
        not the primary key).
        """
        if not self.exists():
            return
        self._dropForeignKeys()
        # Don't drop the primary key for now -- foreign key relationships may
        # depend on it, and postgres doesn't seem to mind if we just create it later.
        # This is presumably still bad if someone changed the primary key. Let's
        # worry about it then.
        # self._dropPrimaryKey()
        for index in reversed(self.tableDef.indices):
            index.drop(self)
        return self

    def makeIndices(self):
        """creates all indices on the table, including any definition of
        a primary key.
        """
        # raise the session's work memory so index creation doesn't spill
        # to disk too early (indexworkmem is in MB, hence the 000).
        self.connection.execute("SET maintenance_work_mem=%s000"%
            base.getConfig("db", "indexworkmem"))
        if self.suppressIndex or not self.exists():
            return
        if self.tableDef.primary:
            self._definePrimaryKey()
        for index in self.tableDef.indices:
            index.create(self)
        self._addForeignKeys()
        return self

    def getDeleteQuery(self, matchCondition, pars=None):
        """returns a (query, parameters) pair deleting all rows matching
        matchCondition from this table.
        """
        # no mutable default argument; an absent pars becomes a fresh dict
        if pars is None:
            pars = {}
        return "DELETE FROM %s WHERE %s"%(
            self.tableName, matchCondition), pars

    def deleteMatching(self, matchCondition, pars=None):
        """deletes all rows matching matchCondition.

        For now, matchCondition a boolean SQL expression. All rows matching
        it will be deleted.
        """
        self.connection.execute(*self.getDeleteQuery(matchCondition, pars))

    def copyIn(self, inFile, binary=True):
        """bulk-loads rows from inFile into this table using COPY FROM STDIN.
        """
        fmt = " WITH BINARY" if binary else ""
        cursor = self.connection.cursor()
        try:
            # close the cursor even when COPY fails half-way
            cursor.copy_expert("COPY %s FROM STDIN %s"%(self.tableName, fmt), inFile)
        finally:
            cursor.close()
        return self

    def copyOut(self, outFile, binary=True):
        """dumps this table's rows to outFile using COPY TO STDOUT.
        """
        fmt = " WITH BINARY" if binary else ""
        cursor = self.connection.cursor()
        try:
            cursor.copy_expert("COPY %s TO STDOUT %s"%(self.tableName, fmt), outFile)
        finally:
            cursor.close()
        return self

    def ensureSchema(self):
        """creates self's schema if necessary.
        """
        if self.tableDef.temporary: # these never are in a schema
            return
        schemaName = self.tableDef.rd.schema
        if not self.schemaExists(schemaName):
            self.connection.execute("CREATE SCHEMA %(schemaName)s"%locals())
            self.setSchemaPrivileges(self.tableDef.rd)
        return self

    def ensureOnDiskMatches(self):
        """raises a DataError if the on-disk structure of a table doesn't
        match DaCHS' idea of it.

        If the table doesn't exist on disk, this will raise a NotFoundError.
        """
        dbCols = self.getColumnsFromDB(self.tableDef.getQName())
        mismatches = []
        if len(self.tableDef.columns)<len(dbCols):
            mismatches.append("extra columns in DB (%s)"%", ".join(
                name for name, _ in dbCols[len(self.tableDef.columns):]))

        for index, col in enumerate(self.tableDef):
            try:
                # dbType renamed from "type" to avoid shadowing the builtin
                name, dbType = dbCols[index]
            except IndexError:
                mismatches.append("from column %s on: No matches in DB"%col.name)
                break

            if col.name.lower()!=name:
                mismatches.append("mismatching name of %s (DB: %s)"%(col.name, name))
                continue
            try:
                base.sqltypeToPgValidator(col.type)(dbType)
            except TypeError as ex:
                mismatches.append("type mismatch in column %s (%s)"%(
                    col.name, utils.safe_str(ex)))

        if mismatches:
            raise base.DataError("Table %s: %s"%(
                self.tableDef.getQName(), "; ".join(mismatches)))
301 302
class DBTable(DBMethodsMixin, table.BaseTable, MetaTableMixin):
    """An interface to a table in the database.

    These are usually created using ``api.TableForDef(tableDef)`` with a
    table definition obtained, e.g., from an RD, saying ``onDisk=True``.

    When constructing a DBTable, it will be created if necessary (unless
    ``create=False`` is passed), but indices or primary keys keys will only be
    created on a call to ``importFinished``.

    The constructor does not check if the schema of the table on disk matches
    the tableDef. If the two diverge, all kinds of failures are conceivable;
    use ``dachs val -c`` to make sure on-disk structure match the RDs.

    You can pass a ``nometa`` boolean kw argument to suppress entering the table
    into the ``dc_tables`` table.

    You can pass an exclusive boolean kw argument; if you do, the
    ``iterQuery`` (and possibly similar methods in the future) method
    will block concurrent writes to the selected rows ("FOR UPDATE")
    as long as the transaction is active.

    The main attributes (with API guarantees) include:

    * tableDef -- the defining tableDef
    * getFeeder() -- returns a function you can call with rowdicts to
      insert them into the table.
    * importFinished() -- must be called after you've fed all rows when
      importing data.
    * drop() -- drops the table in the database
    * recreate() -- drops the table and generates a new empty one.
    * getTableForQuery(...) -- returns a Table instance built from a query
      over this table (you probably to use ``conn.query*`` and
      ``td.getSimpleQuery`` instead).
    """
    _runScripts = None # this is overridden by make (Yikes!)

    def __init__(self, tableDef, **kwargs):
        # the connection is mandatory; tables no longer manage their own
        self.connection = kwargs.pop("connection", None)
        if self.connection is None:
            raise base.ReportableError("DBTable built without connection.",
                hint="In pre-1.0 DaCHS, database tables could automatically"
                " open and manage connections. This turned out to be much"
                " more trouble than it was worth. See develNotes for how"
                " to do things today.")

        # behavior-controlling flags, all popped before BaseTable sees kwargs
        self.suppressIndex = kwargs.pop("suppressIndex", False)
        self.tableUpdates = kwargs.pop("tableUpdates", False)
        self.exclusive = kwargs.pop("exclusive", False)
        self.commitAfterMeta = kwargs.pop("commitAfterMeta", False)
        table.BaseTable.__init__(self, tableDef, **kwargs)

        if self.tableDef.rd is None and not self.tableDef.temporary:
            raise base.ReportableError("TableDefs without resource descriptor"
                " cannot be used to access database tables")
        self.tableName = self.tableDef.getQName()
        # dc-internal and temporary tables never go into dc_tables
        self.nometa = (kwargs.get("nometa", False)
            or self.tableDef.temporary or tableDef.rd.schema=="dc")

        self.newlyCreated = False
        if kwargs.get("create", False):
            self.createIfNecessary()

        # addCommand is either an UPDATE keyed on the primary key
        # (tableUpdates mode) or a plain INSERT.
        if self.tableUpdates:
            self.addCommand = "UPDATE %s SET %s WHERE %s"%(
                self.tableName,
                ", ".join("%s=%%(%s)s"%(f.name, f.key)
                    for f in self.tableDef),
                " AND ".join("%s=%%(%s)s"%(n, n) for n in self.tableDef.primary))
        else:
            self.addCommand = ("INSERT INTO %s (%s) VALUES (%s)"%(
                self.tableName,
                ", ".join([str(c.name) for c in self.tableDef.columns]),
                ", ".join(["%%(%s)s"%c.key for c in self.tableDef.columns])))

        if "rows" in kwargs:
            self.feedRows(kwargs["rows"])

    def __iter__(self):
        """iterates over all rows of the table as rowdicts."""
        # Do we want named cursors by default here?
        cursor = self.connection.cursor()
        cursor.execute("SELECT * FROM %s"%self.tableName)
        for row in cursor:
            yield self.tableDef.makeRowFromTuple(row)
        cursor.close()

    def __len__(self):
        # NOTE(review): this calls self.getQName(); the qualified name
        # otherwise comes from self.tableDef.getQName() -- confirm a
        # getQName method actually exists on this class or its bases.
        return list(self.query("select count(*) from %s"%self.getQName()))[0][0]

    def exists(self):
        """returns True if the table is present in the database."""
        return self.getTableType(self.tableDef.getQName()) is not None

    def getFeeder(self, **kwargs):
        """returns a _Feeder for this table (see _Feeder for kwargs)."""
        if "notify" not in kwargs:
            # by default, only notify for "real" (non-system, on-disk) tables
            kwargs["notify"] = not self.tableDef.system or not self.tableDef.onDisk
        return _Feeder(self, self.addCommand, **kwargs)

    def importFinished(self):
        """must be called after an import into this table has finished.

        On newly created tables, this runs scripts and builds indices;
        in any case the table is ANALYZEd.
        """
        if self.newlyCreated:
            self.runScripts("preIndex")
            self.makeIndices()
            self.runScripts("postCreation")
        else:
            base.ui.notifyDBTableModified(self.tableName)

        self.connection.execute("ANALYZE %s"%self.tableName)
        return self

    def importFailed(self, *excInfo):
        # rollback is handled by the feeder.
        return False

    def feedRows(self, rows):
        """Feeds a sequence of rows to the table.

        The method returns the number of rows affected. Exceptions are
        handed through upstream, but the connection is rolled back.
        """
        with self.getFeeder() as feeder:
            for r in rows:
                feeder.add(r)
        return feeder.nAffected

    def addRow(self, row):
        """adds a row to the table.

        Use this only to add one or two rows, otherwise go for getFeeder.
        """
        try:
            self.connection.execute(self.addCommand, row)
        except sqlsupport.IntegrityError:
            raise base.ui.logOldExc(
                base.ValidationError("Row %s cannot be added since it clashes"
                    " with an existing record on the primary key"%row, row=row,
                    colName="unknown"))

    def getRow(self, *key):
        """returns the row with the primary key key from the table.

        This will raise a DataError on tables without primaries.
        """
        if not self.tableDef.primary:
            raise base.DataError("Table %s has no primary key and thus does"
                " not support getRow"%self.tableName)
        res = list(self.iterQuery(self.tableDef,
            " AND ".join("%s=%%(%s)s"%(n,n) for n in self.tableDef.primary),
            pars=dict(zip(self.tableDef.primary, key))))
        if not res:
            raise KeyError(key)
        return res[0]

    def createUniquenessRules(self):
        """installs rules or triggers implementing the tableDef's dupePolicy.

        This is only done when forceUnique is set on the table definition.
        *** Postgres specific ***
        """
        if not self.tableDef.forceUnique:
            return

        # TODO: Move the following lines to the top of the function once we're
        # sure nobody uses pg <9.3 any more; that way, update policys
        # will reliably be cleaned up. While about it, do away with the RULE-based
        # ones and just work with TRIGGERs.
        self.connection.execute('DROP TRIGGER IF EXISTS "dropOld_%s"'
            ' ON %s'%(self.tableName, self.tableName))
        self.connection.execute('DROP RULE IF EXISTS updatePolicy'
            ' ON %s'%(self.tableName))

        def getMatchCondition():
            # equality of the full primary key between NEW and stored rows
            return " AND ".join("%s=new.%s"%(n,n) for n in self.tableDef.primary)

        # uniqueness checking becomes really slow if primary key definition
        # is delayed until index creation (as usual), so let's do it now.
        self._definePrimaryKey()

        if self.tableDef.dupePolicy=="drop":
            # silently drop a new row when its key is already present
            self.connection.execute("CREATE OR REPLACE RULE updatePolicy AS"
                " ON INSERT TO %s WHERE"
                " EXISTS(SELECT * FROM %s WHERE %s)"
                " DO INSTEAD NOTHING"%(self.tableName, self.tableName,
                    getMatchCondition()))

        elif self.tableDef.dupePolicy=="check":
            # This one is tricky: if the inserted column is *different*,
            # the rule does not fire and we get a pkey violation.
            # Furthermore, special NULL handling is required -- we
            # do not check columns that have NULLs in new or old.
            self.connection.execute("CREATE OR REPLACE RULE updatePolicy AS"
                " ON INSERT TO %s WHERE"
                " EXISTS(SELECT 1 FROM %s WHERE %s)"
                " DO INSTEAD NOTHING"%(self.tableName, self.tableName,
                    " AND ".join("(new.%s IS NULL OR %s IS NULL OR %s=new.%s)"%(
                        c.name, c.name, c.name,c.name) for c in self.tableDef)))

        elif self.tableDef.dupePolicy=="dropOld":
            # delete any existing row with the same key before inserting
            args = {
                "table": self.tableName,
                "matchCond": getMatchCondition()}
            self.connection.execute('CREATE OR REPLACE FUNCTION "dropOld_%(table)s"()'
                ' RETURNS trigger AS $body$\n'
                " BEGIN\n"
                " IF (EXISTS(SELECT 1 FROM %(table)s WHERE %(matchCond)s)) THEN\n"
                " DELETE FROM %(table)s WHERE %(matchCond)s;\n"
                " END IF;\n"
                " RETURN NEW;\nEND\n$body$ LANGUAGE plpgsql"%args)
            self.connection.execute(
                'CREATE TRIGGER "dropOld_%(table)s" BEFORE INSERT OR UPDATE'
                ' ON %(table)s FOR EACH ROW EXECUTE PROCEDURE "dropOld_%(table)s"()'%
                args)

        elif self.tableDef.dupePolicy=="overwrite":
            # turn the insert into an update of the clashing row
            self.connection.execute("CREATE OR REPLACE RULE updatePolicy AS"
                " ON INSERT TO %s WHERE"
                " EXISTS(SELECT %s FROM %s WHERE %s)"
                " DO INSTEAD UPDATE %s SET %s WHERE %s"%(self.tableName,
                    ",".join(self.tableDef.primary),
                    self.tableName,
                    getMatchCondition(),
                    self.tableName,
                    ", ".join("%s=new.%s"%(c.name,c.name) for c in self.tableDef),
                    getMatchCondition()))
        else:
            raise base.DataError("Invalid dupePolicy: %s"%self.tableDef.dupePolicy)

    def setStatisticsTargets(self):
        """applies any per-column statisticsTarget properties to the
        on-disk table.
        """
        for col in self.tableDef:
            if col.hasProperty("statisticsTarget"):
                target = int(col.getProperty("statisticsTarget"))
                self.connection.execute("ALTER TABLE %s ALTER COLUMN %s"
                    " SET STATISTICS %d"%(self.tableName, col.name, target))

    def configureTable(self):
        """sets privileges, metadata, and uniqueness rules on the table."""
        self.updateMeta()
        self.createUniquenessRules()
        return self

    def create(self):
        """creates the table in the database (which must not exist yet)."""
        base.ui.notifyDebug("Create DB Table %s"%self.tableName)
        self.ensureSchema()
        self.runScripts("preCreation")
        self.connection.execute(self.tableDef.getDDL())
        self.newlyCreated = True
        return self.configureTable()

    def updateMeta(self):
        """updates privileges, dc_tables entries, and similar metadata
        from the tableDef.
        """
        if self.tableDef.temporary:
            return
        self.setTablePrivileges(self.tableDef)
        self.setSchemaPrivileges(self.tableDef.rd)
        self.createUniquenessRules()
        self.setStatisticsTargets()

        # Hack to support adding obscore using meta updates:
        # execute a script to add us to the obscore sources table.
        # XXX TODO: probably replace this with a script type metaUpdate
        # once we have table scripts again.
        if self.tableDef.hasProperty("obscoreClause"):
            from gavo.rscdef import scripting
            script = base.caches.getRD("//obscore").getById(
                "addTableToObscoreSources")
            scripting.PythonScriptRunner(script).run(self)
        if not self.nometa:
            self.addToMeta()
        if self.commitAfterMeta:
            self.connection.commit()
        return self

    def createIfNecessary(self):
        """creates the table only when it does not exist yet."""
        if not self.exists():
            self.create()
        return self

    def drop(self):
        """drops the table.
        """
        if self.exists():
            self.runScripts("beforeDrop")
            self.dropTable(self.tableDef.getQName(), cascade=True)
        if not self.nometa:
            self.cleanFromMeta()
        return self

    def recreate(self):
        """drops the table and creates a fresh, empty one."""
        self.drop()
        self.create()
        return self

    def query(self, query, data={}):
        """runs query within this table's connection.

        query is macro-expanded within the table definition (i.e., you can,
        e.g., write \\qName to obtain the table's qualified name).
        """
        if "\\" in query:
            query = self.tableDef.expand(query)
        return DBMethodsMixin.query(self, query, data)

    def getSelectClause(self, resultTableDef):
        """returns the select clause to come up with resultTableDef.
        """
        parts = []
        for of in resultTableDef:
            select = getattr(of, "select", None)
            if select:
                parts.append("%s AS %s"%(select, of.name))
            else:
                parts.append(of.name)
        return ", ".join(parts)

    def getQuery(self, resultTableDef, fragment, pars=None,
            distinct=False, limits=None, groupBy=None):
        """returns a result table definition, query string and a parameters
        dictionary for a query against this table.

        See getTableForQuery for the meaning of the arguments.
        """
        if pars is None:
            pars = {}

        if not isinstance(resultTableDef, rscdef.TableDef):
            # a plain list of columns was passed in; wrap it up
            resultTableDef = base.makeStruct(rscdef.TableDef,
                id="iterQuery", columns=resultTableDef)

        query = ["SELECT "]
        if distinct:
            query.append("DISTINCT ")
        query.append(self.getSelectClause(resultTableDef)+" ")
        query.append("FROM %s "%self.tableName)

        if fragment and fragment.strip():
            query.append("WHERE %s "%fragment)
        if groupBy:
            query.append("GROUP BY %s "%groupBy)
        if limits:
            # limits is (SQL string, parameters filling it)
            query.append(limits[0]+" ")
            pars.update(limits[1])
        if self.exclusive:
            query.append("FOR UPDATE ")
        return resultTableDef, "".join(query), pars

    def iterQuery(self, resultTableDef=None, fragment="", pars=None,
            distinct=False, limits=None, groupBy=None):
        """like getTableForQuery, except that an iterator over the
        result rows is returned.

        (there is no advantage in using this as we will pull the entire
        thing in memory anyway; use qtables if you need streaming).
        """
        for row in self.getTableForQuery(resultTableDef, fragment,
                pars, distinct, limits, groupBy).rows:
            yield row

    def getTableForQuery(self, resultTableDef=None, fragment="", pars=None,
            distinct=False, limits=None, groupBy=None):
        """returns a Table instance for a query on this table.

        resultTableDef is a TableDef with svc.OutputField columns
        (rscdef.Column instances will do), or possibly just a list
        of Columns. Fragment is empty or an SQL
        where-clause with
        dictionary placeholders, pars is the dictionary filling
        fragment, distinct, if True, adds a distinct clause,
        and limits, if given, is a pair of an SQL string to be
        appended to the SELECT clause and parameters filling it.
        queryMeta.asSQL returns what you need here.

        pars may be mutated in the process.
        """
        if resultTableDef is None:
            resultTableDef = self.tableDef.copy(None)
        resultTableDef, query, pars = self.getQuery(
            resultTableDef,
            fragment,
            pars=pars,
            distinct=distinct,
            limits=limits,
            groupBy=groupBy)
        return table.InMemoryTable(resultTableDef,
            rows=[resultTableDef.makeRowFromTuple(tupRow)
                for tupRow in self.query(query, pars)])
678 679
class View(DBTable):
    """A table in the database you cannot add rows to: a view.

    Strictly, I should derive both View and DBTable from a common
    base, but that's currently not worth the effort.

    Technically, Views are DBTables with a non-None viewStatement
    (this is what TableForDef checks for when deciding whether to
    construct a DBTable or a View). You can get a feeder for them,
    but trying to actually feed anything will raise a DataError.

    On import, views only run postCreation scripts; since there are
    no indices, no preIndex scripts run either, and since no import
    takes place, there's no preImport or newSource.
    """

    def __init__(self, *args, **kwargs):
        DBTable.__init__(self, *args, **kwargs)
        # there is no insert statement for a view
        del self.addCommand

    def addRow(self, row):
        raise base.DataError("You cannot add data to views")

    feedRows = addRow

    def getFeeder(self, **kwargs):
        # kwargs are irrelevant here: the raising feeder rejects all
        # attempts to feed anyway.
        return _RaisingFeeder(self, None)

    def setStatisticsTargets(self):
        # no statistics on views
        # (we don't even warn since we've probably got this from the table
        # where it's going to happen)
        pass

    def makeIndices(self):
        # views have neither indices nor primary keys
        return self

    def create(self):
        base.ui.notifyDebug("Create DB View %s"%self.tableName)
        self.ensureSchema()
        try:
            self.connection.execute(
                self.tableDef.expand(self.tableDef.viewStatement))
            return self.configureTable()
        except base.DBError as ex:
            raise base.ReportableError("View statement of table at %s bad."
                " Postgres error message: %s"%(
                self.tableDef.getSourcePosition(),
                ex))

    def importFinished(self):
        # only postCreation scripts make sense for a view
        self.runScripts("postCreation")
        return self
736