Source code for gavo.rsc.qtable

"""
A table representing a query.

This is mainly for streaming applications.  The table represents
a DB query result.  All you can do with the data itself is iterate over
the rows.  The metadata is usable as with any other table.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from gavo import base
from gavo import rscdef
from gavo.rsc import dbtable
from gavo.rsc import table


class QueryTable(table.BaseTable, dbtable.DBMethodsMixin):
    """QueryTables are constructed with a table definition and a DB query
    feeding this table definition.

    A QueryTable must be constructed with a transactional connection
    (in sqlsupport terms: Writable).  If you pass autoClose=True, it
    will close this connection after the data is delivered.

    This funky semantics is for the benefit of taprunner; it needs a
    connection up front for uploads.

    There's an alternative constructor allowing "quick" construction of
    the result table (fromColumns).
    """
    # The connection the query runs on.  cleanup() resets this to None,
    # which is how the table records that it has been consumed.
    connection = None

    def __init__(self, tableDef, query, connection, **kwargs):
        # Set up everything cleanup() needs before anything can fail, so
        # that __del__ on a half-constructed instance does not stumble
        # over missing attributes.
        self._parametersToRestore = []
        self.autoClose = kwargs.pop("autoClose", False)
        self.connection = connection
        if "rows" in kwargs:
            raise base.ReportableError("QueryTables cannot be constructed"
                " with rows.")
        self.query = query
        table.BaseTable.__init__(self, tableDef, connection=connection,
            **kwargs)

    @classmethod
    def fromColumns(cls, colSpec, query, connection, **kwargs):
        """returns a QueryTable object for query, where the result table is
        inferred from colSpec.

        colSpec is a sequence consisting of either dictionaries with
        constructor arguments to rscdef.Column or complete objects suitable
        as rscdef.Column objects; further kwargs are passed on to the
        QueryTable's constructor.
        """
        columns = [
            base.makeStruct(rscdef.Column, **c) if isinstance(c, dict) else c
            for c in colSpec]
        return cls(base.makeStruct(rscdef.TableDef, columns=columns),
            query, connection=connection, **kwargs)

    @staticmethod
    def _closeCursor(cursor):
        """closes cursor, ignoring database errors.

        This is called from finally clauses, where the connection may
        already be in an error state; a failure to close the cursor must
        not mask whatever exception is propagating.
        """
        try:
            cursor.close()
        except base.DBError:
            pass

    def __iter__(self):
        """actually runs the query and returns rows (dictionaries).

        You can only iterate once.  When iteration ends -- by exhaustion,
        by an error, or because the consumer abandons the iterator --
        the cursor is closed and cleanup() is called (which closes the
        connection with autoClose and restores configured parameters
        otherwise).
        """
        if self.connection is None:
            raise base.ReportableError("QueryTable already exhausted.")

        nRows = 0
        cursor = None
        try:
            # We want to enable parallel execution for queries where it's
            # worth it, but we'd like to stream your giant select *-type
            # queries.  First gung-ho criterion: If there's a GROUP (sc. BY)
            # in query, second gung-ho criterion: obscore queries.  These
            # get a plain cursor; everything else gets a named cursor,
            # which is what we use for streaming.
            loQ = self.query.lower()
            if "group" in loQ or "ivoa.obscore" in loQ:
                cursor = self.connection.cursor()
            else:
                cursor = self.connection.cursor("cursor"+hex(id(self)))

            cursor.execute(self.query)
            while True:
                nextRows = cursor.fetchmany(100000)
                if not nextRows:
                    break
                for row in nextRows:
                    nRows += 1
                    yield self.tableDef.makeRowFromTuple(row)

            # overflowLimit is usually set by the TAP machinery; we don't
            # want to depend on it, though.
            if getattr(self.tableDef, "overflowLimit", None)==nRows:
                self.setMeta("_queryStatus", "OVERFLOW")
        finally:
            if cursor is not None:
                self._closeCursor(cursor)
            self.cleanup()

    def __len__(self):
        # The number of rows is not known without running the query.
        # Note that Python requires __len__ to return an int, so len()
        # on a QueryTable raises a TypeError.
        return None

    def configureOnClose(self, parPairs):
        """adds parameter pairs (as coming back from conn.configure)
        to a set of pairs to be restored when this qtable is being
        cleaned up.

        For a connection in error, use of this facility will mean that
        connections will be rolled back automatically (because we couldn't
        restore the parameters otherwise).
        """
        self._parametersToRestore.extend(parPairs)

    def _restoreParameters(self):
        """helps configureOnClose: re-applies the pairs collected there
        to the connection.
        """
        if self._parametersToRestore and self.connection is not None:
            try:
                self.connection.configure(self._parametersToRestore)
            except base.DBError:
                # the connection might be in an error state.  Roll back and
                # try again.  If that fails again, things are really bad
                # and nobody will be interested in the fact that parameter
                # restoration has failed, so fail silently.
                try:
                    self.connection.rollback()
                    self.connection.configure(self._parametersToRestore)
                except base.DBError:
                    # see above
                    pass

    def cleanup(self):
        """releases this table's connection.

        With autoClose, the connection is closed; otherwise, parameters
        registered through configureOnClose are restored.  Either way,
        the table drops its reference to the connection, so it cannot be
        iterated again.  Repeated calls are no-ops.
        """
        if getattr(self, "connection", None) is not None:
            try:
                if self.autoClose:
                    try:
                        self.connection.close()
                    except base.DBError:
                        # Connection already closed or similarly ignorable
                        pass
                else:
                    self._restoreParameters()
            finally:
                # Mark the table as consumed even if restoration blew up,
                # so __del__ does not retry on a broken connection.
                self.connection = None

    def getPlan(self):
        """returns the query plan for the current query as a string.

        This is the output of EXPLAIN for the query, one plan line per
        line.  After you use this method, the table is exhausted: cleanup()
        has run (closing the connection if autoClose was passed).
        """
        if self.connection is None:
            raise base.ReportableError("QueryTable already exhausted.")

        cursor = None
        try:
            cursor = self.connection.cursor()
            cursor.execute("EXPLAIN "+self.query)
            return "\n".join(s[0] for s in cursor)
        finally:
            if cursor is not None:
                self._closeCursor(cursor)
            self.cleanup()

    def __del__(self):
        self.cleanup()