Package gavo :: Package base :: Module sqlsupport
[frames] | [no frames]

Source Code for Module gavo.base.sqlsupport

   1  # -*- encoding: iso-8859-1 -*- 
   2  """ 
   3  Basic support for communicating with the database server. 
   4   
   5  This is currently very postgres specific.  If we really wanted to 
   6  support some other database, this would need massive refactoring. 
   7  """ 
   8   
   9  #c Copyright 2008-2019, the GAVO project 
  10  #c 
  11  #c This program is free software, covered by the GNU GPL.  See the 
  12  #c COPYING file in the source distribution. 
  13   
  14   
  15  from __future__ import print_function 
  16   
  17  import imp 
  18   
  19  import contextlib 
  20  import os 
  21  import random 
  22  import re 
  23  import threading 
  24  import warnings 
  25  import weakref 
  26   
  27  import numpy 
  28   
  29  from gavo import utils 
  30  from gavo.base import config 
  31   
  32  debug = "GAVO_SQL_DEBUG" in os.environ 
  33   
  34  import psycopg2 
  35  import psycopg2.extensions 
  36  import psycopg2.pool 
  37  psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) 
  38  try: 
  39          psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) 
  40  except AttributeError:  # UNICODEARRAY only at psycopg2 >2.0 
  41          pass 
  42   
  43  from psycopg2.extras import DictCursor #noflake: exported name 
class Error(utils.Error):
    """The base class of exceptions raised by this module."""
    pass
47 48 49 NUMERIC_TYPES = frozenset(["smallint", "integer", "bigint", "real", 50 "double precision"]) 51 52 ORDERED_TYPES = frozenset(["timestamp", "text", "unicode"]) | NUMERIC_TYPES 53 54 55 _PG_TIME_UNITS = { 56 "ms": 0.0001, 57 "s": 1., 58 "": 1., 59 "min": 60., 60 "h": 3600., 61 "d": 86400.,} 62 63 64 # Keep track of wether we have installed our extensions 65 # (this is to not require a DB connection on just importing this) 66 _PSYCOPG_INITED = False
class SqlSetAdapter(object):
    """An adapter rendering python sequences as SQL sets

    -- as opposed to psycopg2's apparent default of building arrays
    out of them.
    """
    def __init__(self, seq):
        self._seq = seq

    def prepare(self, conn):
        pass

    def getquoted(self):
        # unicode items are forced through str() before adaptation,
        # everything else is adapted as-is.
        adapt = psycopg2.extensions.adapt
        quoted = [
            adapt(str(item) if isinstance(item, unicode) else item).getquoted()
            for item in self._seq]
        return '(%s)'%(", ".join(quoted))

    __str__ = getquoted
90
class SqlArrayAdapter(object):
    """An adapter serialising python lists to SQL arrays

    This makes, in the shameful tradition of VOTable, empty arrays equal to
    NULL.
    """
    def __init__(self, seq):
        self._seq = seq

    def prepare(self, conn):
        pass

    def _addCastIfNecessary(self, serializedList):
        """adds a typecast to serializedList if it needs one.

        This is when all entries in serializedList are NULL; so, we're fine
        anyway if the first element is non-NULL; if it's not, we try to
        guess.

        serializedList is changed in place, the method returns nothing.
        """
        if not serializedList or serializedList[0]!="NULL":
            return

        if isinstance(self._seq, utils.floatlist):
            serializedList[0] = "NULL::REAL"
        elif isinstance(self._seq, utils.intlist):
            serializedList[0] = "NULL::INTEGER"

    def getquoted(self):
        # keep the explicit len() check: truth-testing a numpy array
        # with more than one element would raise.
        if len(self._seq)==0:
            return 'NULL'

        serialized = []
        for item in self._seq:
            serialized.append(str(psycopg2.extensions.adapt(item).getquoted()))
        self._addCastIfNecessary(serialized)

        return 'ARRAY[ %s ]'%(", ".join(serialized))

    __str__ = getquoted
133
class FloatableAdapter(object):
    """An adapter for things that do "float", in particular numpy.float*
    """
    def __init__(self, val):
        self.val = float(val)

    def prepare(self, conn):
        pass

    def getquoted(self):
        # NaN is the only float unequal to itself; postgres wants it
        # spelled as a quoted literal.
        isNaN = self.val != self.val
        return "'nan'::real" if isNaN else repr(self.val)

    __str__ = getquoted
151
class IntableAdapter(object):
    """An adapter for things that do "int", in particular numpy.int*
    """
    def __init__(self, val):
        self.val = int(val)

    def prepare(self, conn):
        pass

    def getquoted(self):
        # ints serialise to SQL as their plain decimal representation
        return "%d"%self.val

    __str__ = getquoted
166
class NULLAdapter(object):
    """An adapter for things that should end up as NULL in the DB.
    """
    def __init__(self, val):
        # val doesn't matter, we're making it NULL anyway
        pass

    def prepare(self, conn):
        pass

    def getquoted(self):
        return "NULL"

    __str__ = getquoted


# Make psycopg2 use our adapters for the python container types;
# lists (and ndarrays) become SQL arrays, the others SQL sets.
psycopg2.extensions.register_adapter(list, SqlArrayAdapter)
psycopg2.extensions.register_adapter(numpy.ndarray, SqlArrayAdapter)
psycopg2.extensions.register_adapter(tuple, SqlSetAdapter)
psycopg2.extensions.register_adapter(set, SqlSetAdapter)
psycopg2.extensions.register_adapter(frozenset, SqlSetAdapter)

# Register adapters for whatever numpy scalar widths this numpy
# build provides (e.g., float96 is platform-dependent).
for numpyType, adapter in [
        ("float32", FloatableAdapter),
        ("float64", FloatableAdapter),
        ("float96", FloatableAdapter),
        ("int8", IntableAdapter),
        ("int16", IntableAdapter),
        ("int32", IntableAdapter),
        ("int64", IntableAdapter),]:
    try:
        psycopg2.extensions.register_adapter(
            getattr(numpy, numpyType), adapter)
    except AttributeError:
        # what's not there we don't need to adapt
        pass


try:
    from gavo.utils import pyfits
    psycopg2.extensions.register_adapter(pyfits.Undefined, NULLAdapter)
except (ImportError, NameError):
    # don't fail here if pyfits isn't installed or is too old
    pass

# Re-export the psycopg2 exception hierarchy so clients can catch
# database errors without importing psycopg2 themselves.
from psycopg2 import (OperationalError, #noflake: exported names
    DatabaseError, IntegrityError, ProgrammingError,
    InterfaceError, DataError, InternalError)
from psycopg2.extensions import QueryCanceledError #noflake: exported name
from psycopg2 import Error as DBError
def registerAdapter(type, adapter):
    """registers adapter to serialise instances of type for the database.

    This is a thin wrapper around psycopg2.extensions.register_adapter.
    """
    psycopg2.extensions.register_adapter(type, adapter)
222
def registerType(oid, name, castFunc):
    """registers a cast function for the database type oid with psycopg2.

    This wraps psycopg2's new_type/register_type pair; see the psycopg2
    documentation for the exact signature expected of castFunc.
    """
    newOID = psycopg2.extensions.new_type(oid, name, castFunc)
    psycopg2.extensions.register_type(newOID)
227
class DebugCursor(psycopg2.extensions.cursor):
    """A cursor that prints the queries it executes (together with the
    id of its connection) to stdout.
    """
    def execute(self, sql, args=None):
        print("Executing %s %s"%(id(self.connection),
            sql.encode("ascii", "ignore")))
        psycopg2.extensions.cursor.execute(self, sql, args)
        print("Finished %s %s"%(id(self.connection),
            self.query.decode("ascii", "ignore").encode("ascii", "ignore")))
        return self.rowcount

    def executemany(self, sql, args=None):
        # Fixed: the default used to be a mutable [] (shared between
        # calls), and args[0] crashed on an empty argument list.
        if args is None:
            args = []
        print("Executing many", sql.encode("ascii", "ignore"))
        if args:
            print(("%d args, first one:\n%s"%(len(args), args[0])
                ).encode("ascii", "ignore"))
        else:
            print("0 args")
        res = psycopg2.extensions.cursor.executemany(self, sql, args)
        print("Finished many", self.query.encode("ascii", "ignore"))
        return res
245
class GAVOConnection(psycopg2.extensions.connection):
    """A psycopg2 connection with some additional methods.

    This derivation is also done so we can attach the getDBConnection
    arguments to the connection; it is used when recovering from
    a database restart.
    """
    def setTimeout(self, timeout):
        """sets a timeout on queries.

        timeout is in seconds; timeout=0 disables timeouts (this is what
        postgres does, too)
        """
        # don't use query here since query may call setTimeout
        cursor = self.cursor()
        try:
            if timeout==-12:  # Special instrumentation for testing
                cursor.execute("SET statement_timeout TO 1")
            elif timeout is not None:
                # postgres takes the timeout in milliseconds
                cursor.execute(
                    "SET statement_timeout TO %d"%(int(float(timeout)*1000)))
        finally:
            cursor.close()

    def getTimeout(self):
        """returns the current timeout setting.

        The value is in float seconds.
        """
        cursor = self.cursor()
        try:
            cursor.execute("SHOW statement_timeout")
            rawVal = list(cursor)[0][0]
            # fixed: raw string -- \d and \w are invalid escapes in a
            # plain string literal
            mat = re.match(r"(\d+)(\w*)$", rawVal)
            try:
                return int(mat.group(1))*_PG_TIME_UNITS[mat.group(2)]
            except (ValueError, AttributeError, KeyError):
                raise ValueError("Bad timeout value from postgres: %s"%rawVal)
        finally:
            cursor.close()

    @contextlib.contextmanager
    def timeoutSet(self, timeout, cursor):
        """a contextmanager to have a timeout set in the controlled
        section.

        (cursor is currently unused here; it is part of the signature
        for the benefit of callers that already hold one)
        """
        if timeout is not None:
            oldTimeout = self.getTimeout()
            self.setTimeout(timeout)

        yield

        # don't try to restore the timeout on an exception; presumably
        # things are hosed enough that we'll discard the connection,
        # and we can't talk to the DB anyway until a rollback.
        if timeout is not None:
            self.setTimeout(oldTimeout)

    def queryToDicts(self, query, args={}, timeout=None, caseFixer=None):
        """iterates over dictionary rows for query.

        This is mainly for ad-hoc queries needing little metadata.

        The dictionary keys are determined by what the database says the
        column titles are; thus, it's usually lower-cased variants of what's
        in the select-list.  To fix this, you can pass in a caseFixer dict
        that gives a properly cased version of lowercase names.
        """
        cursor = self.cursor()
        try:
            with self.timeoutSet(timeout, cursor):
                cursor.execute(query, args)
                keys = [cd[0] for cd in cursor.description]
                if caseFixer:
                    keys = [caseFixer.get(key, key) for key in keys]
                for row in cursor:
                    yield dict(zip(keys, row))
        finally:
            cursor.close()

    def query(self, query, args={}, timeout=None):
        """iterates over result tuples for query.
        """
        cursor = self.cursor()
        try:
            with self.timeoutSet(timeout, cursor):
                cursor.execute(query, args)
                for row in cursor:
                    yield row
        finally:
            cursor.close()

    def execute(self, query, args={}):
        """executes query in a cursor.

        This returns the rowcount of the cursor used.
        """
        cursor = self.cursor()
        try:
            cursor.execute(query, args)
            return cursor.rowcount
        finally:
            cursor.close()

    @contextlib.contextmanager
    def savepoint(self):
        """sets up a section protected by a savepoint that will be released
        after use.

        If an exception happens in the controlled section, the connection
        will be rolled back to the savepoint.
        """
        savepointName = "auto_%s"%(random.randint(0, 2147483647))
        self.execute("SAVEPOINT %s"%savepointName)
        try:
            yield
        except:
            self.execute("ROLLBACK TO SAVEPOINT %s"%savepointName)
            raise
        finally:
            self.execute("RELEASE SAVEPOINT %s"%savepointName)
368
def savepointOn(conn):
    """(defunct) raises a NotImplementedError pointing to the
    connection.savepoint context manager that replaced this function.
    """
    raise NotImplementedError("Don't use the savepointOn function any more;"
        " use connection.savepoint.")
373
class DebugConnection(GAVOConnection):
    """A GAVOConnection that prints its commits and rollbacks and that
    hands out DebugCursor cursors.
    """
    def cursor(self, *args, **kwargs):
        # force our chatty cursor class on all cursors from here
        kwargs["cursor_factory"] = DebugCursor
        return psycopg2.extensions.connection.cursor(self, *args, **kwargs)

    def commit(self):
        print("Commit %s"%id(self))
        return GAVOConnection.commit(self)

    def rollback(self):
        print("Rollback %s"%id(self))
        return GAVOConnection.rollback(self)

    def getPID(self):
        """returns the PID of the postgres backend serving this connection.
        """
        cursor = self.cursor()
        cursor.execute("SELECT pg_backend_pid()")
        pid = list(cursor)[0][0]
        cursor.close()
        return pid
394
class NullConnection(object):
    """A stand-in to pass wherever a function formally wants a
    connection but doesn't actually need one in a particular situation.

    This, in particular, concerns makeData.

    To be accommodating to careful code, commit and rollback are
    silently accepted; every other attribute access raises a ValueError.
    """
    def __getattr__(self, name):
        raise ValueError("Attempt to use NullConnection (attribute %s)"%name)

    def commit(self):
        pass

    def rollback(self):
        pass
413
def getDBConnection(profile, debug=debug, autocommitted=False):
    """returns an enhanced database connection through profile.

    You will typically rather use the context managers for the standard
    profiles (``getTableConnection`` and friends).  Use this function if
    you want to keep your connection out of connection pools or if you want
    to use non-standard profiles.

    profile will usually be a string naming a profile defined in
    ``GAVO_ROOT/etc``.
    """
    if profile is None:
        profile = "trustedquery"
    if isinstance(profile, basestring):
        profile = config.getDBProfile(profile)

    if debug:
        conn = psycopg2.connect(connection_factory=DebugConnection,
            **profile.getArgs())
        print("NEW CONN using %s (%s)"%(profile.name, conn.getPID()), id(conn))
        # wrap close so we see connection closes in the debug log, too
        def closer():
            print("CONNECTION CLOSE", id(conn))
            return DebugConnection.close(conn)
        conn.close = closer
    else:
        try:
            conn = psycopg2.connect(connection_factory=GAVOConnection,
                **profile.getArgs())
        except OperationalError as msg:
            raise utils.ReportableError("Cannot connect to the database server."
                " The database library reported:\n\n%s"%msg,
                hint="This usually means you must adapt either the access profiles"
                " in $GAVO_DIR/etc or your database config (in particular,"
                " pg_hba.conf).")

    # lazily install our type casts on first actual connection
    if not _PSYCOPG_INITED:
        _initPsycopg()

    if autocommitted:
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    conn.set_client_encoding("UTF8")

    # remember how we were made so QuerierMixin can re-open the
    # connection after a database restart
    conn._getDBConnectionArgs = {
        "profile": profile,
        "debug": debug,
        "autocommitted": autocommitted}
    return conn
def _parseTableName(tableName, schema=None):
    """returns schema, unqualified table name for the arguments.

    schema=None selects the default schema (public for postgresql).

    If tableName is qualified (i.e. schema.table), the schema given
    in the name overrides the schema argument.

    We do not support delimited identifiers for tables in DaCHS.  Hence,
    this will raise a ValueError if anything that wouldn't work as
    an SQL regular identifier (except we don't filter for reserved
    words yet, which is an implementation detail that might change).
    """
    parts = tableName.split(".")
    if len(parts)>2 or not all(
            utils.identifierPattern.match(part) for part in parts):
        raise ValueError("%s is not a SQL regular identifier"%repr(tableName))

    if len(parts)==2:
        schema, name = parts
    else:
        name = parts[0]

    if schema is None:
        schema = "public"

    return schema.lower(), name.lower()
493
def parseBannerString(bannerString, digits=2):
    """returns digits from a postgres server banner.

    This hardcodes the response given by postgres 8 and raises a ValueError
    if the expected format is not found.
    """
    mat = re.match(r"PostgreSQL ([\d.]*)", bannerString)
    if mat is None:
        raise ValueError("Cannot make out the Postgres server version from %s"%
            repr(bannerString))
    versionParts = mat.group(1).split(".")
    return ".".join(versionParts[:digits])
506
class PostgresQueryMixin(object):
    """is a mixin containing various useful queries that are postgres specific.

    This mixin expects a parent that mixes in QuerierMixin (that, for now,
    also mixes in PostgresQueryMixin, so you won't need to mix this in).
    """
    def getServerVersion(self, digits=2):
        """returns the version of the connection's server to digits numbers.
        """
        bannerString = list(self.query("SELECT version()"))[0][0]
        return parseBannerString(bannerString, digits)

    def getPrimaryIndexName(self, tableName):
        """returns the name of the index corresponding to the primary key on
        (the unqualified) tableName.
        """
        return ("%s_pkey"%tableName).lower()

    def schemaExists(self, schema):
        """returns True if the named schema exists in the database.
        """
        matches = list(self.query("SELECT nspname FROM"
            " pg_namespace WHERE nspname=%(schemaName)s", {
                'schemaName': schema,
            }))
        return len(matches)!=0

    def hasIndex(self, tableName, indexName, schema=None):
        """returns True if table tableName has an index called indexName.

        See _parseTableName on the meaning of the arguments.
        """
        schema, tableName = _parseTableName(tableName, schema)
        res = list(self.query("SELECT indexname FROM"
            " pg_indexes WHERE schemaname=lower(%(schema)s) AND"
            " tablename=lower(%(tableName)s) AND"
            " indexname=lower(%(indexName)s)", locals()))
        return len(list(res))>0

    def _getColIndices(self, relOID, colNames):
        """returns a sorted list of column indices of colNames in the relation
        relOID.

        This really is a helper for foreignKeyExists.
        """
        colNames = set(n.lower() for n in colNames)
        res = [r[0] for r in
            self.query("SELECT attnum FROM pg_attribute WHERE"
                " attrelid=%(relOID)s and attname IN %(colNames)s",
                locals())]
        res.sort()
        return res

    def getForeignKeyName(self, srcTableName, destTableName, srcColNames,
            destColNames, schema=None):
        """returns the name of the foreign key constraint on srcTable's
        srcColNames using destTableName's destColNames.

        Warning: names in XColNames that are not column names in the respective
        tables are ignored.

        This raises a ValueError if no single matching foreign key exists
        (and returns False when one of the tables involved doesn't exist;
        this asymmetry is kept for backward compatibility).
        """
        try:
            srcOID = self.getOIDForTable(srcTableName, schema)
            srcColInds = self._getColIndices( #noflake: used in locals()
                srcOID, srcColNames)
            destOID = self.getOIDForTable(destTableName, schema)
            destColInds = self._getColIndices( #noflake: used in locals()
                destOID, destColNames)
        except Error: # Some of the items related probably don't exist
            return False

        res = list(self.query("""SELECT conname FROM pg_constraint WHERE
            contype='f'
            AND conrelid=%(srcOID)s
            AND confrelid=%(destOID)s
            AND conkey=%(srcColInds)s::SMALLINT[]
            AND confkey=%(destColInds)s::SMALLINT[]""", locals()))

        if len(res)==1:
            return res[0][0]
        else:
            # (fixed typo: this used to say "ambiguos")
            raise ValueError("Non-existing or ambiguous foreign key")

    def foreignKeyExists(self, srcTableName, destTableName, srcColNames,
            destColNames, schema=None):
        """returns True if there is a foreign key constraint from
        srcTableName's srcColNames to destTableName's destColNames.
        """
        try:
            _ = self.getForeignKeyName( #noflake: ignored value
                srcTableName, destTableName,
                srcColNames, destColNames,
                schema)
            return True
        except ValueError:
            return False

    @utils.memoized
    def _resolveTypeCode(self, oid):
        """returns a textual description for a type oid as returned
        by cursor.description.

        These descriptions are *not* DDL-ready.

        *** postgres specific ***
        """
        res = list(self.query(
            "select typname from pg_type where oid=%(oid)s", {"oid": oid}))
        return res[0][0]

    def getColumnsFromDB(self, tableName):
        """returns a sequence of (name, type) pairs of the columns this
        table has in the database.

        If the table is not on disk, this will raise a NotFoundError.

        *** psycopg2 specific ***
        """
        # _parseTableName bombs out on non-regular identifiers, hence
        # foiling a possible SQL injection
        _parseTableName(tableName)
        cursor = self.connection.cursor()
        try:
            cursor.execute("select * from %s limit 0"%tableName)
            return [(col.name, self._resolveTypeCode(col.type_code)) for col in
                cursor.description]
        finally:
            cursor.close()

    def getRowEstimate(self, tableName):
        """returns the size of the table in rows as estimated by the query
        planner.

        This will raise a KeyError with tableName if the table isn't known
        to postgres.
        """
        res = list(self.query(
            "SELECT reltuples FROM pg_class WHERE oid = %(tableName)s::regclass",
            locals()))
        if not res:
            raise KeyError(tableName)
        return int(res[0][0])

    def roleExists(self, role):
        """returns True if the role is known to the database.
        """
        matches = list(self.query(
            "SELECT usesysid FROM pg_user WHERE usename=%(role)s",
            locals()))
        return len(matches)!=0

    def getOIDForTable(self, tableName, schema=None):
        """returns the current oid of tableName.

        tableName may be schema qualified.  If it is not, public is assumed.
        """
        schema, tableName = _parseTableName(tableName, schema)
        res = list(self.query("SELECT oid FROM pg_class WHERE"
            " relname=%(tableName)s AND"
            " relnamespace=(SELECT oid FROM pg_namespace WHERE nspname=%(schema)s)",
            locals()))
        if len(res)!=1:
            raise Error("Table %s does not exist"%tableName)
        return res[0][0]

    def _rowExists(self, query, pars):
        # helper: True if query with pars returns at least one row
        res = list(self.query(query, pars))
        return len(res)!=0

    def getTableType(self, tableName, schema=None):
        """returns the type of the relation relationName.

        If relationName does not exist, None is returned.  Otherwise, it's
        what is in the information schema for the table, which for postgres
        currently is one of BASE TABLE, VIEW, FOREIGN TABLE, MATERIALIZED VIEW,
        or LOCAL TEMPORARY.

        The DaCHS-idiomatic way to see if a relation exists is
        getTableType() is not None.

        You can pass in schema-qualified relation names, or the relation name
        and the schema separately.

        *** postgres specific ***
        """
        schema, tableName = _parseTableName(tableName, schema)
        res = list(
            self.query("""SELECT table_name, table_type FROM
                information_schema.tables WHERE (
                    table_schema=%(schemaName)s
                    OR table_type='LOCAL TEMPORARY')
                AND table_name=%(tableName)s""", {
                'tableName': tableName.lower(),
                'schemaName': schema.lower()}))

        if not res:
            # materialised views are not yet in information_schema.tables,
            # so we try again with a special postgres case.
            if list(self.connection.query(
                    "select table_name from information_schema.tables"
                    " where table_name='pg_matviews'")):
                res = list(
                    self.query("""SELECT matviewname, 'MATERIALIZED VIEW' AS table_type
                        FROM pg_matviews
                        WHERE
                            schemaname=%(schemaName)s
                            AND matviewname=%(tableName)s""", {
                        'tableName': tableName.lower(),
                        'schemaName': schema.lower()}))

        if not res:
            return None

        assert len(res)==1
        return res[0][1]

    def dropTable(self, tableName, cascade=False):
        """drops a table or view named by tableName.

        This does not raise an error if no such relation exists.

        *** postgres specific ***
        """
        tableType = self.getTableType(tableName)
        if tableType is None:
            return

        # map information_schema table types to the keyword DROP wants
        dropQualification = {
            "VIEW": "VIEW",
            "MATERIALIZED VIEW": "MATERIALIZED VIEW",
            "FOREIGN TABLE": "FOREIGN TABLE",
            "BASE TABLE": "TABLE",
            "LOCAL TEMPORARY": "TABLE"}[tableType]
        self.query("DROP %s %s %s"%(
            dropQualification,
            tableName,
            "CASCADE" if cascade else ""))

    def getSchemaPrivileges(self, schema):
        """returns a dict mapping role names to privileges granted on
        schema (see parsePGACL).
        """
        res = list(self.query("SELECT nspacl FROM pg_namespace WHERE"
            " nspname=%(schema)s", locals()))
        return self.parsePGACL(res[0][0])

    def getTablePrivileges(self, schema, tableName):
        """returns a dict mapping role names to privileges granted on the
        relation tableName in schema (see parsePGACL).

        *** postgres specific ***
        """
        res = list(self.query("SELECT relacl FROM pg_class WHERE"
            " lower(relname)=lower(%(tableName)s) AND"
            " relnamespace=(SELECT oid FROM pg_namespace WHERE nspname=%(schema)s)",
            locals()))
        try:
            return self.parsePGACL(res[0][0])
        except IndexError: # Table doesn't exist, so no privileges
            return {}

    # maps postgres privilege-letter combinations to the privilege
    # keywords DaCHS deals in; anything unknown maps to "READ" in
    # parsePGACL.
    _privTable = {
        "arwdRx": "ALL",
        "arwdDxt": "ALL",
        "arwdRxt": "ALL",
        "arwdxt": "ALL",
        "r": "SELECT",
        "UC": "ALL",
        "U": "USAGE",
    }

    def parsePGACL(self, acl):
        """returns a dict roleName->acl for acl in postgres'
        ACL serialization.
        """
        if acl is None:
            return {}
        res = []
        for acs in re.match("{(.*)}", acl).group(1).split(","):
            if acs!='': # empty ACLs don't match the RE, so catch them here
                role, privs, granter = re.match("([^=]*)=([^/]*)/(.*)", acs).groups()
                res.append((role, self._privTable.get(privs, "READ")))
        return dict(res)

    def getACLFromRes(self, thingWithPrivileges):
        """returns a dict of (role, ACL) as it is defined in thingWithPrivileges.

        thingWithPrivileges is something mixing in rscdef.common.PrivilegesMixin
        (or has readProfiles and allProfiles attributes containing
        sequences of profile names).
        """
        res = []
        if hasattr(thingWithPrivileges, "schema"): # it's an RD
            readRight = "USAGE"
        else:
            readRight = "SELECT"

        for profile in thingWithPrivileges.readProfiles:
            res.append((config.getDBProfile(profile).roleName, readRight))
        for profile in thingWithPrivileges.allProfiles:
            res.append((config.getDBProfile(profile).roleName, "ALL"))
        return dict(res)
808
class StandardQueryMixin(object):
    """is a mixin containing various useful queries that should work
    against all SQL systems.

    This mixin expects a parent that mixes in QuerierMixin (that, for now,
    also mixes in StandardQueryMixin, so you won't need to mix this in).

    The parent also needs to mix in something like PostgresQueryMixin (I
    might want to define an interface there once I'd like to support
    other databases).
    """
    def setSchemaPrivileges(self, rd):
        """sets the privileges defined on rd to its schema.

        This function will never touch the public schema.
        """
        schema = rd.schema.lower()
        if schema=="public":
            return
        self._updatePrivileges("SCHEMA %s"%schema,
            self.getSchemaPrivileges(schema), self.getACLFromRes(rd))

    def setTablePrivileges(self, tableDef):
        """sets the privileges defined in tableDef for that table through
        querier.
        """
        self._updatePrivileges(tableDef.getQName(),
            self.getTablePrivileges(tableDef.rd.schema, tableDef.id),
            self.getACLFromRes(tableDef))

    def _updatePrivileges(self, objectName, foundPrivs, shouldPrivs):
        """is a helper for set[Table|Schema]Privileges.

        Requests for granting privileges not known to the database are
        ignored, but a log entry is generated.
        """
        # roles that have privileges but shouldn't: revoke everything
        for role in set(foundPrivs)-set(shouldPrivs):
            if role:
                self.connection.execute("REVOKE ALL PRIVILEGES ON %s FROM %s"%(
                    objectName, role))
        # roles that should have privileges but don't: grant (if the
        # role actually exists)
        for role in set(shouldPrivs)-set(foundPrivs):
            if role:
                if self.roleExists(role):
                    self.connection.execute(
                        "GRANT %s ON %s TO %s"%(shouldPrivs[role], objectName, role))
                else:
                    utils.sendUIEvent("Warning",
                        "Request to grant privileges to non-existing"
                        " database user %s dropped"%role)
        # roles present on both sides: re-grant if the privilege level
        # differs
        for role in set(shouldPrivs)&set(foundPrivs):
            if role:
                if shouldPrivs[role]!=foundPrivs[role]:
                    self.connection.execute("REVOKE ALL PRIVILEGES ON %s FROM %s"%(
                        objectName, role))
                    self.connection.execute("GRANT %s ON %s TO %s"%(shouldPrivs[role],
                        objectName, role))

    def setTimeout(self, timeout):
        """wraps conn.setTimeout for backward compatibility.
        """
        self.connection.setTimeout(timeout)

    def getTimeout(self):
        """wraps conn.getTimeout for backward compatibility.

        The value is in float seconds.
        """
        return self.connection.getTimeout()
878
def dictifyRowset(descr, rows):
    # deprecated -- remove this when SimpleQuerier is gone
    """turns a standard, tuple-based rowset into a list of dictionaries,
    the keys of which are taken from descr (a cursor.description).
    """
    colNames = tuple(colDesc[0] for colDesc in descr)
    result = []
    for row in rows:
        result.append(dict(zip(colNames, row)))
    return result
887
class QuerierMixin(PostgresQueryMixin, StandardQueryMixin):
    """is a mixin for "queriers", i.e., objects that maintain a db connection.

    The mixin assumes an attribute connection from the parent.
    """
    defaultProfile = None
    # _reconnecting is used in query to avoid endless reconnect loops
    _reconnecting = False

    def configureConnection(self, settings):
        """executes SET key=val for all (key, val) pairs in settings.
        """
        cursor = self.connection.cursor()
        for key, val in settings:
            cursor.execute("SET %s=%%(val)s"%key, {"val": val})
        cursor.close()

    def enableAutocommit(self):
        """switches the underlying connection to autocommit isolation.
        """
        self.connection.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

    def _queryReconnecting(self, query, data, timeout):
        """helps query in case of disconnections.
        """
        self.connection = getDBConnection(
            **self.connection._getDBConnectionArgs)
        self._reconnecting = True
        res = self.query(query, data, timeout)
        # Fixed: this used to assign to the misspelled attribute
        # _reconnection, so the _reconnecting flag never got reset
        # after a successful reconnect.
        self._reconnecting = False
        return res

    def query(self, query, data={}, timeout=None):
        """wraps conn.query adding logic to re-establish lost connections.

        Don't use this method any more in new code.  It contains wicked
        logic to tell DDL statements (that run without anyone pulling
        the results) from actual selects.  That's a bad API.  Also note
        that the timeout is ignored for DDL statements, and that this
        returns None for them.

        Use either connection.query or connection.execute in new code.
        """
        if self.connection is None:
            raise utils.ReportableError(
                "SimpleQuerier connection is None.",
                hint="This usually is because an AdhocQuerier's query method"
                " was used outside of a with block.")

        try:
            if query[:5].lower() in ["selec", "with "]:
                return self.connection.query(query, data, timeout)
            else:
                # it's DDL that we execute directly, ignoring the timeout
                self.connection.execute(query, data)
        except DBError as ex:
            # on a lost connection (dead file descriptor), try once to
            # re-open it and re-run the query
            if isinstance(ex, OperationalError) and self.connection.fileno()==-1:
                if not self._reconnecting:
                    return self._queryReconnecting(query, data, timeout)
            raise

    def queryToDicts(self, *args, **kwargs):
        """wraps conn.queryToDicts for backwards compatibility.
        """
        # NOTE(review): self.query returns a generator for selects,
        # which has no description attribute; this looks like it only
        # works when query hands back an actual cursor -- confirm
        # against callers before touching.
        cursor = self.query(*args, **kwargs)
        keys = [d[0] for d in cursor.description]
        for row in cursor:
            yield dict(zip(keys, row))

    # alias for legacy code.  Will probably go away soon.
    queryDicts = queryToDicts

    def finish(self):
        """commits and closes the connection.
        """
        self.connection.commit()
        self.connection.close()

    def abort(self):
        """closes the connection without committing.
        """
        self.connection.close()
963
class UnmanagedQuerier(QuerierMixin):
    """A simple interface to querying the database through a connection
    managed by someone else.

    This is typically used as in::

        with base.getTableConn() as conn:
            q = UnmanagedQuerier(conn)
            ...

    This contains numerous methods abstracting DB functionality a bit.
    Documented ones include:

    * schemaExists(schema)
    * getColumnsFromDB(tableName)
    * getTableType(tableName) -- this will return None for non-existing tables,
      which is DaCHS' official way to determine table existence.
    * getTimeout() -- returns the current query timeout in seconds
    * setTimeout(timeout) -- sets a timeout in seconds.
    """
    def __init__(self, connection):
        self.connection = connection
987
class AdhocQuerier(QuerierMixin):
    """A simple interface to querying the database through pooled
    connections.

    These are constructed using the connection getters (getTableConn (default),
    getAdminConn) and then serve as context managers, handing back the connection
    as you exit the controlled block.

    Since they operate through pooled connections, no transaction
    management takes place.  These are typically for read-only things.

    You can use the query method and everything that's in the QuerierMixin.
    """
    def __init__(self, connectionManager=None):
        if connectionManager is None:
            self.connectionManager = getTableConn
        else:
            self.connectionManager = connectionManager
        # connection stays None until __enter__ obtains one
        self.connection = None

    def __enter__(self):
        self._cm = self.connectionManager()
        self.connection = self._cm.__enter__()
        return self

    def __exit__(self, *args):
        # drop our reference before handing the connection back to
        # the pool's context manager
        self.connection = None
        return self._cm.__exit__(*args)
1017
def setDBMeta(conn, key, value):
	"""adds/overwrites (key, value) in the dc.metastore table within
	conn.

	conn must be an admin connection; this does not commit.

	key must be a string, value something unicodeable.
	"""
	# delete-then-insert so an existing key really is overwritten as
	# promised above (a bare INSERT would duplicate the key or violate
	# a unique constraint); both statements run in conn's transaction,
	# so the operation is atomic from the caller's point of view.
	conn.execute(
		"DELETE FROM dc.metastore WHERE key=%(key)s", {
			'key': key})
	conn.execute(
		"INSERT INTO dc.metastore (key, value) VALUES (%(key)s, %(value)s)", {
			'key': key,
			'value': unicode(value)})
1031
def getDBMeta(key):
	"""returns the value for key from within dc.metastore.

	This always returns a unicode string.  Type conversions are the
	client's business.

	If no value exists, this raises a KeyError.
	"""
	with getTableConn() as conn:
		for row in conn.query(
				"SELECT value FROM dc.metastore WHERE key=%(key)s",
				{"key": key}):
			return row[0]
		# KeyError is raised inside the with block on purpose: that is
		# what the original did, and the pool's error handling sees it.
		raise KeyError(key)
1047
@contextlib.contextmanager
def connectionConfiguration(conn, isLocal=True, timeout=None, **runtimeVals):
	"""A context manager setting and resetting runtimeVals in conn.

	You just pass keyword arguments corresponding to postgres runtime
	configuration items (as in SET and SHOW).  The manager obtains their
	previous values and restores them before exiting.

	When the controlled body is terminated by a DBError, the settings
	are not reset.

	If you set isLocal=False, this works for autocommitted connections,
	too (and in that case the reset of the run-time parameters will
	be attempted even when DBErrors occurred).

	Since it's so frequent, you can pass timeout to give a statement_timeout
	in seconds.
	"""
	cursor = conn.cursor()

	# postgres takes statement_timeout in milliseconds
	if timeout is not None:
		runtimeVals["statement_timeout"] = int(float(timeout)*1000)

	# save the previous value of each setting, then install the new one;
	# locals() works as the parameter dict because parName, parVal and
	# isLocal are the names referenced in the SQL (python 2 idiom; note
	# iteritems).
	oldVals = {}
	for parName, parVal in runtimeVals.iteritems():
		parVal = str(parVal)
		cursor.execute("SELECT current_setting(%(parName)s)", locals())
		oldVals[parName] = list(cursor)[0][0]
		cursor.execute(
			"SELECT set_config(%(parName)s, %(parVal)s, %(isLocal)s)", locals())
	cursor.close()

	def resetAll(isLocal):
		# restores the settings saved in oldVals on conn
		cursor = conn.cursor()
		for parName, parVal in oldVals.iteritems():
			cursor.execute(
				"SELECT set_config(%(parName)s, %(parVal)s, %(isLocal)s)",
				locals())
		cursor.close()

	try:
		yield
	except DBError: # the connection probably is dirty, do not try to reset
		if not isLocal:
			resetAll(isLocal)
		raise
	except:
		resetAll(isLocal)
		raise
	resetAll(isLocal)


# SQL body of a symmetrised q3c join; _initPsycopg below pastes this after
# a "CREATE OR REPLACE FUNCTION ... AS '" prefix, which is why this string
# ends with the function body's closing quote and the LANGUAGE clause.
JOIN_FUNCTION_BODY = """
SELECT (
(
((q3c_ang2ipix($3,$4)>=(q3c_nearby_it($1,$2,$5,0))) AND (q3c_ang2ipix($3,$4)<=(q3c_nearby_it($1,$2,$5,1))))
OR ((q3c_ang2ipix($3,$4)>=(q3c_nearby_it($1,$2,$5,2))) AND (q3c_ang2ipix($3,$4)<=(q3c_nearby_it($1,$2,$5,3))))
OR ((q3c_ang2ipix($3,$4)>=(q3c_nearby_it($1,$2,$5,4))) AND (q3c_ang2ipix($3,$4)<=(q3c_nearby_it($1,$2,$5,5))))
OR ((q3c_ang2ipix($3,$4)>=(q3c_nearby_it($1,$2,$5,6))) AND (q3c_ang2ipix($3,$4)<=(q3c_nearby_it($1,$2,$5,7))))
) AND
(
((q3c_ang2ipix($1,$2)>=(q3c_nearby_it($3,$4,$5,0))) AND (q3c_ang2ipix($1,$2)<=(q3c_nearby_it($3,$4,$5,1))))
OR ((q3c_ang2ipix($1,$2)>=(q3c_nearby_it($3,$4,$5,2))) AND (q3c_ang2ipix($1,$2)<=(q3c_nearby_it($3,$4,$5,3))))
OR ((q3c_ang2ipix($1,$2)>=(q3c_nearby_it($3,$4,$5,4))) AND (q3c_ang2ipix($1,$2)<=(q3c_nearby_it($3,$4,$5,5))))
OR ((q3c_ang2ipix($1,$2)>=(q3c_nearby_it($3,$4,$5,6))) AND (q3c_ang2ipix($1,$2)<=(q3c_nearby_it($3,$4,$5,7))))
)
)
AND q3c_sindist($1,$2,$3,$4)<POW(SIN(RADIANS($5)/2),2)
' LANGUAGE SQL IMMUTABLE;
"""
def _initPsycopg():
	"""does any DaCHS-specific database setup necessary.

	This will always open an admin connection.
	"""
	# collect all DB setup in this function.  XXX TODO: in particular, the
	# Box mess from coords (if we still want it)
	global _PSYCOPG_INITED

	conn = psycopg2.connect(connection_factory=GAVOConnection,
		**config.getDBProfile("feed").getArgs())
	try:
		try:
			from gavo.utils import pgsphere
			pgsphere.preparePgSphere(conn)
		except Exception:
			# best-effort: DaCHS runs without pgsphere, just with reduced
			# functionality.  A bare except here would also swallow
			# KeyboardInterrupt/SystemExit, hence the narrower clause.
			warnings.warn("pgsphere missing -- ADQL, pg-SIAP, and SSA will not work")

		# Add symmetrised q3c_joins if q3c is in use and the functions are
		# not already defined
		# TODO: Delete this when q3c is fixed.
		funcs = set(r[0] for r in
			conn.query("SELECT DISTINCT proname FROM pg_proc"
				" WHERE proname IN ('q3c_join', 'q3c_join_symmetric')"))
		if funcs==frozenset(["q3c_join"]):
			# q3c is there, but not our extension
			conn.execute("""CREATE OR REPLACE FUNCTION q3c_join_symmetric(
				leftra double precision, leftdec double precision,
				rightra double precision, rightdec double precision,
				radius double precision) RETURNS boolean AS '"""+JOIN_FUNCTION_BODY)
			conn.execute("""CREATE OR REPLACE FUNCTION q3c_join_symmetric(
				leftra double precision, leftdec double precision,
				rightra real, rightdec real,
				radius double precision) RETURNS boolean AS '"""+JOIN_FUNCTION_BODY)
	finally:
		conn.commit()
		conn.close()

	_PSYCOPG_INITED = True
1161
class CustomConnectionPool(psycopg2.pool.ThreadedConnectionPool):
	"""A threaded connection pool that returns connections made via
	profileName.
	"""
	# we keep weak references to pools we've created so we can invalidate
	# them all on a server restart to avoid having stale connections
	# around.
	knownPools = []

	def __init__(self, minconn, maxconn, profileName, autocommitted=True):
		"""creates a pool of between minconn and maxconn connections
		for the DB profile profileName.

		If autocommitted is true, connections handed out are switched
		to autocommit/read-only mode (see _connect).
		"""
		# make sure no additional arguments come in, since we don't
		# support them.
		self.profileName = profileName
		self.autocommitted = autocommitted
		# stale is set to True by serverRestarted below; pool consumers
		# (see _makeConnectionManager) then discard and re-create the pool.
		self.stale = False
		psycopg2.pool.ThreadedConnectionPool.__init__(
			self, minconn, maxconn)
		self.knownPools.append(weakref.ref(self))

	@classmethod
	def serverRestarted(cls):
		"""flags all known pools as stale.

		This is called when a database server restart is suspected so
		that no connection from before the restart gets reused.
		"""
		utils.sendUIEvent("Warning", "Suspecting a database restart."
			" Discarding old connection pools, asking to create new ones.")

		for pool in cls.knownPools:
			try:
				pool().stale = True
			except AttributeError:
				# pool() returned None: the weakly referenced pool is
				# already gone
				pass
		# we risk a race condition here; this is used rarely enough that this
		# shouldn't matter.
		cls.knownPools = []

	def _connect(self, key=None):
		"""creates a new trustedquery connection and assigns it to
		key if not None.

		This is an implementation detail of psycopg2's connection
		pools.
		"""
		conn = getDBConnection(self.profileName)

		if self.autocommitted:
			try:
				conn.set_session(
					autocommit=True, readonly=True)
			except AttributeError:
				# fallback for old psycopg2 without set_session
				conn.set_isolation_level(
					psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
			except ProgrammingError:
				# an open transaction prevents changing session
				# characteristics; complain loudly, commit, and go on.
				utils.sendUIEvent("Warning", "Uncommitted transaction escaped; please"
					" investigate and fix")
				conn.commit()

		# _used, _rused and _pool are internals of psycopg2's
		# AbstractConnectionPool; this mirrors its own _connect
		# (assumes psycopg2's internal layout -- verify on upgrades).
		if key is not None:
			self._used[key] = conn
			self._rused[id(conn)] = key
		else:
			self._pool.append(conn)
		return conn
1226
def _cleanupAfterDBError(ex, conn, pool, poolLock):
	"""removes conn from pool after an error occurred.

	This is a helper for getConnFromPool below.
	"""
	serverProbablyRestarted = (
		isinstance(ex, OperationalError) and ex.pgcode is None)
	if serverProbablyRestarted:
		# an OperationalError without a pgcode usually means the backend
		# went away; invalidate all pools right now so no stale
		# connection gets handed out.
		with poolLock:
			if pool:
				pool[0].serverRestarted()

	# whatever happened, something bad went on in this connection;
	# force-close it rather than returning it for re-use.
	try:
		pool[0].putconn(conn, close=True)
	except InterfaceError:
		# the connection is already closed
		pass
	except Exception as msg:
		utils.sendUIEvent("Error",
			"Disaster: %s while force-closing connection"%msg)
1250
def _makeConnectionManager(profileName, minConn=5, maxConn=20,
		autocommitted=True):
	"""returns a context manager for a connection pool for profileName
	connections.

	The manager hands out a connection from a lazily created
	CustomConnectionPool and returns it to the pool when the controlled
	block is left; when the block raises, the connection is discarded
	instead (see _cleanupAfterDBError).
	"""
	pool = []
	poolLock = threading.Lock()

	def makePool():
		# creates the CustomConnectionPool and appends it to pool.
		# the following check is a rough canary that hopefully will warn
		# now and then when a thread is being started during module import
		# (also, pyscopg imports are the most likely to deadlock).
		# Yes, this is a bit ad-hoc.
		if imp.lock_held():
			raise OperationalError(
				"Attempt to make a pool with the import lock held")
		with poolLock:
			pool.append(CustomConnectionPool(minConn, maxConn, profileName,
				autocommitted))

	def getConnFromPool():
		# generator turned into a context manager below; the yielded
		# value is the connection handed to the with block.
		# we delay pool creation since these functions are built during
		# sqlsupport import.  We probably don't have profiles ready
		# at that point.
		if not pool:
			makePool()

		if pool[0].stale:
			# a server restart was suspected (see CustomConnectionPool);
			# throw the old pool away and build a fresh one.
			pool[0].closeall()
			pool.pop()
			makePool()

		conn = pool[0].getconn()
		try:
			yield conn
		except Exception as ex:
			# controlled block bombed out, do error handling
			_cleanupAfterDBError(ex, conn, pool, poolLock)
			raise

		else:
			# no exception raised, commit if not autocommitted
			if not autocommitted:
				conn.commit()

			try:
				pool[0].putconn(conn, close=conn.closed)
			except InterfaceError:
				# Connection already closed
				pass

	return contextlib.contextmanager(getConnFromPool)


# module-level connection managers for the three DaCHS database roles.
# The plain ones hand out autocommitted connections (set read-only in
# CustomConnectionPool._connect); the Writable ones run normal
# transactions and commit on clean exit of the controlled block.
getUntrustedConn = _makeConnectionManager("untrustedquery")
getTableConn = _makeConnectionManager("trustedquery")
getAdminConn = _makeConnectionManager("admin")

getWritableUntrustedConn = _makeConnectionManager("untrustedquery",
	autocommitted=False)
getWritableTableConn = _makeConnectionManager("trustedquery",
	autocommitted=False)
getWritableAdminConn = _makeConnectionManager("admin",
	autocommitted=False)