Package gavo :: Package protocols :: Module adqlglue
[frames | no frames]

Source Code for Module gavo.protocols.adqlglue

  1  """ 
  2  Code to bind the adql library to the data center software. 
  3  """ 
  4   
  5  #c Copyright 2008-2019, the GAVO project 
  6  #c 
  7  #c This program is free software, covered by the GNU GPL.  See the 
  8  #c COPYING file in the source distribution. 
  9   
 10   
 11  import sys 
 12   
 13   
 14  from gavo import adql 
 15  from gavo import base 
 16  from gavo import rsc 
 17  from gavo import rscdef 
 18  from gavo import svcs 
 19  from gavo import utils 
 20   
 21   
def makeFieldInfo(column, sqlName=None):
    """builds an adql.FieldInfo from a rscdef.Column.

    Type, unit, ucd and stc are taken over from column; column itself
    becomes the only item of the field info's user data.  sqlName is
    handed through to the FieldInfo constructor unchanged.
    """
    userData = (column,)
    return adql.FieldInfo(
        column.type,
        column.unit,
        column.ucd,
        userData,
        stc=column.stc,
        sqlName=sqlName)
27 28
class TDContext(object):
    """A helper handing out unique column names while a table definition
    for ADQL output is being generated.
    """
    def __init__(self):
        # every name getName has returned so far
        self.existingNames = set()

    def getName(self, desiredName):
        """returns desiredName, with as many underscores appended as it
        takes to make it differ from all names handed out before.

        The name returned is recorded as taken.
        """
        candidate = desiredName
        taken = self.existingNames
        while candidate in taken:
            candidate = candidate+"_"
        taken.add(candidate)
        return candidate
# For columns of types that have no automatic VOTable null value,
# we make up some when we don't have any yet.  This is governed by
# the following dictionary.
# All this is in particular for columns that came into being from
# expressions.
#
# This largely follows what Mark Taylor does in topcat.
#
# Keys are type names as compared against a column's type attribute;
# values are the null literals, given as strings.
_artificialNULLs = {
    "bytea": "255",
    "smallint": "-32768",
    "integer": "-2147483648",
    "bigint": "-9223372036854775808",
}
def _makeColumnFromFieldInfo(ctx, colName, fi):
    """returns a svcs.OutputField for the (colName, fi) pair left by the
    ADQL machinery.

    When fi has exactly one userData item, the output field is derived
    from that column (and colName is replaced by the column's
    originalName, if it has one).  In all other cases, a fresh output
    field named colName is created.

    Either way, the name is made unique through ctx, and ucd, unit, type,
    and stc are overwritten from fi.  With more than one userData item,
    the description lists the non-empty source descriptions; tainted
    field infos get a warning appended to the description.

    Since we cannot assign sensible verbLevels and assume the user wants
    to see what s/he selected, all fields get verbLevel 1.
    """
    sources = fi.userData
    if len(sources)==1:
        source = sources[0]
        res = svcs.OutputField.fromColumn(source)
        # undo the column renaming done for postgres-forbidden column names
        if hasattr(source, "originalName"):
            colName = source.originalName
    else:
        res = base.makeStruct(svcs.OutputField, name=colName)

    res.name = ctx.getName(colName)
    res.ucd = fi.ucd
    res.unit = fi.unit
    res.type = fi.type

    # XXX TODO: do something with stc's "broken" attribute
    res.stc = fi.stc

    if len(sources)>1:
        traces = [f.description for f in sources if f.description]
        res.description = "This field has traces of: %s"%("; ".join(traces))

    if fi.tainted:
        res.description = (res.description+" -- *TAINTED*: the value"
            " was operated on in a way that unit and ucd may be severely wrong")

    # The node classes may have set an xtype; it is used downstream
    # to transform to STC-S strings.
    if "xtype" in fi.properties:
        res.xtype = fi.properties["xtype"]
        res.needMunging = True

    # dates and timestamps should be ISO format for TAP or consistency with it
    if res.type in ("date", "timestamp"):
        res.xtype = "timestamp"

    # Integral types must have a null value set since we can't be sure
    # that a query yields defined results for all of them.  Tough luck
    # if our artificial value is already taken by the table (remedy:
    # select a suitable null value in the column metadata).
    if res.type in _artificialNULLs:
        declaredNull = res.values and res.values.nullLiteral
        if not declaredNull or fi.tainted:
            nullLiteral = _artificialNULLs[res.type]
            if res.values:
                newValues = res.values.change(nullLiteral=nullLiteral)
            else:
                newValues = base.makeStruct(rscdef.Values,
                    nullLiteral=nullLiteral)
            res.feedObject("values", newValues)

    res.verbLevel = 1
    res.finishElement()
    return res
127 128
def _getTableDescForOutput(parsedTree):
    """returns a table definition describing the output of the parsed
    and annotated ADQL query parsedTree.

    If the query draws from exactly one table known to the local table
    metadata, the result is a changed copy of that table's definition
    (with its metadata and id, but the query's columns, no groups and no
    primary key).  Otherwise, a new TableDef with an id suggested by
    parsedTree is made.
    """
    ctx = TDContext()
    columns = []
    for fieldInfoPair in parsedTree.fieldInfos.seq:
        columns.append(_makeColumnFromFieldInfo(ctx, *fieldInfoPair))

    # collect the qualified names of everything in the from clause
    # that has one
    fromNames = []
    for fromTable in parsedTree.fromClause.getAllTables():
        if hasattr(fromTable, "qName"):
            fromNames.append(fromTable.qName)

    # simple one-table query: take metadata and params from that table
    if len(fromNames)==1:
        try:
            srcTable = base.caches.getMTH(None).getTableDefForTable(
                fromNames[0])
            # swallow groups for now -- we don't really use them for db
            # tables, but if there are some, they'll be trouble when
            # columns are missing.
            resTable = srcTable.change(
                columns=columns, groups=[], primary=())
            resTable.copyMetaFrom(srcTable)
            resTable.id = srcTable.id
            return resTable
        except base.NotFoundError:
            # The single source is not one of our tables, hence there's
            # no metadata; fall through to normal table generation.
            pass

    return base.makeStruct(rscdef.TableDef,
        columns=columns,
        id=parsedTree.suggestAName())
161 162
163 -def _getSchema(tableName):
164 # tableName is a nodes.TableName instance 165 return tableName.schema or ""
166 167
168 -def _getADQLName(col):
169 """returns the name a column is known as within the ADQL query. 170 171 This can be different from the actual column name for uploaded 172 tables, where we have to rename columns called oid, tableoid,... 173 174 On the SQL side, our internal name is being used. 175 """ 176 return getattr(col, "originalName", col.name)
177 178
def adqlTableRefToDaCHS(tableName):
    """returns a DaCHS-internal table name suitable for dc.tablemeta for
    an ADQL TableName node.

    Strings are returned unchanged.

    For nodes, a catalog part raises a NotFoundError, since DaCHS does
    not support catalogs.  Delimited identifiers are not supported
    either: a quoted schema or name for which isRegularLower() is false
    raises a NotFoundError, too.  Otherwise the quoting was gratuitous,
    and we just unquote and move on.
    """
    if isinstance(tableName, basestring):
        return tableName

    surfaceForm = adql.flatten(tableName)

    def notFound(hint):
        # all failures here report the same thing and only differ by hint
        return base.NotFoundError(surfaceForm, "table", "published tables",
            hint=hint)

    if tableName.cat:
        raise notFound("DaCHS services have no tables with catalog parts.")

    schemaPart = tableName.schema
    if isinstance(schemaPart, utils.QuotedName):
        if not schemaPart.isRegularLower():
            raise notFound("You probably should not quote the table schema")
        prefix = schemaPart.name+"."
    elif schemaPart:
        prefix = schemaPart+"."
    else:
        prefix = ""

    namePart = tableName.name
    if isinstance(namePart, utils.QuotedName):
        if not namePart.isRegularLower():
            raise notFound("You probably should not quote the table name")
        namePart = namePart.name

    return prefix+namePart
218 219
class DaCHSFieldInfoGetter(adql.FieldInfoGetter):
    """A FieldInfoGetter that takes its column metadata from the table
    definitions obtained through base.caches.getMTH, plus the table
    definitions passed in as tdsForUploads.

    accessProfile is accepted but not used here.
    """
    def __init__(self, accessProfile=None, tdsForUploads=[]):
        adql.FieldInfoGetter.__init__(self)
        self.mth = base.caches.getMTH(None)
        for td in tdsForUploads:
            tableId = td.id
            # in uploads, the ADQL-visible name may differ from the SQL
            # name, so the latter is passed on explicitly.
            uploadInfos = []
            for col in td:
                uploadInfos.append(
                    (_getADQLName(col), makeFieldInfo(col, sqlName=col.name)))
            self.addExtraFieldInfos(tableId, uploadInfos)

    def getInfosFor(self, tableName):
        """returns (name, field info) pairs for all columns of the table
        tableName refers to that are not hidden.
        """
        dachsName = adqlTableRefToDaCHS(tableName)
        td = self.mth.getTableDefForTable(dachsName)
        return [(_getADQLName(col), makeFieldInfo(col))
            for col in td
            if not col.hidden]
233 234
def _addTableMeta(query, tree, table):
    """adds various info items from query and its parsed tree to a
    result table.

    This adds info meta items for the server URL and the query and, for
    each distinct table tree.getContributingNames() returns, info items
    on the source resource, its copyright, and the source table.  Unless
    there is exactly one contributing table (in which case the table
    metadata has already been copied), howtociteLink and source meta are
    copied as well.

    Datalink service declarations (_associatedDatalinkService) of the
    source tables are only taken over when the result table has a column
    named like the lowercased declared id column.

    Errors deriving from base.Error raised while processing a source
    table are swallowed; the remaining metadata for that table is then
    skipped.
    """
    table.makeOriginal("info")
    table.addMeta("info", "", infoName="server",
        infoValue=base.getConfig("web", "serverURL"))
    table.addMeta("info", "", infoName="query", infoValue=query)

    mth = base.caches.getMTH(None)
    sourceTables = tree.getContributingNames()
    # for 1-table queries, we've already copied the entire table metadata.
    # don't re-copy it.
    tableMetaCopied = len(sourceTables)==1

    tablesSeen = set()
    for tableName in sourceTables:
        if tableName in tablesSeen:
            continue
        tablesSeen.add(tableName)

        try:
            sourceTD = mth.getTableDefForTable(tableName)
            table.addMeta("info",
                base.getMetaText(sourceTD.rd, "description", ""),
                infoName="src_res",
                infoValue="Contains traces from resource %s"%(sourceTD.rd.sourceId))
            for m in sourceTD.iterMeta("copyright", propagate=True):
                table.addMeta("info",
                    m.getContent("text"),
                    infoName="copyright",
                    infoValue="%s copyright or license"%(sourceTD.rd.sourceId))
            table.addMeta("info",
                base.getMetaText(sourceTD, "description", "", propagate=False),
                infoName="src_table",
                infoValue="Contains traces from table %s"%(
                    sourceTD.getQName()))

            if not tableMetaCopied:
                for m in sourceTD.iterMeta("howtociteLink"):
                    table.addMeta("howtociteLink", m)
                for m in sourceTD.iterMeta("source"):
                    table.addMeta("source", m)

            for m in sourceTD.iterMeta("_associatedDatalinkService"):
                idColumn = sourceTD.getColumnByName(
                    m.getMeta("idColumn").getContent())
                # ideally, look for column(s) that were built from idColumn
                # and are untainted (cave: join using)
                # for now, let's just fake it:
                try:
                    # Look up the column *before* opening a new meta item;
                    # otherwise, a failed lookup would leave an empty
                    # _associatedDatalinkService item on the table.
                    destCol = table.tableDef.getColumnByName(
                        idColumn.name.lower())
                    table.addMeta("_associatedDatalinkService", None)
                    table.addMeta("_associatedDatalinkService.idColumn",
                        destCol.name)
                    serviceId = m.getMeta("serviceId").getContent()
                    # service ids without a # are qualified with the id
                    # of the source table's RD
                    if "#" not in serviceId:
                        serviceId = "%s#%s"%(sourceTD.rd.sourceId, serviceId)
                    table.addMeta("_associatedDatalinkService.serviceId",
                        serviceId)
                except base.NotFoundError:
                    # User hasn't selected the column with the id.  No problem.
                    pass
        except base.Error:
            # don't fail just because of funny metadata or tables not found
            pass
301 302
303 -def _updateMatchLimits(tree, maxrec, hardLimit):
304 """instruments the ADQL tree for the user row set limit maxrec 305 and the system row set limit hard limit. 306 307 maxrec is a match limit from the protocol level, as opposed to the 308 setLimit from the ADQL TOP clause. The rules of interaction between 309 the two are documented inline below (it's messy). 310 311 This returns the overflow set limit. If exactly this many rows 312 are returned from he query, and overflow indicator should be set. 313 """ 314 tree.overflowLimit = None 315 # First, fill in system defaults and make sure maxrec doesn't 316 # exceed the caller's or the system's hard limits. 317 if hardLimit is None: 318 hardLimit = base.getConfig("async", "hardMAXREC") 319 320 if maxrec is None: 321 maxrec = base.getConfig("async", "defaultMAXREC") 322 323 maxrec = min(maxrec, hardLimit) 324 325 if not tree.setLimit: 326 # If no set limit has been passed in, put in maxrec and order 327 # overflow indicators starting there. 328 tree.setLimit = maxrec 329 return maxrec 330 331 elif maxrec>tree.setLimit: 332 # If the set limit passed in is not larger than maxrec, there's 333 # nothing we need to do, because we can never overflow 334 return maxrec 335 336 elif maxrec==tree.setLimit: 337 # Special (but probably non-negligible) case: maxrec==set limit 338 # we don't want to trigger an alarm and not touch the set limit either 339 return maxrec+1 340 341 else: 342 # We have both maxrec and TOP, and maxrec<=TOP. Set TOP to maxrec+1 343 # and instruct to report overflows with maxrec+1 rows. Let's hope 344 # no one will mind an extra row here and there. 345 tree.setLimit = maxrec+1 346 return maxrec+1
347 348
def morphADQL(query, metaProfile=None, tdsForUploads=[],
        maxrec=None, hardLimit=None):
    """returns a postgres query and an (empty) result table for the
    ADQL in query.

    For an explanation of maxrec and hardLimit, as well as the
    additional table.tableDef.overflowLimit attribute on the returned
    table, see _updateMatchLimits above; this will always be an integer.

    Warnings from morphing, and a warning if the row limit had to be
    decreased to hardLimit, are added to the table as _warning meta.
    """
    fieldInfoGetter = DaCHSFieldInfoGetter(metaProfile, tdsForUploads)
    _, tree = adql.parseAnnotating(query, fieldInfoGetter)

    table = rsc.TableForDef(_getTableDescForOutput(tree))
    table.tableDef.overflowLimit = _updateMatchLimits(
        tree, maxrec, hardLimit)
    if hardLimit and int(tree.setLimit)>hardLimit:
        table.addMeta("_warning", "This service has a hard row limit"
            " of %s. Your row limit was decreased to this value."%hardLimit)
        tree.setLimit = str(hardLimit)

    morphStatus, morphedTree = adql.morphPG(tree)
    for warning in morphStatus.warnings:
        table.addMeta("_warning", warning)

    # escape % to hide them from dbapi replacing
    flattened = adql.flatten(morphedTree)
    query = flattened.replace("%", "%%")

    _addTableMeta(query, tree, table)

    return query, table
378 379
def query(querier, query, timeout=15, metaProfile=None, tdsForUploads=[],
        externalLimit=None, hardLimit=None):
    """returns a table with the result of query (a string containing ADQL).

    This will set timeouts and other things for the connection in
    question.  You should have one allocated especially for this query.

    externalLimit is passed to morphADQL as maxrec.  If the number of
    rows retrieved equals the overflow limit of the table definition, a
    _warning meta item is added to the table.
    """
    query, table = morphADQL(query, metaProfile, tdsForUploads,
        externalLimit, hardLimit=hardLimit)
    feedRow = table.addTuple
    previousTimeout = querier.getTimeout()
    querier.setTimeout(timeout)
    # XXX Hack: this is a lousy fix for postgres' seqscan love with
    # limit.  See if we still want this with newer postgres...
    querier.configureConnection([("enable_seqscan", False)])

    for row in querier.query(query):
        feedRow(row)
    # note that the previous timeout only comes back if the query succeeded
    querier.setTimeout(previousTimeout)

    overflowed = len(table)==table.tableDef.overflowLimit
    if overflowed:
        table.addMeta("_warning", "Query result probably incomplete due"
            " to the match limit kicking in. Queries not providing a TOP"
            " clause will be furnished with an automatic TOP %s by the machinery,"
            " so adding a TOP clause with a higher number may help."%
            base.getConfig("adql", "webDefaultLimit"))
    return table
407 408
def mapADQLErrors(excType, excValue, excTb):
    """translates exceptions from the adql library into
    base.ValidationErrors on the query field.

    The arguments are what sys.exc_info() returns.  Anything that is not
    an ADQL error is passed on to svcs.mapDBErrors.
    """
    if isinstance(excValue,
            (adql.ParseException, adql.ParseSyntaxException)):
        message = "Could not parse your query: %s"%unicode(excValue)
    elif isinstance(excValue, adql.ColumnNotFound):
        message = "No such field known: %s"%unicode(excValue)
    elif isinstance(excValue, adql.AmbiguousColumn):
        message = "%s needs to be qualified."%unicode(excValue)
    elif isinstance(excValue, adql.Error):
        message = unicode(excValue)
    else:
        svcs.mapDBErrors(excType, excValue, excTb)
        return

    raise base.ui.logOldExc(base.ValidationError(message, "query"))
425 426
class ADQLCore(svcs.Core, base.RestrictionMixin):
    """A core taking an ADQL query from its query argument and returning the
    result of that query in a standard table.

    Since the columns returned depend on the query, the outputTable of an
    ADQL core must not be defined.
    """
    name_ = "adqlCore"

    def wantsTableWidget(self):
        return True

    def run(self, service, inputTable, queryMeta):
        """runs the ADQL in inputTable's query parameter and returns the
        resulting table.

        Timeout and row limit are taken from queryMeta's timeout and
        dbLimit items; the hard limit is fixed at 100000 rows.  The
        number of rows retrieved is left in queryMeta["Matched"].

        Exceptions are handed to mapADQLErrors for translation.
        """
        inRow = inputTable.getParamDict()
        queryString = inRow["query"]
        base.ui.notifyInfo("Incoming ADQL query: %s"%queryString)
        try:
            with base.AdhocQuerier(base.getUntrustedConn) as querier:
                res = query(querier, queryString,
                    timeout=queryMeta["timeout"], hardLimit=100000,
                    externalLimit=queryMeta["dbLimit"])
            # XXX Warning: We're returning the db connection to the connection
            # pool here while we still have a named cursor on it.  This is
            # risky because someone might fuzz with our connection later.
            # However, postponing the return of the connection isn't nice
            # either because then the renderer would have to manage the core's
            # connections, which is ugly, too.
            # I'm a bit at a loss for a good solution here.  Let's see how
            # well the "don't care" scheme works out.  Maybe we need a "renderer
            # closes connection" plan for this kind of streaming?
            res.noPostprocess = True
            queryMeta["Matched"] = len(res.rows)
            return res
        except Exception:
            # not a bare except: KeyboardInterrupt, SystemExit and friends
            # are not query errors and must not enter the error mapping.
            mapADQLErrors(*sys.exc_info())



################ region makers (maybe put these in a separate module later)
# The region maker should in general either call the parser with an ADQL
# fragment (see _makeSimbadRegion) or return a complete FieldInfoedNode
# including any info required with a node type of psqlLiteral (for
# postgres, let's see what happens if we want to support other DBs).
#
# There are no guarantees that we won't parse out more symbols later,
# and hardcoded trees would break then.

import re

462 463 464 465 ################ region makers (maybe put these in a separate module later) 466 # The region maker should in general either call the parser with an ADQL 467 # fragment (see _makeSimbadRegion) or return a complete FieldInfoedNode 468 # including any info required with a node type of psqlLiteral (for 469 # postgres, let's see what happens if we want to support other DBs). 470 # 471 # There are no guarantees that we won't parse out more symbols later, 472 # and hardcoded trees would break then. 473 474 import re 475
476 -def _getRegionId(regionSpec, pat=re.compile("[A-Za-z_]+")):
477 mat = pat.match(regionSpec) 478 if mat: 479 return mat.group()
480 481 482 ################### local query interface ######################### 483
def localquery():
    """runs the first command line argument as an ADQL query and writes
    the result as a VOTable to stdout.
    """
    from gavo import rscdesc #noflake: cache registration
    from gavo import formats

    adqlSource = sys.argv[1]
    with base.AdhocQuerier() as querier:
        result = query(querier, adqlSource, timeout=1000)
        formats.formatData("votable", result, sys.stdout)
494