Source code for gavo.grammars.odbcgrammar

"""
A Grammar feeding from an odbc connection.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from psycopg2.extensions import adapt

from gavo import base
from gavo import rscdef
from gavo import utils
from gavo.grammars.common import Grammar, RowIterator

pyodbc = utils.DeferredImport("pyodbc", "import pyodbc")


class QueryGenerator(rscdef.ProcApp):
    """A generator of ODBC queries.

    This is mainly useful when doing ODBC queries to incrementally
    harvest some external resource.  The current ODBC iterator will be
    available as ``self``.

    The procedures also see a function escapeSQL(val) that returns val
    as a SQL literal (actually, it's psycopg2's adapt at the moment).

    This is intended to be used somewhat like this, with a monotonically
    increasing column insertion_time::

        <makeQuery>
            <code>
                # find out until when we have data locally
                try:
                    with base.getTableConn() as conn:
                        localMax = next(conn.query(
                            "SELECT MAX(insertion_time) FROM \schema.main"))[0]
                    fragment = " WHERE insertion_time>{}".format(
                        escapeSQL(localMax))
                except base.DBError as msg:
                    base.ui.notifyWarning(f"{msg} while harvesting: full re-harvest")
                    fragment = ""

                return f"SELECT * FROM remote_table{fragment}"
            </code>
        </makeQuery>
    """
    name_ = "makeQuery"
    requiredType = "makeQuery"
    formalArgs = "self"

    additionalNamesForProcs = {
        "escapeSQL": adapt,
    }


class ODBCIterator(RowIterator):
    def _iterRows(self):
        # each source is a file containing an ODBC connection string
        with open(self.sourceToken) as f:
            accessToken = f.read().strip()
        conn = pyodbc.connect(accessToken)

        if self.grammar.query:
            remoteQuery = self.grammar.query
        else:
            remoteQuery = self.grammar.makeQuery.compile()(self)

        cursor = conn.cursor()
        cursor.execute(remoteQuery)
        keys = [d[0] for d in cursor.description]

        for row in cursor:
            yield dict(zip(keys, row))


class ODBCGrammar(Grammar):
    """A grammar that feeds from a remote database.

    This works as a sort of poor man's foreign data wrapper: you pull
    data from a remote database now and then, mogrifying it into
    whatever format you want locally.

    This expects files containing pyodbc connection strings as sources,
    so you will normally have just one source.  Keeping the credentials
    in an external file helps keep RDs using this safe for public
    version control.

    An example for an ODBC connection string::

        DRIVER={SQL Server};SERVER=localhost;DATABASE=testdb;UID=me;PWD=pass

    See also http://www.connectionstrings.com/

    This will only work if pyodbc (Debian: python3-pyodbc) is installed.
    Additionally, you will have to install the ODBC driver corresponding
    to your source database (e.g., odbc-postgresql).
    """
    name_ = "odbcGrammar"

    _query = base.UnicodeAttribute("query",
        description="The query to run on the remote server.  The keys of"
            " the grammar will be the names of the result columns.",
        default=base.NotGiven,
        copyable=True)

    _makeQuery = base.StructAttribute("makeQuery",
        childFactory=QueryGenerator,
        default=base.NotGiven,
        description="Code returning the query to execute on the remote"
            " system.")

    rowIterator = ODBCIterator

    def validate(self):
        super().validate()
        if self.query and self.makeQuery:
            raise base.StructureError("Cannot give both query and makeQuery"
                " in an odbcGrammar.")
        if not (self.query or self.makeQuery):
            raise base.StructureError("Need to give at least one of query"
                " and makeQuery in an odbcGrammar.")

    def completeElement(self, ctx):
        if ctx.restricted and self.makeQuery:
            raise base.RestrictedElement("makeQuery")
        super().completeElement(ctx)
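

# The block below is not part of the grammar machinery; it is a minimal,
# self-contained sketch of the plain-pyodbc flow that ODBCIterator
# implements, handy for testing a connection string file outside of
# DaCHS.  The file name "remote.odbc" and the query are made-up
# examples, not anything the grammar prescribes.
if __name__ == "__main__":
    # read the connection string from a source file, as ODBCGrammar
    # expects it (the credentials stay out of the RD)
    with open("remote.odbc") as f:
        connectionString = f.read().strip()

    conn = pyodbc.connect(connectionString)
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM remote_table")

    # column names come from the cursor description, exactly as in
    # ODBCIterator._iterRows
    keys = [d[0] for d in cursor.description]
    for row in cursor:
        print(dict(zip(keys, row)))

    conn.close()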