"""
Basic support for communicating with the database server.

This is currently very postgres specific. If we really wanted to
support some other database, this would need massive refactoring.
"""

from __future__ import print_function

import imp

import contextlib
import os
import random
import re
import threading
import warnings
import weakref

import numpy

from gavo import utils
from gavo.base import config

debug = "GAVO_SQL_DEBUG" in os.environ

import psycopg2
import psycopg2.extensions
import psycopg2.pool
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
try:
	psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
except AttributeError:
	pass

from psycopg2.extras import DictCursor

class Error(utils.Error):
	pass


NUMERIC_TYPES = frozenset(["smallint", "integer", "bigint", "real",
	"double precision"])

ORDERED_TYPES = frozenset(["timestamp", "text", "unicode"]) | NUMERIC_TYPES


# unit suffixes postgres uses when reporting times, and their length in seconds
_PG_TIME_UNITS = {
	"ms": 0.001,
	"s": 1.,
	"": 1.,
	"min": 60.,
	"h": 3600.,
	"d": 86400.,}


_PSYCOPG_INITED = False
69 """is an adapter that formats python sequences as SQL sets.
70
71 -- as opposed to psycopg2's apparent default of building arrays
72 out of them.
73 """
76
79
81 qobjs = []
82 for o in self._seq:
83 if isinstance(o, unicode):
84 qobjs.append(psycopg2.extensions.adapt(str(o)).getquoted())
85 else:
86 qobjs.append(psycopg2.extensions.adapt(o).getquoted())
87 return '(%s)'%(", ".join(qobjs))
88
89 __str__ = getquoted
90
93 """An adapter that formats python lists as SQL arrays
94
95 This makes, in the shameful tradition of VOTable, empty arrays equal to
96 NULL.
97 """
100
103
105 """adds a typecast to serializedList if it needs one.
106
107 This is when all entries in serializedList are NULL; so, we're fine
108 anyway if the first element is non-NULL; if it's not, we try to
109 guess.
110
111 serializedList is changed in place, the method returns nothing.
112 """
113 if not serializedList or serializedList[0]!="NULL":
114 return
115
116 if isinstance(self._seq, utils.floatlist):
117 serializedList[0] = "NULL::REAL"
118 elif isinstance(self._seq, utils.intlist):
119 serializedList[0] = "NULL::INTEGER"
120
121
123 if len(self._seq)==0:
124 return 'NULL'
125 qobjs = [str(psycopg2.extensions.adapt(o).getquoted())
126 for o in self._seq]
127
128 self._addCastIfNecessary(qobjs)
129
130 return 'ARRAY[ %s ]'%(", ".join(qobjs))
131
132 __str__ = getquoted

class FloatableAdapter(object):
	"""An adapter for things that do "float", in particular numpy.float*
	"""
	def __init__(self, val):
		self.val = float(val)

	def getquoted(self):
		# NaN is never equal to itself; postgres wants the literal 'nan'
		if self.val!=self.val:
			return "'nan'::real"
		else:
			return repr(self.val)

	__str__ = getquoted

154 """An adapter for things that do "int", in particular numpy.int*
155 """
158
161
164
165 __str__ = getquoted
166
169 """An adapter for things that should end up as NULL in the DB.
170 """
174
177
180
181 __str__ = getquoted
182

psycopg2.extensions.register_adapter(list, SqlArrayAdapter)
psycopg2.extensions.register_adapter(numpy.ndarray, SqlArrayAdapter)
psycopg2.extensions.register_adapter(tuple, SqlSetAdapter)
psycopg2.extensions.register_adapter(set, SqlSetAdapter)
psycopg2.extensions.register_adapter(frozenset, SqlSetAdapter)

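# With the adapters registered above, psycopg2 serializes python sequences
# roughly like this (a sketch; the exact quoting is psycopg2's business):
#
#	from psycopg2.extensions import adapt
#	adapt([1, 2, 3]).getquoted()   # -> 'ARRAY[ 1, 2, 3 ]'
#	adapt((1, 2, 3)).getquoted()   # -> '(1, 2, 3)'
#	adapt([]).getquoted()          # -> 'NULL' (the VOTable-ish convention)
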
for numpyType, adapter in [
		("float32", FloatableAdapter),
		("float64", FloatableAdapter),
		("float96", FloatableAdapter),
		("int8", IntableAdapter),
		("int16", IntableAdapter),
		("int32", IntableAdapter),
		("int64", IntableAdapter),]:
	try:
		psycopg2.extensions.register_adapter(
			getattr(numpy, numpyType), adapter)
	except AttributeError:
		# this numpy build doesn't have the type (float96 in particular
		# is platform-dependent)
		pass


try:
	from gavo.utils import pyfits
	psycopg2.extensions.register_adapter(pyfits.Undefined, NULLAdapter)
except (ImportError, NameError):
	# no pyfits, or no Undefined in it -- nothing to adapt then
	pass

from psycopg2 import (OperationalError,
	DatabaseError, IntegrityError, ProgrammingError,
	InterfaceError, DataError, InternalError)
from psycopg2.extensions import QueryCanceledError
from psycopg2 import Error as DBError


def registerAdapter(type, adapter):
	psycopg2.extensions.register_adapter(type, adapter)


def registerType(oid, name, castFunc):
	newOID = psycopg2.extensions.new_type(oid, name, castFunc)
	psycopg2.extensions.register_type(newOID)


class DebugCursor(psycopg2.extensions.cursor):
	def execute(self, sql, args=None):
		print("Executing %s %s"%(id(self.connection),
			sql.encode("ascii", "ignore")))
		psycopg2.extensions.cursor.execute(self, sql, args)
		print("Finished %s %s"%(id(self.connection),
			self.query.decode("ascii", "ignore").encode("ascii", "ignore")))
		return self.rowcount

	def executemany(self, sql, args):
		print("Executing many", sql.encode("ascii", "ignore"))
		print(("%d args, first one:\n%s"%(len(args), args[0])
			).encode("ascii", "ignore"))
		res = psycopg2.extensions.cursor.executemany(self, sql, args)
		print("Finished many", self.query.encode("ascii", "ignore"))
		return res


class GAVOConnection(psycopg2.extensions.connection):
	"""A psycopg2 connection with some additional methods.

	This derivation is also done so we can attach the getDBConnection
	arguments to the connection; it is used when recovering from
	a database restart.
	"""
	def setTimeout(self, timeout):
		"""sets a timeout on queries.

		timeout is in seconds; timeout=0 disables timeouts (this is what
		postgres does, too).
		"""
		cursor = self.cursor()
		try:
			if timeout==-12:
				# special value: set an essentially immediate (1 ms) timeout
				cursor.execute("SET statement_timeout TO 1")
			elif timeout is not None:
				cursor.execute(
					"SET statement_timeout TO %d"%(int(float(timeout)*1000)))
		finally:
			cursor.close()

272 """returns the current timeout setting.
273
274 The value is in float seconds.
275 """
276 cursor = self.cursor()
277 try:
278 cursor.execute("SHOW statement_timeout")
279 rawVal = list(cursor)[0][0]
280 mat = re.match("(\d+)(\w*)$", rawVal)
281 try:
282 return int(mat.group(1))*_PG_TIME_UNITS[mat.group(2)]
283 except (ValueError, AttributeError, KeyError):
284 raise ValueError("Bad timeout value from postgres: %s"%rawVal)
285 finally:
286 cursor.close()
287
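	# A quick sketch of how these two go together (conn being a GAVOConnection):
	#
	#	conn.setTimeout(3)   # postgres now cancels queries after 3 seconds
	#	conn.getTimeout()    # -> 3.0 (postgres reports "3s", parsed above)
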
	@contextlib.contextmanager
	def timeoutSet(self, timeout, cursor=None):
		"""a contextmanager to have a timeout set in the controlled
		section.

		(cursor is accepted for the benefit of callers but not used here)
		"""
		if timeout is not None:
			oldTimeout = self.getTimeout()
			self.setTimeout(timeout)

		yield

		if timeout is not None:
			self.setTimeout(oldTimeout)

	def queryToDicts(self, query, args={}, timeout=None, caseFixer=None):
		"""iterates over dictionary rows for query.

		This is mainly for ad-hoc queries needing little metadata.

		The dictionary keys are determined by what the database says the
		column titles are; thus, it's usually lower-cased variants of what's
		in the select-list. To fix this, you can pass in a caseFixer dict
		that gives a properly cased version of lowercase names.
		"""
		cursor = self.cursor()
		try:
			with self.timeoutSet(timeout, cursor):
				cursor.execute(query, args)
				keys = [cd[0] for cd in cursor.description]
				if caseFixer:
					keys = [caseFixer.get(key, key) for key in keys]
				for row in cursor:
					yield dict(zip(keys, row))
		finally:
			cursor.close()

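	# A typical ad-hoc use might look like this (a sketch; the query is
	# just an example):
	#
	#	for row in conn.queryToDicts(
	#			"SELECT tablename, schemaname FROM pg_tables"
	#			" WHERE schemaname=%(s)s", {"s": "public"}):
	#		print(row["tablename"])
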
	def query(self, query, args={}, timeout=None):
		"""iterates over result tuples for query.

		This is the tuple analogue of queryToDicts.
		"""
		cursor = self.cursor()
		try:
			with self.timeoutSet(timeout, cursor):
				cursor.execute(query, args)
				for row in cursor:
					yield row
		finally:
			cursor.close()

	def execute(self, query, args={}):
		"""executes query in a cursor and returns its rowcount.
		"""
		cursor = self.cursor()
		try:
			cursor.execute(query, args)
			return cursor.rowcount
		finally:
			cursor.close()

	@contextlib.contextmanager
	def savepoint(self):
		"""sets up a section protected by a savepoint that will be released
		after use.

		If an exception happens in the controlled section, the connection
		will be rolled back to the savepoint.
		"""
		savepointName = "auto_%s"%(random.randint(0, 2147483647))
		self.execute("SAVEPOINT %s"%savepointName)
		try:
			yield
		except:
			self.execute("ROLLBACK TO SAVEPOINT %s"%savepointName)
			raise
		finally:
			self.execute("RELEASE SAVEPOINT %s"%savepointName)
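
	# A sketch of how this is meant to be used (my_table is made up):
	#
	#	with conn.savepoint():
	#		conn.execute("INSERT INTO my_table VALUES (1)")
	#
	# If the INSERT raises, only the work since the savepoint is rolled
	# back; the enclosing transaction stays alive.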


def savepointOn(conn):
	raise NotImplementedError("Don't use the savepointOn function any more;"
		" use connection.savepoint.")


class DebugConnection(GAVOConnection):
	def cursor(self, *args, **kwargs):
		kwargs["cursor_factory"] = DebugCursor
		return GAVOConnection.cursor(self, *args, **kwargs)

	def getPID(self):
		cursor = self.cursor()
		try:
			cursor.execute("SELECT pg_backend_pid()")
			return list(cursor)[0][0]
		finally:
			cursor.close()

397 """A standin to pass whereever a function wants a connection but
398 doesn't actually need one in a particular situation.
399
400 This, in particular, concerns makeData.
401
402 To be accomodating to careful code, we'll allow commit and rollback
403 on these, but we'll raise on anything else.
404 """
406 raise ValueError("Attempt to use NullConnection (attribute %s)"%name)
407
410
413
416 """returns an enhanced database connection through profile.
417
418 You will typically rather use the context managers for the standard
419 profiles (``getTableConnection`` and friends). Use this function if
420 you want to keep your connection out of connection pools or if you want
421 to use non-standard profiles.
422
423 profile will usually be a string naming a profile defined in
424 ``GAVO_ROOT/etc``.
425 """
426 if profile is None:
427 profile = "trustedquery"
428 if isinstance(profile, basestring):
429 profile = config.getDBProfile(profile)
430
431 if debug:
432 conn = psycopg2.connect(connection_factory=DebugConnection,
433 **profile.getArgs())
434 print("NEW CONN using %s (%s)"%(profile.name, conn.getPID()), id(conn))
435 def closer():
436 print("CONNECTION CLOSE", id(conn))
437 return DebugConnection.close(conn)
438 conn.close = closer
439 else:
440 try:
441 conn = psycopg2.connect(connection_factory=GAVOConnection,
442 **profile.getArgs())
443 except OperationalError as msg:
444 raise utils.ReportableError("Cannot connect to the database server."
445 " The database library reported:\n\n%s"%msg,
446 hint="This usually means you must adapt either the access profiles"
447 " in $GAVO_DIR/etc or your database config (in particular,"
448 " pg_hba.conf).")
449
450 if not _PSYCOPG_INITED:
451 _initPsycopg()
452
453 if autocommitted:
454 conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
455 conn.set_client_encoding("UTF8")
456
457 conn._getDBConnectionArgs = {
458 "profile": profile,
459 "debug": debug,
460 "autocommitted": autocommitted}
461 return conn
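
# A minimal sketch of direct use (the pooled context managers further down
# are usually the better choice):
#
#	conn = getDBConnection("trustedquery")
#	try:
#		for row in conn.query(
#				"SELECT table_name FROM information_schema.tables"):
#			pass
#	finally:
#		conn.close()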
462
465 """returns schema, unqualified table name for the arguments.
466
467 schema=None selects the default schema (public for postgresql).
468
469 If tableName is qualified (i.e. schema.table), the schema given
470 in the name overrides the schema argument.
471
472 We do not support delimited identifiers for tables in DaCHS. Hence,
473 this will raise a ValueError if anything that wouldn't work as
474 an SQL regular identifier (except we don't filter for reserved
475 words yet, which is an implementation detail that might change).
476 """
477 parts = tableName.split(".")
478 if len(parts)>2:
479 raise ValueError("%s is not a SQL regular identifier"%repr(tableName))
480 for p in parts:
481 if not utils.identifierPattern.match(p):
482 raise ValueError("%s is not a SQL regular identifier"%repr(tableName))
483
484 if len(parts)==1:
485 name = parts[0]
486 else:
487 schema, name = parts
488
489 if schema is None:
490 schema = "public"
491
492 return schema.lower(), name.lower()
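
# For illustration (the names are made up):
#
#	_parseTableName("glots.services")     # -> ("glots", "services")
#	_parseTableName("services", "glots")  # -> ("glots", "services")
#	_parseTableName("services")           # -> ("public", "services")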
493
496 """returns digits from a postgres server banner.
497
498 This hardcodes the response given by postgres 8 and raises a ValueError
499 if the expected format is not found.
500 """
501 mat = re.match(r"PostgreSQL ([\d.]*)", bannerString)
502 if not mat:
503 raise ValueError("Cannot make out the Postgres server version from %s"%
504 repr(bannerString))
505 return ".".join(mat.group(1).split(".")[:digits])
506
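
# For example, with a banner like the ones current servers report:
#
#	parseBannerString("PostgreSQL 9.6.10 on x86_64-pc-linux-gnu ...")
#	# -> "9.6"
#	parseBannerString("PostgreSQL 9.6.10 on x86_64-pc-linux-gnu ...", 1)
#	# -> "9"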


class PostgresQueryMixin(object):
	"""is a mixin containing various useful queries that are postgres specific.

	This mixin expects a parent that mixes in QuerierMixin (that, for now,
	also mixes in PostgresQueryMixin, so you won't need to mix this in).
	"""
	def getServerVersion(self, digits=2):
		"""returns the version of the connection's server to digits numbers.
		"""
		bannerString = list(self.query("SELECT version()"))[0][0]
		return parseBannerString(bannerString, digits)

	def getPrimaryIndexName(self, tableName):
		"""returns the name of the index corresponding to the primary key on
		(the unqualified) tableName.
		"""
		return ("%s_pkey"%tableName).lower()

	def schemaExists(self, schema):
		"""returns True if the named schema exists in the database.
		"""
		matches = list(self.query("SELECT nspname FROM"
			" pg_namespace WHERE nspname=%(schemaName)s", {
				'schemaName': schema,
			}))
		return len(matches)!=0

	def hasIndex(self, tableName, indexName, schema=None):
		"""returns True if table tableName has an index called indexName.

		See _parseTableName on the meaning of the arguments.
		"""
		schema, tableName = _parseTableName(tableName, schema)
		res = list(self.query("SELECT indexname FROM"
			" pg_indexes WHERE schemaname=lower(%(schema)s) AND"
			" tablename=lower(%(tableName)s) AND"
			" indexname=lower(%(indexName)s)", locals()))
		return len(list(res))>0

	def _getColIndices(self, relOID, colNames):
		"""returns a sorted list of column indices of colNames in the relation
		relOID.

		This really is a helper for foreignKeyExists.
		"""
		colNames = set(n.lower() for n in colNames)
		res = [r[0] for r in
			self.query("SELECT attnum FROM pg_attribute WHERE"
				" attrelid=%(relOID)s and attname IN %(colNames)s",
				locals())]
		res.sort()
		return res

	def getForeignKeyName(self, srcTableName, destTableName, srcColNames,
			destColNames, schema=None):
		"""returns the name of the foreign key constraint on srcTableName's
		srcColNames referencing destTableName's destColNames.

		Warning: names in XColNames that are not column names in the respective
		tables are ignored.

		This raises a ValueError if the foreign key does not exist.
		"""
		try:
			srcOID = self.getOIDForTable(srcTableName, schema)
			srcColInds = self._getColIndices(
				srcOID, srcColNames)
			destOID = self.getOIDForTable(destTableName, schema)
			destColInds = self._getColIndices(
				destOID, destColNames)
		except Error:
			return False

		res = list(self.query("""SELECT conname FROM pg_constraint WHERE
			contype='f'
			AND conrelid=%(srcOID)s
			AND confrelid=%(destOID)s
			AND conkey=%(srcColInds)s::SMALLINT[]
			AND confkey=%(destColInds)s::SMALLINT[]""", locals()))

		if len(res)==1:
			return res[0][0]
		else:
			raise ValueError("Non-existing or ambiguous foreign key")

	def foreignKeyExists(self, srcTableName, destTableName, srcColNames,
			destColNames, schema=None):
		try:
			_ = self.getForeignKeyName(
				srcTableName, destTableName,
				srcColNames, destColNames,
				schema)
			return True
		except ValueError:
			return False

	@utils.memoized
	def _resolveTypeCode(self, oid):
		"""returns a textual description for a type oid as returned
		by cursor.description.

		These descriptions are *not* DDL-ready.

		*** postgres specific ***
		"""
		res = list(self.query(
			"select typname from pg_type where oid=%(oid)s", {"oid": oid}))
		return res[0][0]

	def getColumnsFromDB(self, tableName):
		"""returns a sequence of (name, type) pairs of the columns this
		table has in the database.

		If the table is not on disk, this will raise a NotFoundError.

		*** psycopg2 specific ***
		"""
		# _parseTableName raises for anything that is not a regular SQL
		# identifier, so the %-formatting below is safe.
		_parseTableName(tableName)
		cursor = self.connection.cursor()
		try:
			cursor.execute("select * from %s limit 0"%tableName)
			return [(col.name, self._resolveTypeCode(col.type_code)) for col in
				cursor.description]
		finally:
			cursor.close()

	def getRowEstimate(self, tableName):
		"""returns the size of the table in rows as estimated by the query
		planner.

		This will raise a KeyError with tableName if the table isn't known
		to postgres.
		"""
		res = list(self.query(
			"SELECT reltuples FROM pg_class WHERE oid = %(tableName)s::regclass",
			locals()))
		if not res:
			raise KeyError(tableName)
		return int(res[0][0])

	def roleExists(self, role):
		"""returns True if the role is known to the database.
		"""
		matches = list(self.query(
			"SELECT usesysid FROM pg_user WHERE usename=%(role)s",
			locals()))
		return len(matches)!=0

	def getOIDForTable(self, tableName, schema=None):
		"""returns the current oid of tableName.

		tableName may be schema qualified. If it is not, public is assumed.
		"""
		schema, tableName = _parseTableName(tableName, schema)
		res = list(self.query("SELECT oid FROM pg_class WHERE"
			" relname=%(tableName)s AND"
			" relnamespace=(SELECT oid FROM pg_namespace WHERE nspname=%(schema)s)",
			locals()))
		if len(res)!=1:
			raise Error("Table %s does not exist"%tableName)
		return res[0][0]

	def _rowExists(self, query, pars):
		res = list(self.query(query, pars))
		return len(res)!=0

	def getTableType(self, tableName, schema=None):
		"""returns the type of the relation tableName.

		If tableName does not exist, None is returned. Otherwise, it's
		what is in the information schema for the table, which for postgres
		currently is one of BASE TABLE, VIEW, FOREIGN TABLE, MATERIALIZED VIEW,
		or LOCAL TEMPORARY.

		The DaCHS-idiomatic way to see if a relation exists is
		getTableType() is not None.

		You can pass in schema-qualified relation names, or the relation name
		and the schema separately.

		*** postgres specific ***
		"""
		schema, tableName = _parseTableName(tableName, schema)
		res = list(
			self.query("""SELECT table_name, table_type FROM
					information_schema.tables WHERE (
						table_schema=%(schemaName)s
						OR table_type='LOCAL TEMPORARY')
					AND table_name=%(tableName)s""", {
				'tableName': tableName.lower(),
				'schemaName': schema.lower()}))

		if not res:
			# materialized views don't show up in information_schema.tables;
			# try pg_matviews if this postgres already has it
			if list(self.connection.query(
					"select table_name from information_schema.tables"
					" where table_name='pg_matviews'")):
				res = list(
					self.query("""SELECT matviewname, 'MATERIALIZED VIEW' AS table_type
							FROM pg_matviews
							WHERE
								schemaname=%(schemaName)s
								AND matviewname=%(tableName)s""", {
						'tableName': tableName.lower(),
						'schemaName': schema.lower()}))

			if not res:
				return None

		assert len(res)==1
		return res[0][1]

	def dropTable(self, tableName, cascade=False):
		"""drops a table or view named by tableName.

		This does not raise an error if no such relation exists.

		*** postgres specific ***
		"""
		tableType = self.getTableType(tableName)
		if tableType is None:
			return

		dropQualification = {
			"VIEW": "VIEW",
			"MATERIALIZED VIEW": "MATERIALIZED VIEW",
			"FOREIGN TABLE": "FOREIGN TABLE",
			"BASE TABLE": "TABLE",
			"LOCAL TEMPORARY": "TABLE"}[tableType]
		self.query("DROP %s %s %s"%(
			dropQualification,
			tableName,
			"CASCADE" if cascade else ""))

	def getSchemaPrivileges(self, schema):
		"""returns a dict mapping role names to their privileges in schema's ACL.
		"""
		res = list(self.query("SELECT nspacl FROM pg_namespace WHERE"
			" nspname=%(schema)s", locals()))
		return self.parsePGACL(res[0][0])

	def getTablePrivileges(self, schema, tableName):
		"""returns a dict mapping role names to their privileges for the
		relation tableName in schema.

		*** postgres specific ***
		"""
		res = list(self.query("SELECT relacl FROM pg_class WHERE"
			" lower(relname)=lower(%(tableName)s) AND"
			" relnamespace=(SELECT oid FROM pg_namespace WHERE nspname=%(schema)s)",
			locals()))
		try:
			return self.parsePGACL(res[0][0])
		except IndexError:
			return {}

	_privTable = {
		"arwdRx": "ALL",
		"arwdDxt": "ALL",
		"arwdRxt": "ALL",
		"arwdxt": "ALL",
		"r": "SELECT",
		"UC": "ALL",
		"U": "USAGE",
	}

	def parsePGACL(self, acl):
		"""returns a dict roleName->acl for acl in postgres'
		ACL serialization.
		"""
		if acl is None:
			return {}
		res = []
		for acs in re.match("{(.*)}", acl).group(1).split(","):
			if acs!='':
				role, privs, granter = re.match("([^=]*)=([^/]*)/(.*)", acs).groups()
				res.append((role, self._privTable.get(privs, "READ")))
		return dict(res)

	def getACLFromRes(self, thingWithPrivileges):
		"""returns a dict of (role, ACL) as it is defined in thingWithPrivileges.

		thingWithPrivileges is something mixing in rscdef.common.PrivilegesMixin
		(or has readProfiles and allProfiles attributes containing
		sequences of profile names).
		"""
		res = []
		if hasattr(thingWithPrivileges, "schema"):
			readRight = "USAGE"
		else:
			readRight = "SELECT"

		for profile in thingWithPrivileges.readProfiles:
			res.append((config.getDBProfile(profile).roleName, readRight))
		for profile in thingWithPrivileges.allProfiles:
			res.append((config.getDBProfile(profile).roleName, "ALL"))
		return dict(res)

811 """is a mixin containing various useful queries that should work
812 agains all SQL systems.
813
814 This mixin expects a parent that mixes is QuerierMixin (that, for now,
815 also mixes in StandardQueryMixin, so you won't need to mix this in).
816
817 The parent also needs to mix in something like PostgresQueryMixin (I
818 might want to define an interface there once I'd like to support
819 other databases).
820 """
831
839
841 """is a helper for set[Table|Schema]Privileges.
842
843 Requests for granting privileges not known to the database are
844 ignored, but a log entry is generated.
845 """
846 for role in set(foundPrivs)-set(shouldPrivs):
847 if role:
848 self.connection.execute("REVOKE ALL PRIVILEGES ON %s FROM %s"%(
849 objectName, role))
850 for role in set(shouldPrivs)-set(foundPrivs):
851 if role:
852 if self.roleExists(role):
853 self.connection.execute(
854 "GRANT %s ON %s TO %s"%(shouldPrivs[role], objectName, role))
855 else:
856 utils.sendUIEvent("Warning",
857 "Request to grant privileges to non-existing"
858 " database user %s dropped"%role)
859 for role in set(shouldPrivs)&set(foundPrivs):
860 if role:
861 if shouldPrivs[role]!=foundPrivs[role]:
862 self.connection.execute("REVOKE ALL PRIVILEGES ON %s FROM %s"%(
863 objectName, role))
864 self.connection.execute("GRANT %s ON %s TO %s"%(shouldPrivs[role], objectName,
865 role))
866
	def getTimeout(self):
		"""wraps conn.getTimeout for backward compatibility.

		The value is in float seconds.
		"""
		return self.connection.getTimeout()


def dictifyRowset(descr, rows):
	"""turns a standard, tuple-based rowset into a list of dictionaries,
	the keys of which are taken from descr (a cursor.description).
	"""
	keys = [cd[0] for cd in descr]
	return [dict(zip(keys, row)) for row in rows]


class QuerierMixin(PostgresQueryMixin, StandardQueryMixin):
	"""is a mixin for "queriers", i.e., objects that maintain a db connection.

	The mixin assumes an attribute connection from the parent.
	"""
	defaultProfile = None

	_reconnecting = False

	def enableAutocommit(self):
		self.connection.set_isolation_level(
			psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

	def query(self, query, data={}, timeout=None):
		"""wraps conn.query adding logic to re-establish lost connections.

		Don't use this method any more in new code. It contains wicked
		logic to tell DDL statements (that run without anyone pulling
		the results) from actual selects. That's a bad API. Also note
		that the timeout is ignored for DDL statements.

		Use either connection.query or connection.execute in new code.
		"""
		if self.connection is None:
			raise utils.ReportableError(
				"SimpleQuerier connection is None.",
				hint="This usually is because an AdhocQuerier's query method"
				" was used outside of a with block.")

		try:
			if query[:5].lower() in ["selec", "with "]:
				return self.connection.query(query, data, timeout)
			else:
				# DDL or similar; nobody is going to fetch results anyway
				self.connection.execute(query, data)
		except DBError as ex:
			if isinstance(ex, OperationalError) and self.connection.fileno()==-1:
				if not self._reconnecting:
					return self._queryReconnecting(query, data, timeout)
			raise

	def queryToDicts(self, *args, **kwargs):
		return self.connection.queryToDicts(*args, **kwargs)

	queryDicts = queryToDicts

966 """A simple interface to querying the database through a connection
967 managed by someone else.
968
969 This is typically used as in::
970
971 with base.getTableConn() as conn:
972 q = UnmanagedQuerier(conn)
973 ...
974
975 This contains numerous methods abstracting DB functionality a bit.
976 Documented ones include:
977
978 * schemaExissts(schema)
979 * getColumnsFromDB(tableName)
980 * getTableType(tableName) -- this will return None for non-existing tables,
981 which is DaCHS' official way to determine table existence.
982 * getTimeout() -- returns the current query timeout in seconds
983 * setTimeout(timeout) -- sets a timeout in seconds.
984 """
987
990 """A simple interface to querying the database through pooled
991 connections.
992
993 These are constructed using the connection getters (getTableConn (default),
994 getAdminConn) and then serve as context managers, handing back the connection
995 as you exit the controlled block.
996
997 Since they operate through pooled connections, no transaction
998 management takes place. These are typically for read-only things.
999
1000 You can use the query method and everything that's in the QuerierMixin.
1001 """
1002 - def __init__(self, connectionManager=None):
1003 if connectionManager is None:
1004 self.connectionManager = getTableConn
1005 else:
1006 self.connectionManager = connectionManager
1007 self.connection = None
1008
1010 self._cm = self.connectionManager()
1011 self.connection = self._cm.__enter__()
1012 return self
1013
1017
1031
1047
1048
1049
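# A sketch of typical AdhocQuerier use (the table name is made up):
#
#	with AdhocQuerier() as q:
#		if q.getTableType("dc.tables") is not None:
#			print(q.getColumnsFromDB("dc.tables"))
#
# Pass getAdminConn as the connection manager if the queries need more
# privileges.
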
@contextlib.contextmanager
def connectionConfiguration(conn, isLocal=True, timeout=None, **runtimeVals):
	"""A context manager setting and resetting runtimeVals in conn.

	You just pass keyword arguments corresponding to postgres runtime
	configuration items (as in SET and SHOW). The manager obtains their previous
	values and restores them before exiting.

	When the controlled body is terminated by a DBError, the settings
	are not reset.

	If you set isLocal=False, this works for autocommitted connections,
	too (and in that case the reset of the run-time parameters will
	be attempted even when DBErrors occurred).

	Since it's so frequent, you can pass timeout to give a statement_timeout
	in seconds.
	"""
	cursor = conn.cursor()

	if timeout is not None:
		runtimeVals["statement_timeout"] = int(float(timeout)*1000)

	oldVals = {}
	for parName, parVal in runtimeVals.iteritems():
		parVal = str(parVal)
		cursor.execute("SELECT current_setting(%(parName)s)", locals())
		oldVals[parName] = list(cursor)[0][0]
		cursor.execute(
			"SELECT set_config(%(parName)s, %(parVal)s, %(isLocal)s)", locals())
	cursor.close()

	def resetAll(isLocal):
		cursor = conn.cursor()
		for parName, parVal in oldVals.iteritems():
			cursor.execute(
				"SELECT set_config(%(parName)s, %(parVal)s, %(isLocal)s)",
				locals())
		cursor.close()

	try:
		yield
	except DBError:
		if not isLocal:
			resetAll(isLocal)
		raise
	except:
		resetAll(isLocal)
		raise
	resetAll(isLocal)

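# A sketch of how connectionConfiguration is meant to be used (work_mem is
# just an example of a postgres runtime parameter):
#
#	with connectionConfiguration(conn, timeout=2, work_mem="64MB"):
#		# queries in here run with statement_timeout=2s and work_mem=64MB
#		...
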
JOIN_FUNCTION_BODY = """
SELECT (
  (
       ((q3c_ang2ipix($3,$4)>=(q3c_nearby_it($1,$2,$5,0))) AND (q3c_ang2ipix($3,$4)<=(q3c_nearby_it($1,$2,$5,1))))
    OR ((q3c_ang2ipix($3,$4)>=(q3c_nearby_it($1,$2,$5,2))) AND (q3c_ang2ipix($3,$4)<=(q3c_nearby_it($1,$2,$5,3))))
    OR ((q3c_ang2ipix($3,$4)>=(q3c_nearby_it($1,$2,$5,4))) AND (q3c_ang2ipix($3,$4)<=(q3c_nearby_it($1,$2,$5,5))))
    OR ((q3c_ang2ipix($3,$4)>=(q3c_nearby_it($1,$2,$5,6))) AND (q3c_ang2ipix($3,$4)<=(q3c_nearby_it($1,$2,$5,7))))
  ) AND
  (
       ((q3c_ang2ipix($1,$2)>=(q3c_nearby_it($3,$4,$5,0))) AND (q3c_ang2ipix($1,$2)<=(q3c_nearby_it($3,$4,$5,1))))
    OR ((q3c_ang2ipix($1,$2)>=(q3c_nearby_it($3,$4,$5,2))) AND (q3c_ang2ipix($1,$2)<=(q3c_nearby_it($3,$4,$5,3))))
    OR ((q3c_ang2ipix($1,$2)>=(q3c_nearby_it($3,$4,$5,4))) AND (q3c_ang2ipix($1,$2)<=(q3c_nearby_it($3,$4,$5,5))))
    OR ((q3c_ang2ipix($1,$2)>=(q3c_nearby_it($3,$4,$5,6))) AND (q3c_ang2ipix($1,$2)<=(q3c_nearby_it($3,$4,$5,7))))
  )
)
AND q3c_sindist($1,$2,$3,$4)<POW(SIN(RADIANS($5)/2),2)
' LANGUAGE SQL IMMUTABLE;
"""
1123 """does any DaCHS-specific database setup necessary.
1124
1125 This will always open an admin connection.
1126 """
1127
1128
1129 global _PSYCOPG_INITED
1130
1131 conn = psycopg2.connect(connection_factory=GAVOConnection,
1132 **config.getDBProfile("feed").getArgs())
1133 try:
1134 try:
1135 from gavo.utils import pgsphere
1136 pgsphere.preparePgSphere(conn)
1137 except:
1138 warnings.warn("pgsphere missing -- ADQL, pg-SIAP, and SSA will not work")
1139
1140
1141
1142
1143 funcs = set(r[0] for r in
1144 conn.query("SELECT DISTINCT proname FROM pg_proc"
1145 " WHERE proname IN ('q3c_join', 'q3c_join_symmetric')"))
1146 if funcs==frozenset(["q3c_join"]):
1147
1148 conn.execute("""CREATE OR REPLACE FUNCTION q3c_join_symmetric(
1149 leftra double precision, leftdec double precision,
1150 rightra double precision, rightdec double precision,
1151 radius double precision) RETURNS boolean AS '"""+JOIN_FUNCTION_BODY)
1152 conn.execute("""CREATE OR REPLACE FUNCTION q3c_join_symmetric(
1153 leftra double precision, leftdec double precision,
1154 rightra real, rightdec real,
1155 radius double precision) RETURNS boolean AS '"""+JOIN_FUNCTION_BODY)
1156 finally:
1157 conn.commit()
1158 conn.close()
1159
1160 _PSYCOPG_INITED = True
1161
1164 """A threaded connection pool that returns connections made via
1165 profileName.
1166 """
1167
1168
1169
1170 knownPools = []
1171
1172 - def __init__(self, minconn, maxconn, profileName, autocommitted=True):
1173
1174
1175 self.profileName = profileName
1176 self.autocommitted = autocommitted
1177 self.stale = False
1178 psycopg2.pool.ThreadedConnectionPool.__init__(
1179 self, minconn, maxconn)
1180 self.knownPools.append(weakref.ref(self))
1181
1182 @classmethod
1184 utils.sendUIEvent("Warning", "Suspecting a database restart."
1185 " Discarding old connection pools, asking to create new ones.")
1186
1187 for pool in cls.knownPools:
1188 try:
1189 pool().stale = True
1190 except AttributeError:
1191
1192 pass
1193
1194
1195 cls.knownPools = []
1196
1198 """creates a new trustedquery connection and assigns it to
1199 key if not None.
1200
1201 This is an implementation detail of psycopg2's connection
1202 pools.
1203 """
1204 conn = getDBConnection(self.profileName)
1205
1206 if self.autocommitted:
1207 try:
1208 conn.set_session(
1209 autocommit=True, readonly=True)
1210 except AttributeError:
1211
1212 conn.set_isolation_level(
1213 psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
1214 except ProgrammingError:
1215 utils.sendUIEvent("Warning", "Uncommitted transaction escaped; please"
1216 " investigate and fix")
1217 conn.commit()
1218
1219
1220 if key is not None:
1221 self._used[key] = conn
1222 self._rused[id(conn)] = key
1223 else:
1224 self._pool.append(conn)
1225 return conn
1226
1229 """removes conn from pool after an error occurred.
1230
1231 This is a helper for getConnFromPool below.
1232 """
1233 if isinstance(ex, OperationalError) and ex.pgcode is None:
1234
1235
1236 with poolLock:
1237 if pool:
1238 pool[0].serverRestarted()
1239
1240
1241
1242 try:
1243 pool[0].putconn(conn, close=True)
1244 except InterfaceError:
1245
1246 pass
1247 except Exception as msg:
1248 utils.sendUIEvent("Error",
1249 "Disaster: %s while force-closing connection"%msg)
1250
1254 """returns a context manager for a connection pool for profileName
1255 connections.
1256 """
1257 pool = []
1258 poolLock = threading.Lock()
1259
1260 def makePool():
1261
1262
1263
1264
1265 if imp.lock_held():
1266 raise OperationalError(
1267 "Attempt to make a pool with the import lock held")
1268 with poolLock:
1269 pool.append(CustomConnectionPool(minConn, maxConn, profileName,
1270 autocommitted))
1271
1272 def getConnFromPool():
1273
1274
1275
1276 if not pool:
1277 makePool()
1278
1279 if pool[0].stale:
1280 pool[0].closeall()
1281 pool.pop()
1282 makePool()
1283
1284 conn = pool[0].getconn()
1285 try:
1286 yield conn
1287 except Exception as ex:
1288
1289 _cleanupAfterDBError(ex, conn, pool, poolLock)
1290 raise
1291
1292 else:
1293
1294 if not autocommitted:
1295 conn.commit()
1296
1297 try:
1298 pool[0].putconn(conn, close=conn.closed)
1299 except InterfaceError:
1300
1301 pass
1302
1303 return contextlib.contextmanager(getConnFromPool)
1304
1305


getUntrustedConn = _makeConnectionManager("untrustedquery")
getTableConn = _makeConnectionManager("trustedquery")
getAdminConn = _makeConnectionManager("admin")

getWritableUntrustedConn = _makeConnectionManager("untrustedquery",
	autocommitted=False)
getWritableTableConn = _makeConnectionManager("trustedquery",
	autocommitted=False)
getWritableAdminConn = _makeConnectionManager("admin",
	autocommitted=False)
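
# These managers are the standard way of talking to the database from
# DaCHS code, e.g. (a sketch):
#
#	with getTableConn() as conn:
#		for row in conn.queryToDicts("SELECT version()"):
#			print(row)
#
# The getWritable*Conn managers hand out connections that are not
# autocommitted; on a clean exit from the with block the manager commits
# for you (see getConnFromPool above).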