1 """
2 A table representing a query.
3
4 This is mainly for streaming applications. The table represents
5 a DB query result. All you can do with the data itself is iterate over
6 the rows. The metadata is usable as with any other table.
7 """
8
9
10
11
12
13
14
15 from gavo import base
16 from gavo import rscdef
17 from gavo.rsc import dbtable
18 from gavo.rsc import table
19 from gavo.utils import pgexplain
20
21
22 -class QueryTable(table.BaseTable, dbtable.DBMethodsMixin):
23 """QueryTables are constructed with a table definition and a DB query
24 feeding this table definition.
25
26 A QueryTable must be constructed with a connection. If you pass
27 autoClose=True, it will close this connection after the data is
28 delivered.
29
30 This funky semantics is for the benefit of taprunner; it needs a
31 connection up front for uploads.
32
33 There's an alternative constructor allowing "quick" construction of
34 the result table (fromColumns).
35 """
36 connection = None
37
38 - def __init__(self, tableDef, query, connection, **kwargs):
47
48 @classmethod
49 - def fromColumns(cls, colSpec, query, connection, **kwargs):
50 """returns a QueryTable object for query, where the result table is
51 inferred from colSpec.
52
53 colSpec is a sequence consisting of either dictionaries with constructor
54 arguments to rscdef.Column or complete objects suitable as rscdef.Column
55 objects; futher kwargs are passed on the the QueryTable's constructor.
56 """
57 columns = []
58 for c in colSpec:
59 if isinstance(c, dict):
60 columns.append(base.makeStruct(rscdef.Column, **c))
61 else:
62 columns.append(c)
63 return cls(base.makeStruct(rscdef.TableDef, columns=columns),
64 query, connection=connection, **kwargs)
65
67 """actually runs the query and returns rows (dictionaries).
68
69 You can only iterate once. At exhaustion, the connection will
70 be closed.
71 """
72 if self.connection is None:
73 raise base.ReportableError("QueryTable already exhausted.")
74
75 nRows = 0
76 cursor = self.connection.cursor("cursor"+hex(id(self)))
77 cursor.execute(self.query)
78 while True:
79 nextRows = cursor.fetchmany(1000)
80 if not nextRows:
81 break
82 for row in nextRows:
83 nRows += 1
84 yield self.tableDef.makeRowFromTuple(row)
85 cursor.close()
86
87
88
89 if getattr(self.tableDef, "overflowLimit", None)==nRows:
90 self.setMeta("_queryStatus", "OVERFLOW")
91 self.cleanup()
92
94
95 raise AttributeError()
96
105
117
120