Source code for gavo.rsc.qtable
A table representing a query.
This is mainly for streaming applications. The table represents
a DB query result. All you can do with the data itself is iterate over
the rows. The metadata is usable as with any other table.
#c Copyright 2008-2022, the GAVO project <firstname.lastname@example.org>
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
from gavo import base
from gavo import rscdef
from gavo.rsc import dbtable
from gavo.rsc import table
[docs]class QueryTable(table.BaseTable, dbtable.DBMethodsMixin):
"""QueryTables are constructed with a table definition and a DB query
feeding this table definition.
A QueryTable must be constructed with a transactional connection (in
sqlsupport terms: Writable). If you pass autoClose=True, it will close this
connection after the data is delivered.
This funky semantics is for the benefit of taprunner; it needs a
connection up front for uploads.
There's an alternative constructor allowing "quick" construction of
the result table (fromColumns).
connection = None
def __init__(self, tableDef, query, connection, **kwargs):
self.connection = connection
self.autoClose = kwargs.pop("autoClose", False)
if "rows" in kwargs:
raise base.ReportableError("QueryTables cannot be constructed"
" with rows.")
self.query = query
table.BaseTable.__init__(self, tableDef, connection=connection,
self._parametersToRestore = 
def fromColumns(cls, colSpec, query, connection, **kwargs):
"""returns a QueryTable object for query, where the result table is
inferred from colSpec.
colSpec is a sequence consisting of either dictionaries with constructor
arguments to rscdef.Column or complete objects suitable as rscdef.Column
objects; further kwargs are passed on the the QueryTable's constructor.
columns = 
for c in colSpec:
if isinstance(c, dict):
return cls(base.makeStruct(rscdef.TableDef, columns=columns),
query, connection=connection, **kwargs)
"""actually runs the query and returns rows (dictionaries).
You can only iterate once. At exhaustion, the connection will
if self.connection is None:
raise base.ReportableError("QueryTable already exhausted.")
nRows = 0
# We want to enable parallel execution for queries where it's
# worth it, but we'd like to stream your giant select *-type
# queries. First gung-ho criterion: If there's a GROUP (sc. BY)
# in query,
# second gung-ho criterion: obscore queries.
loQ = self.query.lower()
if "group" in loQ or "ivoa.obscore" in loQ:
cursor = self.connection.cursor()
cursor = self.connection.cursor("cursor"+hex(id(self)))
nextRows = cursor.fetchmany(100000)
if not nextRows:
for row in nextRows:
nRows += 1
# overflowLimit is usually set by the TAP machinery; we don't
# want to depend on it, though.
if getattr(self.tableDef, "overflowLimit", None)==nRows:
[docs] def configureOnClose(self, parPairs):
"""adds parameter pairs (as coming back from conn.configure) to
a set of pairs to be restored when this qtable is being cleaned
For a connection in error, use of this facility will mean
that connections will be rolled back automatically (because
we couldn't restore the parameters otherwise).
if self._parametersToRestore and self.connection is not None:
# the connection might be in an error state. Roll back and
# try again. If that fails again, things are really bad
# and nobody will be interested in the fact that parameter
# restoration has failed, so fail silently.
except base.DBError: # see above
[docs] def cleanup(self):
if getattr(self, "connection", None) is not None:
# Connection already closed or similarly ignorable
self.connection = None
[docs] def getPlan(self):
"""returns a parsed query plan for the current query.
After you use this method, the iterator is exhausted and the
connection will be closed.
cursor = self.connection.cursor()
res = "\n".join(s for s in cursor)