1 """
2 Tables on disk
3 """
4
5
6
7
8
9
10
11 import sys
12
13 from gavo import base
14 from gavo import rscdef
15 from gavo import utils
16 from gavo.base import sqlsupport
17 from gavo.rsc import common
18 from gavo.rsc import table
19
20
22 """A context manager for feeding data into a table.
23
24 This feeder hands through batchSize items at a time to the database.
25
26 After an exit, the instances have an nAffected attribute that says
27 how many rows were processed by the database through this feeder.
28
29 A feeder is constructed with a parent table (that also provides
30 the connection), an insert command, and potentially some options.
31
32 Note that the table feeder does *not* do any connection management.
33 You have to commit or rollback yourself (or do it properly and go
34 through data, which can do connection management).
35 """
def __init__(self, parent, insertCommand, batchSize=2000, notify=True):
    """Initialises the feeder.

    parent is the table being fed (it also provides the connection),
    insertCommand is the SQL statement handed to executemany,
    batchSize is the number of rows shipped per database call, and
    notify controls whether shipped batches are reported through the
    UI event system.
    """
    # running count of rows the database reports as processed, and
    # whether to announce shipped batches
    self.nAffected = 0
    self.notify = notify
    table._Feeder.__init__(self, parent)
    # statement used for executemany and how many rows to buffer
    self.feedCommand = insertCommand
    self.batchSize = batchSize
    # rows buffered but not yet sent to the database
    self.batchCache = []
41
# NOTE(review): body of the feeder's batch-flushing method (its def line,
# presumably "def shipout(self):", is folded out of this extract; sibling
# methods below call it as self.shipout()).  It pushes batchCache to the
# database in one executemany, updates nAffected, and clears the cache.
43 if self.batchCache:
44 try:
45 self.cursor.executemany(self.feedCommand, self.batchCache)
# on a primary-key clash, log the whole batch so the offending
# row can be found, then hand the error on to the caller
46 except sqlsupport.IntegrityError:
47 base.ui.notifyInfo("One or more of the following rows clashed: "+
48 str(self.batchCache))
49 raise
# on malformed data, point the operator at batch size 1 so the
# bad record can be pinned down, then re-raise
50 except sqlsupport.DataError:
51 base.ui.notifyInfo("Bad input. Run with -b1 to pin down offending"
52 " record. First rec: %s"%self.batchCache[0])
53 raise
# programming errors carry enough context themselves; no extra log
54 except sqlsupport.ProgrammingError:
55 raise
# DB-API rowcount is negative when the backend cannot tell how many
# rows were touched; fall back to the number of rows we shipped
56 if self.cursor.rowcount>=0:
57 self.nAffected += self.cursor.rowcount
58 else:
59 self.nAffected += len(self.batchCache)
60 if self.notify:
61 base.ui.notifyShipout(len(self.batchCache))
62 self.batchCache = []
63
64 - def add(self, data):
74
76 self._assertActive()
77 self.shipout()
78
80 self._assertActive()
81 self.batchCache = []
82
86
# NOTE(review): body of the feeder's __exit__ (def line folded out of this
# extract).  On a clean exit (no exception handed in), flush the remaining
# batch and close the cursor; if anything goes wrong during that, drop the
# cursor, let the base class clean up, and re-raise.
88 if not args or args[0] is None:
89 try:
90 self.shipout()
91
92
93
94
# an overall rowcount of 0 means nothing stuck; reset the counter
95 if self.cursor.rowcount==0:
96 self.nAffected = 0
97 self.cursor.close()
# bare except is deliberate: any failure during the final flush must
# still release the cursor and run the base-class exit before re-raising
98 except:
99 del self.cursor
100 table._Feeder.__exit__(self, *sys.exc_info())
101 raise
# guard against the cursor already having been dropped above
102 if hasattr(self, "cursor"):
103 del self.cursor
104 table._Feeder.__exit__(self, *args)
105 return False
106
108 return self.nAffected
109
110
112 """is a feeder that will bomb on any attempt to feed data to it.
113
114 It is useful for tables that can't be written, specifically, views.
115 """
116 - def add(self, data):
118
119
153
154
156 """is a mixin for on-disk tables.
157
158 The parent must have tableDef, tableName (from tabledef.getQName()),
159 and connection attributes.
160 """
161
162 scripts = None
163
178
180 """drops a primary key if it exists.
181
182 *** Postgres specific ***
183 """
184 constraintName = str(self.getPrimaryIndexName(self.tableDef.id))
185 if self.tableDef.primary and self.hasIndex(
186 self.tableName, constraintName):
187 self.connection.execute("ALTER TABLE %s DROP CONSTRAINT %s"%(
188 self.tableName, constraintName))
189
198
200 """drops foreign key constraints if necessary.
201 """
202 for fk in self.tableDef.foreignKeys:
203 fk.delete(self)
204
206 if not self.exists():
207 return
208 self._dropForeignKeys()
209
210
211
212
213
214 for index in reversed(self.tableDef.indices):
215 index.drop(self)
216 return self
217
# NOTE(review): method body (the def line, presumably
# "def makeIndices(self):", is folded out of this extract).
219 """creates all indices on the table, including any definition of
220 a primary key.
221 """
# raise the session's maintenance memory so index builds run faster;
# appending 000 scales the configured value by 1000 (presumably
# MB -> kB, postgres' default unit -- TODO confirm against config docs)
222 self.connection.execute("SET maintenance_work_mem=%s000"%
223 base.getConfig("db", "indexworkmem"))
# NOTE(review): this early return yields None rather than self, so
# callers cannot chain when indexing is suppressed or the table is gone
224 if self.suppressIndex or not self.exists():
225 return
226 if self.tableDef.primary:
227 self._definePrimaryKey()
228 for index in self.tableDef.indices:
229 index.create(self)
230 self._addForeignKeys()
231 return self
232
234 return "DELETE FROM %s WHERE %s"%(
235 self.tableName, matchCondition), pars
236
238 """deletes all rows matching matchCondition.
239
240 For now, matchCondition is a boolean SQL expression. All rows matching
241 it will be deleted.
242 """
243 self.connection.execute(*self.getDeleteQuery(matchCondition, pars))
244
245 - def copyIn(self, inFile, binary=True):
251
252 - def copyOut(self, outFile, binary=True):
258
260 """creates self's schema if necessary.
261 """
262 if self.tableDef.temporary:
263 return
264 schemaName = self.tableDef.rd.schema
265 if not self.schemaExists(schemaName):
266 self.connection.execute("CREATE SCHEMA %(schemaName)s"%locals())
267 self.setSchemaPrivileges(self.tableDef.rd)
268 return self
269
# NOTE(review): method body (the def line, presumably
# "def ensureOnDiskMatches(self):", is folded out of this extract).
271 """raises a DataError if the on-disk structure of a table doesn't
272 match DaCHS' idea of it.
273
274 If the table doesn't exist on disk, this will raise a NotFoundError.
275 """
# dbCols: sequence of (name, type) pairs as the database reports them
276 dbCols = self.getColumnsFromDB(self.tableDef.getQName())
277 mismatches = []
# more columns in the database than in the definition: report the surplus
278 if len(self.tableDef.columns)<len(dbCols):
279 mismatches.append("extra columns in DB (%s)"%", ".join(
280 name for name, _ in dbCols[len(self.tableDef.columns):]))
281
# compare column by column, by position; the definition side is
# lower-cased before comparing (presumably because the DB reports
# lower-cased names -- TODO confirm)
282 for index, col in enumerate(self.tableDef):
283 try:
284 name, type = dbCols[index]
285 except IndexError:
286 mismatches.append("from column %s on: No matches in DB"%col.name)
287 break
288
289 if col.name.lower()!=name:
290 mismatches.append("mismatching name of %s (DB: %s)"%(col.name, name))
291 continue
# the validator raises TypeError when the DB type is incompatible
292 try:
293 base.sqltypeToPgValidator(col.type)(type)
294 except TypeError as ex:
295 mismatches.append("type mismatch in column %s (%s)"%(
296 col.name, utils.safe_str(ex)))
297
# collect everything and fail once, listing all problems found
298 if mismatches:
299 raise base.DataError("Table %s: %s"%(
300 self.tableDef.getQName(), "; ".join(mismatches)))
301
302
303 -class DBTable(DBMethodsMixin, table.BaseTable, MetaTableMixin):
304 """An interface to a table in the database.
305
306 These are usually created using ``api.TableForDef(tableDef)`` with a
307 table definition obtained, e.g., from an RD, saying ``onDisk=True``.
308
309 When constructing a DBTable, it will be created if necessary (unless
310 ``create=False`` is passed), but indices or primary keys will only be
311 created on a call to ``importFinished``.
312
313 The constructor does not check if the schema of the table on disk matches
314 the tableDef. If the two diverge, all kinds of failures are conceivable;
315 use ``dachs val -c`` to make sure on-disk structures match the RDs.
316
317 You can pass a ``nometa`` boolean kw argument to suppress entering the table
318 into the ``dc_tables`` table.
319
320 You can pass an exclusive boolean kw argument; if you do, the
321 ``iterQuery`` (and possibly similar methods in the future) method
322 will block concurrent writes to the selected rows ("FOR UPDATE")
323 as long as the transaction is active.
324
325 The main attributes (with API guarantees) include:
326
327 * tableDef -- the defining tableDef
328 * getFeeder() -- returns a function you can call with rowdicts to
329 insert them into the table.
330 * importFinished() -- must be called after you've fed all rows when
331 importing data.
332 * drop() -- drops the table in the database
333 * recreate() -- drops the table and generates a new empty one.
334 * getTableForQuery(...) -- returns a Table instance built from a query
335 over this table (you probably want to use ``conn.query*`` and
336 ``td.getSimpleQuery`` instead).
337 """
338 _runScripts = None
339
def __init__(self, tableDef, **kwargs):
    """Creates a database table for tableDef.

    A connection keyword argument is mandatory; see the class docstring
    for the other keyword arguments interpreted here.  Everything else
    is handed on to BaseTable.
    """
    self.connection = kwargs.pop("connection", None)
    if self.connection is None:
        raise base.ReportableError("DBTable built without connection.",
            hint="In pre-1.0 DaCHS, database tables could automatically"
            " open and manage connections. This turned out to be much"
            " more trouble than it was worth. See develNotes for how"
            " to do things today.")

    # pull our own flags out of kwargs before BaseTable sees them
    for flagName in [
            "suppressIndex", "tableUpdates", "exclusive", "commitAfterMeta"]:
        setattr(self, flagName, kwargs.pop(flagName, False))
    table.BaseTable.__init__(self, tableDef, **kwargs)

    if self.tableDef.rd is None and not self.tableDef.temporary:
        raise base.ReportableError("TableDefs without resource descriptor"
            " cannot be used to access database tables")

    self.tableName = self.tableDef.getQName()
    # temporary tables and tables within the internal dc schema are
    # never entered into the metadata tables
    self.nometa = (kwargs.get("nometa", False)
        or self.tableDef.temporary
        or tableDef.rd.schema=="dc")

    self.newlyCreated = False
    if kwargs.get("create", False):
        self.createIfNecessary()

    # addCommand either updates rows in place (keyed by the primary
    # key) or plainly inserts, depending on the tableUpdates flag
    if self.tableUpdates:
        setClause = ", ".join(
            "%s=%%(%s)s"%(colDef.name, colDef.key)
            for colDef in self.tableDef)
        keyClause = " AND ".join(
            "%s=%%(%s)s"%(keyName, keyName)
            for keyName in self.tableDef.primary)
        self.addCommand = "UPDATE %s SET %s WHERE %s"%(
            self.tableName, setClause, keyClause)
    else:
        destColumns, valueRefs = [], []
        for colDef in self.tableDef.columns:
            destColumns.append(str(colDef.name))
            valueRefs.append("%%(%s)s"%colDef.key)
        self.addCommand = "INSERT INTO %s (%s) VALUES (%s)"%(
            self.tableName, ", ".join(destColumns), ", ".join(valueRefs))

    if "rows" in kwargs:
        self.feedRows(kwargs["rows"])
380
388
391
394
396 if "notify" not in kwargs:
397 kwargs["notify"] = not self.tableDef.system or not self.tableDef.onDisk
398 return _Feeder(self, self.addCommand, **kwargs)
399
410
414
416 """Feeds a sequence of rows to the table.
417
418 The method returns the number of rows affected. Exceptions are
419 handed through upstream, but the connection is rolled back.
420 """
421 with self.getFeeder() as feeder:
422 for r in rows:
423 feeder.add(r)
424 return feeder.nAffected
425
427 """adds a row to the table.
428
429 Use this only to add one or two rows, otherwise go for getFeeder.
430 """
431 try:
432 self.connection.execute(self.addCommand, row)
433 except sqlsupport.IntegrityError:
434 raise base.ui.logOldExc(
435 base.ValidationError("Row %s cannot be added since it clashes"
436 " with an existing record on the primary key"%row, row=row,
437 colName="unknown"))
438
440 """returns the row with the primary key key from the table.
441
442 This will raise a DataError on tables without primaries.
443 """
444 if not self.tableDef.primary:
445 raise base.DataError("Table %s has no primary key and thus does"
446 " not support getRow"%self.tableName)
447 res = list(self.iterQuery(self.tableDef,
448 " AND ".join("%s=%%(%s)s"%(n,n) for n in self.tableDef.primary),
449 pars=dict(zip(self.tableDef.primary, key))))
450 if not res:
451 raise KeyError(key)
452 return res[0]
453
455 if not self.tableDef.forceUnique:
456 return
457
458
459
460
461
462 self.connection.execute('DROP TRIGGER IF EXISTS "dropOld_%s"'
463 ' ON %s'%(self.tableName, self.tableName))
464 self.connection.execute('DROP RULE IF EXISTS updatePolicy'
465 ' ON %s'%(self.tableName))
466
467 def getMatchCondition():
468 return " AND ".join("%s=new.%s"%(n,n) for n in self.tableDef.primary)
469
470
471
472 self._definePrimaryKey()
473
474 if self.tableDef.dupePolicy=="drop":
475 self.connection.execute("CREATE OR REPLACE RULE updatePolicy AS"
476 " ON INSERT TO %s WHERE"
477 " EXISTS(SELECT * FROM %s WHERE %s)"
478 " DO INSTEAD NOTHING"%(self.tableName, self.tableName,
479 getMatchCondition()))
480
481 elif self.tableDef.dupePolicy=="check":
482
483
484
485
486 self.connection.execute("CREATE OR REPLACE RULE updatePolicy AS"
487 " ON INSERT TO %s WHERE"
488 " EXISTS(SELECT 1 FROM %s WHERE %s)"
489 " DO INSTEAD NOTHING"%(self.tableName, self.tableName,
490 " AND ".join("(new.%s IS NULL OR %s IS NULL OR %s=new.%s)"%(
491 c.name, c.name, c.name,c.name) for c in self.tableDef)))
492
493 elif self.tableDef.dupePolicy=="dropOld":
494 args = {
495 "table": self.tableName,
496 "matchCond": getMatchCondition()}
497 self.connection.execute('CREATE OR REPLACE FUNCTION "dropOld_%(table)s"()'
498 ' RETURNS trigger AS $body$\n'
499 " BEGIN\n"
500 " IF (EXISTS(SELECT 1 FROM %(table)s WHERE %(matchCond)s)) THEN\n"
501 " DELETE FROM %(table)s WHERE %(matchCond)s;\n"
502 " END IF;\n"
503 " RETURN NEW;\nEND\n$body$ LANGUAGE plpgsql"%args)
504 self.connection.execute(
505 'CREATE TRIGGER "dropOld_%(table)s" BEFORE INSERT OR UPDATE'
506 ' ON %(table)s FOR EACH ROW EXECUTE PROCEDURE "dropOld_%(table)s"()'%
507 args)
508
509 elif self.tableDef.dupePolicy=="overwrite":
510 self.connection.execute("CREATE OR REPLACE RULE updatePolicy AS"
511 " ON INSERT TO %s WHERE"
512 " EXISTS(SELECT %s FROM %s WHERE %s)"
513 " DO INSTEAD UPDATE %s SET %s WHERE %s"%(self.tableName,
514 ",".join(self.tableDef.primary),
515 self.tableName, getMatchCondition(),
516 self.tableName,
517 ", ".join("%s=new.%s"%(c.name,c.name) for c in self.tableDef),
518 getMatchCondition()))
519 else:
520 raise base.DataError("Invalid dupePolicy: %s"%self.tableDef.dupePolicy)
521
528
533
541
564
569
579
584
def query(self, query, data=None):
    r"""runs query within this table's connection.

    query is macro-expanded within the table definition (i.e., you can,
    e.g., write \qName to obtain the table's qualified name).  data,
    if given, is a dictionary of parameters filling the query.
    """
    # avoid the mutable default argument the previous version had;
    # DBMethodsMixin.query still receives an empty dict as before
    # when no parameters are passed in.
    if data is None:
        data = {}
    # a backslash hints at table-definition macros needing expansion
    if "\\" in query:
        query = self.tableDef.expand(query)
    return DBMethodsMixin.query(self, query, data)
594
596 """returns the select clause to come up with resultTableDef.
597 """
598 parts = []
599 for of in resultTableDef:
600 select = getattr(of, "select", None)
601 if select:
602 parts.append("%s AS %s"%(select, of.name))
603 else:
604 parts.append(of.name)
605 return ", ".join(parts)
606
def getQuery(self, resultTableDef, fragment, pars=None,
        distinct=False, limits=None, groupBy=None):
    """returns (result table definition, query string, parameters
    dictionary) for a query against this table.

    See getTableForQuery for the meaning of the arguments; pars, if
    passed in, may be mutated here.
    """
    if pars is None:
        pars = {}

    # accept a bare list of columns in place of a full table definition
    if not isinstance(resultTableDef, rscdef.TableDef):
        resultTableDef = base.makeStruct(rscdef.TableDef,
            id="iterQuery", columns=resultTableDef)

    query = "SELECT "
    if distinct:
        query += "DISTINCT "
    query += self.getSelectClause(resultTableDef)+" "
    query += "FROM %s "%self.tableName

    if fragment and fragment.strip():
        query += "WHERE %s "%fragment
    if groupBy:
        query += "GROUP BY %s "%groupBy
    if limits:
        # limits is a pair of an SQL tail and parameters filling it
        query += limits[0]+" "
        pars.update(limits[1])
    if self.exclusive:
        # in exclusive mode, block concurrent writes to selected rows
        query += "FOR UPDATE "

    return resultTableDef, query, pars
637
def iterQuery(self, resultTableDef=None, fragment="", pars=None,
        distinct=False, limits=None, groupBy=None):
    """iterates over the rows of a query against this table.

    This is getTableForQuery in iterator form; see there for the
    meaning of the arguments.

    (there is no advantage in using this as we will pull the entire
    thing in memory anyway; use qtables if you need streaming).
    """
    resultTable = self.getTableForQuery(resultTableDef, fragment,
        pars, distinct, limits, groupBy)
    for resultRow in resultTable.rows:
        yield resultRow
649
def getTableForQuery(self, resultTableDef=None, fragment="", pars=None,
        distinct=False, limits=None, groupBy=None):
    """returns an in-memory Table built from a query against this table.

    resultTableDef is a TableDef with svc.OutputField columns
    (rscdef.Column instances will do), or possibly just a list of
    Columns; if it is left out, this table's own definition is used.

    fragment is empty or an SQL where-clause with dictionary
    placeholders, pars is the dictionary filling fragment.  distinct,
    if True, adds a distinct clause.  limits, if given, is a pair of
    an SQL string to be appended to the SELECT clause and parameters
    filling it; queryMeta.asSQL returns what you need here.  groupBy
    is an optional GROUP BY clause body.

    pars may be mutated in the process.
    """
    if resultTableDef is None:
        resultTableDef = self.tableDef.copy(None)

    resultTableDef, queryText, queryPars = self.getQuery(
        resultTableDef,
        fragment,
        pars=pars,
        distinct=distinct,
        limits=limits,
        groupBy=groupBy)

    resultRows = []
    for dbTuple in self.query(queryText, queryPars):
        resultRows.append(resultTableDef.makeRowFromTuple(dbTuple))
    return table.InMemoryTable(resultTableDef, rows=resultRows)
678
679
680 -class View(DBTable):
681 """is a view, i.e., a table in the database you can't add to.
682
683 Strictly, I should derive both View and DBTable from a common
684 base, but that's currently not worth the effort.
685
686 Technically, Views are DBTables with a non-None viewStatement
687 (this is what TableForDef checks for when deciding whether to
688 construct a DBTable or a View). You can get a feeder for them,
689 but trying to actually feed anything will raise a DataError.
690
691 On import, views only run postCreation scripts;
692 since there are no indices, preIndex scripts are not run; since
693 no import takes place, there's no preImport or newSource.
694 """
695
699
702
703 feedRows = addRow
704
710
712
713
714 return _RaisingFeeder(self, None)
715
728
731
733
734 self.runScripts("postCreation")
735 return self
736