Source code for gavo.rsc.dbtable

"""
Tables on disk
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import sys

from gavo import base
from gavo import rscdef
from gavo import utils
from gavo.base import sqlsupport
from gavo.rsc import common
from gavo.rsc import table


class _Feeder(table._Feeder):
	"""A context manager for feeding data into a table.

	This feeder hands through batchSize items at a time to the database.

	After an exit, the instances have an nAffected attribute that says
	how many rows were processed by the database through this feeder.

	A feeder is constructed with a parent table (that also provides
	the connection), an insert command, and potentially some options.

	Note that the table feeder does *not* do any connection management.
	You have to commit or rollback yourself (or do it properly and go
	through data, which can do connection management).
	"""
	def __init__(self, parent, insertCommand, batchSize=2000, notify=True):
		self.nAffected, self.notify = 0, notify
		table._Feeder.__init__(self, parent)
		self.feedCommand, self.batchSize = insertCommand, batchSize
		self.batchCache = []

	def shipout(self):
		if self.batchCache:
			try:
				self.cursor.executemany(self.feedCommand, self.batchCache)
			except sqlsupport.IntegrityError:
				base.ui.notifyInfo("One or more of the following rows clashed: "+
					str(self.batchCache))
				raise
			except sqlsupport.DataError:
				base.ui.notifyInfo("Bad input.  Run with -b1 to pin down offending"
					" record.  First rec: %s"%self.batchCache[0])
				raise

			self.nAffected += self.cursor.rowcount
			if self.notify:
				base.ui.notifyShipout(len(self.batchCache))
			self.batchCache = []

	def add(self, data):
		self._assertActive()
		if self.table.validateRows:
			try:
				self.table.tableDef.validateRow(data)
			except rscdef.IgnoreThisRow:
				return
		self.batchCache.append(data)
		if len(self.batchCache)>=self.batchSize:
			self.shipout()

	def flush(self):
		self._assertActive()
		self.shipout()
	
	def reset(self):
		self._assertActive()
		self.batchCache = []

	def __enter__(self):
		self.cursor = self.table.connection.cursor()
		return table._Feeder.__enter__(self)

	def __exit__(self, *args):
		if not args or args[0] is None: # regular exit, ship out
			try:
				self.shipout()
# The following sucks, but rowcount seems to be always 1 on insert operations.
# However, we at least want a chance to catch update operations matching
# nothing.  So, if rowcount is 0, it's a sign something went wrong, and
# we want to override our initial guess.
				if self.cursor.rowcount==0:
					self.nAffected = 0
				self.cursor.close()
			except:
				del self.cursor
				table._Feeder.__exit__(self, *sys.exc_info())
				raise
		if hasattr(self, "cursor"):
			del self.cursor
		table._Feeder.__exit__(self, *args)
		return False

	def getAffected(self):
		return self.nAffected


class _RaisingFeeder(_Feeder):
	"""is a feeder that will bomb on any attempt to feed data to it.

	It is useful for tables that can't be written, specifically, views.
	"""
	def add(self, data):
		raise base.DataError("Attempt to feed to a read-only table")


[docs]class MetaTableMixin(object): """is a mixin providing methods updating the dc_tables. It requires a tableDef attribute on the parent, and the parent must mix in QuerierMixin. """ __metaRDId = "__system__/dc_tables" def _cleanFromSourceTable(self): """removes information about self.tableDef from the tablemeta table. """ self.connection.execute( "DELETE FROM dc.tablemeta WHERE tableName=%(tableName)s", {"tableName": self.tableDef.getQName()}) def _addToSourceTable(self): """adds information about self.tableDef to the tablemeta table. """ t = DBTable(base.caches.getRD( self.__metaRDId).getTableDefById("tablemeta"), connection=self.connection) t.addRow({"tableName": self.tableDef.getQName(), "sourceRD": self.tableDef.rd.sourceId, "adql": bool(self.tableDef.adql), "tableDesc": base.getMetaText(self.tableDef, "description", propagate=False), "resDesc": base.getMetaText(self.tableDef.rd, "description", propagate=False), "nrows": self.tableDef.nrows})
[docs] def addToMeta(self): self._addToSourceTable()
[docs] def cleanFromMeta(self): self._cleanFromSourceTable()
[docs]class DBMethodsMixin(sqlsupport.QuerierMixin): """is a mixin for on-disk tables. The parent must have tableDef, tableName (from tabledef.getQName()), and connection attributes. """ scripts = None # set by data on import, defined by make def _definePrimaryKey(self): if self.tableDef.primary and not self.hasIndex(self.tableName, self.getPrimaryIndexName(self.tableDef.id)): if not self.tableDef.system: base.ui.notifyIndexCreation("Primary key on %s"%self.tableName) try: self.connection.execute("ALTER TABLE %s ADD PRIMARY KEY (%s)"%( self.tableName, ", ".join(self.tableDef.primary))) except sqlsupport.DBError as msg: raise base.ui.logOldExc( common.DBTableError("Primary key %s could not be added (%s)"%( self.tableDef.primary, repr(str(msg))), self.tableName, hint="The howDoI documentation text may contain help on" " how to find the offending elements.")) def _dropPrimaryKey(self): """drops a primary key if it exists. *** Postgres specific *** """ constraintName = str(self.getPrimaryIndexName(self.tableDef.id)) if self.tableDef.primary and self.hasIndex( self.tableName, constraintName): self.connection.execute("ALTER TABLE %s DROP CONSTRAINT %s"%( self.tableName, constraintName)) def _addForeignKeys(self): """adds foreign key constraints if necessary. """ for fk in self.tableDef.foreignKeys: if not self.tableDef.system: base.ui.notifyIndexCreation( self.tableDef.expand(fk.getDescription())) fk.create(self) def _dropForeignKeys(self): """drops foreign key constraints if necessary. """ for fk in self.tableDef.foreignKeys: fk.delete(self)
[docs] def dropIndices(self): if not self.exists(): return self._dropForeignKeys() # Don't drop the primary key for now -- foreign key relationships may # depend on it, and postgres doesn't seem to mind if we just create it later. # This is presumably still bad if someone changed the primary key. Let's # worry about it then. # self._dropPrimaryKey() for index in reversed(self.tableDef.indices): index.drop(self) return self
[docs] def makeIndices(self): """creates all indices on the table, including any definition of a primary key. """ self.connection.execute("SET maintenance_work_mem=%s000"% base.getConfig("db", "indexworkmem")) if self.suppressIndex or not self.exists(): return if self.tableDef.primary: self._definePrimaryKey() for index in self.tableDef.indices: index.create(self) self._addForeignKeys() return self
[docs] def getDeleteQuery(self, matchCondition, pars={}): return "DELETE FROM %s WHERE %s"%( self.tableName, matchCondition), pars
[docs] def deleteMatching(self, matchCondition, pars={}): """deletes all rows matching matchCondition. For now, matchCondition a boolean SQL expression. All rows matching it will be deleted. """ self.connection.execute(*self.getDeleteQuery(matchCondition, pars))
[docs] def copyIn(self, inFile, binary=True): fmt = " WITH BINARY" if binary else "" cursor = self.connection.cursor() cursor.copy_expert("COPY %s FROM STDIN %s"%(self.tableName, fmt), inFile) cursor.close() return self
[docs] def copyOut(self, outFile, binary=True): fmt = " WITH BINARY" if binary else "" cursor = self.connection.cursor() cursor.copy_expert("COPY %s TO STDOUT %s"%(self.tableName, fmt), outFile) cursor.close() return self
[docs] def ensureSchema(self): """creates self's schema if necessary. """ if self.tableDef.temporary: # these never are in a schema return schemaName = self.tableDef.rd.schema if not self.schemaExists(schemaName): self.connection.execute("CREATE SCHEMA %(schemaName)s"%locals()) self.setSchemaPrivileges(self.tableDef.rd) return self
[docs] def ensureOnDiskMatches(self): """raises a DataError if the on-disk structure of a table doesn't match DaCHS' idea of it. If the table doesn't exist on disk, this will raise a NotFoundError. """ dbCols = self.getColumnsFromDB(self.tableDef.getQName()) mismatches = [] if len(self.tableDef.columns)<len(dbCols): mismatches.append("extra columns in DB (%s)"%", ".join( name for name, _ in dbCols[len(self.tableDef.columns):])) for index, col in enumerate(self.tableDef): try: name, type = dbCols[index] except IndexError: mismatches.append("from column %s on: No matches in DB"%col.name) break if isinstance(col.name, utils.QuotedName): if col.name.name!=name: mismatches.append("mismatching delimited name of %s (DB: %s)"%( col.name, name)) elif col.name.lower()!=name: mismatches.append("mismatching name of %s (DB: %s)"%(col.name, name)) continue try: base.sqltypeToPgValidator(col.type)(type) except TypeError as ex: mismatches.append("type mismatch in column %s (%s)"%( col.name, utils.safe_str(ex))) if mismatches: raise base.DataError("Table %s: %s"%( self.tableDef.getQName(), "; ".join(mismatches)))
[docs]class DBTable(DBMethodsMixin, table.BaseTable, MetaTableMixin): """An interface to a table in the database. These are usually created using ``api.TableForDef(tableDef)`` with a table definition obtained, e.g., from an RD, saying ``onDisk=True``. When constructing a DBTable, it will be created if necessary (unless ``create=False`` is passed), but indices or primary keys keys will only be created on a call to ``importFinished``. The constructor does not check if the schema of the table on disk matches the tableDef. If the two diverge, all kinds of failures are conceivable; use ``dachs val -c`` to make sure on-disk structure match the RDs. You can pass a ``nometa`` boolean kw argument to suppress entering the table into the ``dc_tables`` table. You can pass an exclusive boolean kw argument; if you do, the ``iterQuery`` (and possibly similar methods in the future) method will block concurrent writes to the selected rows ("FOR UPDATE") as long as the transaction is active. DbTables will run preCreation, preIndex, postCreation, and beforeDrop scripts, both from the table definition and the make they are being created from. No scripts except beforeDrop are run when an existing table is operated on from an updating dd. The main attributes (with API guarantees) include: * tableDef -- the defining tableDef * getFeeder() -- returns a function you can call with rowdicts to insert them into the table. * importFinished(nImported) -- must be called after you've fed all rows when importing data; pass the number of rows fed in. * drop() -- drops the table in the database * recreate() -- drops the table and generates a new empty one. * getTableForQuery(...) -- returns a Table instance built from a query over this table (you probably to use ``conn.query*`` and ``td.getSimpleQuery`` instead). """ def __init__(self, tableDef, **kwargs): self.connection = kwargs.pop("connection", None) if self.connection is None: raise base.ReportableError("DBTable built without connection.", hint="In pre-1.0 DaCHS, database tables could automatically" " open and manage connections. This turned out to be much" " more trouble than it was worth. See develNotes for how" " to do things today.") self.suppressIndex = kwargs.pop("suppressIndex", False) self.tableUpdates = kwargs.pop("tableUpdates", False) self.exclusive = kwargs.pop("exclusive", False) self.updating = kwargs.pop("updating", False) self.commitAfterMeta = kwargs.pop("commitAfterMeta", False) table.BaseTable.__init__(self, tableDef, **kwargs) if self.tableDef.rd is None and not self.tableDef.temporary: raise base.ReportableError("TableDefs without resource descriptor" " cannot be used to access database tables") self.tableName = self.tableDef.getQName() self.nometa = (kwargs.get("nometa", False) or self.tableDef.temporary or tableDef.rd.schema=="dc") self.newlyCreated = False if kwargs.get("create", False): self.createIfNecessary() if self.tableUpdates: self.addCommand = "UPDATE %s SET %s WHERE %s"%( self.tableName, ", ".join("%s=%%(%s)s"%(f.name, f.key) for f in self.tableDef), " AND ".join("%s=%%(%s)s"%(n, n) for n in self.tableDef.primary)) else: self.addCommand = ("INSERT INTO %s (%s) VALUES (%s)"%( self.tableName, ", ".join([str(c.name) for c in self.tableDef.columns]), ", ".join(["%%(%s)s"%c.key for c in self.tableDef.columns]))) if "rows" in kwargs: self.feedRows(kwargs["rows"]) def __iter__(self): # Do we want named cursors by default here? cursor = self.connection.cursor() cursor.execute("SELECT * FROM %s"%self.tableName) for row in cursor: yield self.tableDef.makeRowFromTuple(row) cursor.close() def __len__(self): return list(self.connection.query("select count(*) from %s"% self.tableDef.getQName()))[0][0]
[docs] def exists(self): return self.getTableType(self.tableDef.getQName()) is not None
[docs] def getFeeder(self, **kwargs): if "notify" not in kwargs: kwargs["notify"] = not self.tableDef.system or not self.tableDef.onDisk return _Feeder(self, self.addCommand, **kwargs)
[docs] def importFinished(self, nAffected): if self.newlyCreated: self.runScripts("preIndex") self.makeIndices() self.runScripts("postCreation") else: base.ui.notifyDBTableModified(self.tableName) if nAffected>100: # don't run analyze when there's just a few records changed, # as when uploading to products. self.connection.execute("ANALYZE %s"%self.tableName) return self
[docs] def importFailed(self, *excInfo): # rollback is handled by the feeder. return False
[docs] def feedRows(self, rows): """Feeds a sequence of rows to the table. The method returns the number of rows affected. Exceptions are handed through upstream, but the connection is rolled back. """ with self.getFeeder() as feeder: for r in rows: feeder.add(r) return feeder.nAffected
[docs] def addRow(self, row): """adds a row to the table. Use this only to add one or two rows, otherwise go for getFeeder. """ try: self.connection.execute(self.addCommand, row) except sqlsupport.IntegrityError: raise base.ui.logOldExc( base.ValidationError("Row %s cannot be added since it clashes" " with an existing record on the primary key"%row, row=row, colName="unknown"))
[docs] def getRow(self, *key): """returns the row with the primary key key from the table. This will raise a DataError on tables without primaries. """ if not self.tableDef.primary: raise base.DataError("Table %s has no primary key and thus does" " not support getRow"%self.tableName) res = list(self.iterQuery(self.tableDef, " AND ".join("%s=%%(%s)s"%(n,n) for n in self.tableDef.primary), pars=dict(list(zip(self.tableDef.primary, key))))) if not res: raise KeyError(key) return res[0]
[docs] def createUniquenessRules(self): # these two drop rules and triggers created by old DaCHS # versions. I'd say they can be removed ~ 2025, when all # tables with such triggers/rules have beeen removed. self.connection.execute('DROP TRIGGER IF EXISTS "dropOld_%s"' ' ON %s'%(self.tableName, self.tableName)) self.connection.execute('DROP RULE IF EXISTS updatePolicy' ' ON %s'%(self.tableName)) # this is the new trigger used by all policies self.connection.execute('DROP TRIGGER IF EXISTS "dupe_policy_%s"' ' ON %s'%(self.tableName, self.tableName)) if not self.tableDef.dupePolicy: return def getMatchCondition(): return " AND ".join("%s=new.%s"%(n,n) for n in self.tableDef.primary) # uniqueness checking becomes really slow if primary key definition # is delayed until index creation (as usual), so let's do it now. self._definePrimaryKey() args = { "table": self.tableName, "matchCond": getMatchCondition(), "colNames": ",".join(str(c.name) for c in self.tableDef), "mungedColNames": ",".join( "munged_vals."+str(c.name) for c in self.tableDef)} if self.tableDef.dupePolicy=="drop": self.connection.execute(""" CREATE OR REPLACE FUNCTION "handle_dupe_%(table)s"() RETURNS trigger AS $body$ BEGIN IF (EXISTS(SELECT 1 FROM %(table)s WHERE %(matchCond)s)) THEN RETURN NULL; END IF; RETURN NEW; END $body$ LANGUAGE plpgsql"""%args) elif self.tableDef.dupePolicy=="check": # This one is tricky: if the inserted column is *different*, # the rule does not fire and we get a pkey violation. # Furthermore, special NULL handling is required -- we # do not check columns that have NULLs in new or old. self.connection.execute(""" CREATE OR REPLACE FUNCTION "handle_dupe_%(table)s"() RETURNS trigger AS $body$ BEGIN SELECT * INTO OLD FROM %(table)s WHERE %(matchCond)s; IF NOT FOUND THEN RETURN NEW; ELSEIF (OLD IS NOT DISTINCT FROM NEW) THEN RETURN NULL; ELSE RAISE 'Overwrite attempt with different tuple with check policy %%%% %%%%.', OLD, NEW; END IF; RETURN NEW; END $body$ LANGUAGE plpgsql"""%args) elif self.tableDef.dupePolicy=="dropOld": self.connection.execute(""" CREATE OR REPLACE FUNCTION "handle_dupe_%(table)s"() RETURNS trigger AS $body$ BEGIN IF (EXISTS(SELECT 1 FROM %(table)s WHERE %(matchCond)s)) THEN DELETE FROM %(table)s WHERE %(matchCond)s; END IF; RETURN NEW; END $body$ LANGUAGE plpgsql"""%args) elif self.tableDef.dupePolicy=="overwrite": self.connection.execute(""" CREATE OR REPLACE FUNCTION "handle_dupe_%(table)s"() RETURNS trigger AS $body$ DECLARE munged_vals RECORD; BEGIN munged_vals = NEW; IF (EXISTS(SELECT 1 FROM %(table)s WHERE %(matchCond)s)) THEN UPDATE %(table)s SET (%(colNames)s)=(%(mungedColNames)s) WHERE %(matchCond)s; RETURN NULL; END IF; RETURN NEW; END $body$ LANGUAGE plpgsql"""%args) else: raise base.DataError("Invalid dupePolicy: %s"%self.tableDef.dupePolicy) self.connection.execute( 'CREATE TRIGGER "dupe_policy_%(table)s" BEFORE INSERT' ' ON %(table)s FOR EACH ROW EXECUTE' ' PROCEDURE "handle_dupe_%(table)s"()'%args)
[docs] def setStatisticsTargets(self): for col in self.tableDef: if col.hasProperty("statisticsTarget"): target = int(col.getProperty("statisticsTarget")) self.connection.execute("ALTER TABLE %s ALTER COLUMN %s" " SET STATISTICS %d"%(self.tableName, col.name, target))
[docs] def create(self): base.ui.notifyDebug("Create DB Table %s"%self.tableName) self.ensureSchema() self.newlyCreated = True self.runScripts("preCreation") self.connection.execute(self.tableDef.getDDL()) return self.updateMeta()
[docs] def updateMeta(self): if self.tableDef.temporary: return self.setTablePrivileges(self.tableDef) self.setSchemaPrivileges(self.tableDef.rd) self.createUniquenessRules() self.setStatisticsTargets() if not self.nometa: self.addToMeta() self.runScripts("afterMeta") if self.commitAfterMeta: self.connection.commit() return self
[docs] def createIfNecessary(self): if not self.exists(): self.create() return self
def _iterDerivedViews(self): """yields view that contain the current table. This is in particular used when dropping the table (rather than rely on cascade) in order to give view's dropping scripts a chance to run. """ depNames = set(r[0] for r in self.connection.query(""" SELECT DISTINCT v.oid::regclass as table_name FROM pg_depend AS d JOIN pg_rewrite AS r ON r.oid = d.objid JOIN pg_class AS v ON v.oid = r.ev_class WHERE v.relkind = 'v' AND d.classid = 'pg_rewrite'::regclass AND d.refclassid = 'pg_class'::regclass AND d.deptype = 'n' AND refobjid=%(qname)s::regclass""", {"qname": self.tableDef.getQName()})) if not depNames: return myId = self.tableDef.getFullId() for depId in [r[1]+"#"+r[0].split(".")[-1] for r in self.connection.query("""SELECT tablename, sourcerd FROM dc.tablemeta WHERE tablename IN %(depNames)s""", locals())]: if depId==myId: # can't be bothered to figure out why a view apparently # depends on itself by the logic above continue try: yield base.resolveCrossId(depId) except base.NotFoundError: base.ui.notifyError("Would have liked to drop %s before" " dropping %s, but found its definition gone."%( depId, self.tableDef.getQName()))
[docs] def drop(self, seenIds=()): """drops the table. This will recurse into dependent objects. """ self.runScripts("beforeDrop") if self.exists(): if seenIds==(): seenIds = set() # if there are any dependent views (known), drop them in a controlled # fashion becore CASCADE hits for viewDef in self._iterDerivedViews(): if id(viewDef) in seenIds: continue seenIds.add(id(viewDef)) View( viewDef, connection=self.connection, create=False).drop(seenIds) self.dropTable(self.tableDef.getQName(), cascade=True) if not self.nometa: self.cleanFromMeta() return self
[docs] def recreate(self): self.drop() self.create() return self
[docs] def query(self, query, data={}): """runs query within this table's connection. query is macro-expanded within the table definition (i.e., you can, e.g., write \qName to obtain the table's qualified name). Don't use this in new code; use t.connection.query or execute as required. """ return DBMethodsMixin.query(self, self.expand(query), data)
[docs] def getSelectClause(self, resultTableDef): """returns the select clause to come up with resultTableDef. """ parts = [] for of in resultTableDef: select = getattr(of, "select", None) if select: parts.append("%s AS %s"%(select, of.name)) else: parts.append(of.name) return ", ".join(parts)
[docs] def getQuery(self, resultTableDef, fragment, pars=None, distinct=False, limits=None, groupBy=None, samplePercent=None): """returns a result table definition, query string and a parameters dictionary for a query against this table. See getTableForQuery for the meaning of the arguments. """ if pars is None: pars = {} if not isinstance(resultTableDef, rscdef.TableDef): resultTableDef = base.makeStruct(rscdef.TableDef, id="iterQuery", columns=resultTableDef) query = ["SELECT "] if distinct: query.append("DISTINCT ") query.append(self.getSelectClause(resultTableDef)+" ") query.append("FROM %s "%self.tableName) if samplePercent: query.append("TABLESAMPLE SYSTEM (%f)"%samplePercent) if fragment and fragment.strip(): query.append("WHERE %s "%fragment) if groupBy: query.append("GROUP BY %s "%groupBy) if limits: query.append(limits[0]+" ") pars.update(limits[1]) if self.exclusive: query.append("FOR UPDATE ") return resultTableDef, "".join(query), pars
[docs] def iterQuery(self, resultTableDef=None, fragment="", pars=None, distinct=False, limits=None, groupBy=None): """like getTableForQuery, except that an iterator over the result rows is returned. (there is no advantage in using this as we will pull the entire thing in memory anyway; use qtables if you need streaming). """ for row in self.getTableForQuery(resultTableDef, fragment, pars, distinct, limits, groupBy).rows: yield row
[docs] def getTableForQuery(self, resultTableDef=None, fragment="", pars=None, distinct=False, limits=None, groupBy=None, samplePercent=None): """returns a Table instance for a query on this table. resultTableDef is a TableDef with svc.OutputField columns (rscdef.Column instances will do), or possibly just a list of Columns. Fragment is empty or an SQL where-clause with dictionary placeholders, pars is the dictionary filling fragment, distinct, if True, adds a distinct clause, and limits, if given, is a pair of an SQL string to be appended to the SELECT clause and parameters filling it. queryMeta.asSQL returns what you need here. pars may be mutated in the process. """ if resultTableDef is None: resultTableDef = self.tableDef.copy(None) resultTableDef, query, pars = self.getQuery( resultTableDef, fragment, pars=pars, distinct=distinct, limits=limits, groupBy=groupBy, samplePercent=samplePercent) return table.InMemoryTable(resultTableDef, rows=[resultTableDef.makeRowFromTuple(tupRow) for tupRow in self.connection.query(query, pars)])
[docs] def runScripts(self, phase, **kwargs): """runs scripts from both the tableDef and the make. The reason there's not a single place is mainly historical; on the other hand, one day postCreation scripts only run when a table is created in a special way might come in handy, so I'll not take away table scripts from makes. """ if self.updating: if (not self.newlyCreated and phase in {"preImport", "preIndex", "postCreation"}): return self.tableDef.getRunner()(self, phase, **kwargs) if self.makeRunning: self.makeRunning.getRunner()(self, phase, **kwargs)
[docs]class View(DBTable): """is a view, i.e., a table in the database you can't add to. Strictly, I should derive both View and DBTable from a common base, but that's currently not worth the effort. Technically, Views are DBTables with a non-None viewStatement (this is what TableForDef checks for when deciding whether to construct a DBTable or a View). You can get a feeder for them, but trying to actually feed anything will raise a DataError. On import, views only run postCreation scripts; we assume everything else (preIndex, postIndex, preImport, newSource, etc) has run in the contributing tables. Materialised views *will* create indices, however. """ def __init__(self, *args, **kwargs): DBTable.__init__(self, *args, **kwargs) del self.addCommand
[docs] def addRow(self, row): raise base.DataError("You cannot add data to views")
feedRows = addRow
[docs] def setStatisticsTargets(self): # no statistics on views # (we don't even warn since we've probably got this from the table # where it's going to happen) pass
[docs] def getFeeder(self, **kwargs): # all kwargs ignored since the feeder will raise an exception on any # attempts to feed anyway. return _RaisingFeeder(self, None)
[docs] def create(self): base.ui.notifyDebug("Create DB View %s"%self.tableName) self.newlyCreated = True self.ensureSchema() try: self.connection.execute( self.tableDef.expand(self.tableDef.viewStatement)) return self.updateMeta() except base.DBError as msg: raise base.ReportableError("View statement of table at %s bad." " Postgres error message: %s"%( self.tableDef.getSourcePosition(), msg))
[docs] def makeIndices(self): if self.getTableType(self.tableDef.getQName())=="MATERIALIZED VIEW": super().makeIndices()
[docs] def importFinished(self, nImported): self.makeIndices() self.runScripts("postCreation") return self