# Source code for gavo.protocols.simbadinterface

"""
A caching proxy for CDS' Simbad object resolver.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import json
import os
import socket
from urllib import request, parse

if __name__=="__main__":
	# see below on why this doesn't have normal unit tests.
	# This branch only runs when the module is executed as a script; it
	# prepares what the test suite built by _getTestSuite needs
	# (testhelpers.VerboseTest is the base class of the tests there).
	# GAVO_OOTTEST is set before testhelpers is imported; presumably
	# testhelpers looks at it at import time and the value is irrelevant
	# -- TODO confirm against gavo.helpers.testhelpers.
	os.environ["GAVO_OOTTEST"] = "dontcare"
	from gavo.helpers import testhelpers

from gavo import base
from gavo.utils import ElementTree



class ObjectCache(object):
	"""a cache for simbad queries kept in dc.metastore.

	This used to be file-based, and used different keys for different
	purposes.  The different keys didn't seem to be useful, so they're
	ignored now.

	This only caches positive responses; there's too much that can go
	wrong when caching negatives, and the expectation is that negatives
	are so varying that there's little to win anyway.

	The values passed in are json-encoded (for simbad, these are
	dictionaries).  All entries are stored under metastore keys starting
	with "simbad:".
	"""

	def addItem(self, key, value):
		"""stores value under key in the cache.

		value is serialised to JSON before it is written.
		"""
		with base.getWritableAdminConn() as conn:
			metaKey = 'simbad:'+key
			serialised = json.dumps(value)
			base.setDBMeta(conn, metaKey, serialised)

	def getItem(self, key):
		"""returns the object previously stored under key, decoded from JSON.

		This raises a KeyError if nothing has been stored before.
		"""
		stored = base.getDBMeta('simbad:'+key)
		return json.loads(stored)
class Sesame(object):
	"""is a simple interface to the simbad name resolver.

	Lookups first go to an ObjectCache; only on a cache miss are the
	sesame services in svc_urls asked, one after the other.  Positive
	results are written back to the cache.
	"""
	# we're using several simbad mirrors if we have to, and only give
	# up if all of them fail.
	svc_urls = [
		"http://cdsweb.u-strasbg.fr/cgi-bin/nph-sesame/-ox/SN?",
		"http://vizier.cfa.harvard.edu/viz-bin/nph-sesame/-ox/SN?"]

	def __init__(self):
		self.cache = ObjectCache()

	def _parseXML(self, simbadXML):
		"""returns a dictionary with the keys oname, otype, RA, and dec
		extracted from the sesame response simbadXML.

		oname is the text of Target/name; otype, RA, and dec come from the
		first Target/Resolver element (otype is None if that has no otype
		child, RA and dec are floats).

		None is returned when simbadXML cannot be parsed (a warning is
		emitted in that case), when there is no Target/name or no
		Target/Resolver element, or when no position can be extracted.
		"""
		try:
			et = ElementTree.fromstring(simbadXML)
		except Exception as msg: # simbad returned weird XML
			base.ui.notifyWarning("Bad XML from simbad (%s)"%str(msg))
			return None

		nameMatch = et.find("Target/name")
		if nameMatch is None:
			# no such object, return a negative
			return None

		firstResponse = et.find("Target/Resolver")
		# Compare to None explicitly: the truth value of an Element only
		# tells whether it has children, and testing it is deprecated.
		if firstResponse is None:
			return None

		res = {
			"oname": nameMatch.text,
			"otype": getattr(firstResponse.find("otype"), "text", None)}
		try:
			res["RA"] = float(firstResponse.find("jradeg").text)
			res["dec"] = float(firstResponse.find("jdedeg").text)
		except (ValueError, AttributeError, TypeError):
			# presumably null position; the TypeError covers elements that
			# are present but empty (their text is None).
			return None
		return res

	def query(self, ident):
		"""returns the resolver record for ident.

		The record is what _parseXML returns, i.e., a dictionary with the
		keys oname, otype, RA, and dec, or None if the service does not
		give a usable answer for ident.

		Cached records are returned without any network access.  Otherwise,
		the services in svc_urls are tried in sequence; the first one that
		can be reached determines the result.

		A ValidationError is raised if none of the services can be reached.
		"""
		try:
			return self.cache.getItem(ident)
		except KeyError:
			# cache miss, fall through to actually querying sesame
			pass

		for svc_url in self.svc_urls:
			try:
				with request.urlopen(svc_url+parse.quote(ident), timeout=2) as f:
					newOb = self._parseXML(f.read())
					# Only cache positives: a None may just be a transient
					# failure (e.g., broken XML), and the cache never expires.
					if newOb is not None:
						self.cache.addItem(ident, newOb)
					return newOb
			except socket.error:
				# Try next mirror
				continue

		# all mirrors fail
		raise base.ui.logOldExc(base.ValidationError(
			"Simbad is offline, cannot query.",
			"hscs_pos", # really, this should be added by the widget
			hint="If this problem persists, complain to us rather than simbad."))

	def getPositionFor(self, identifier):
		"""returns a pair of floats (RA, dec) for identifier.

		This raises a KeyError if query does not yield a record for
		identifier.
		"""
		rec = self.query(identifier)
		if not rec:
			raise KeyError(identifier)
		return float(rec["RA"]), float(rec["dec"])
def getSimbadPositions(identifier):
	"""resolves identifier through Simbad and returns its ra and dec.

	The lookup is delegated to the getPositionFor method of the resolver
	obtained from base.caches.getSesame.

	It raises a KeyError if Simbad doesn't know identifier.
	"""
	resolver = base.caches.getSesame("")
	return resolver.getPositionFor(identifier)
# This used to accept a "key" to separate different uses of Sesame.
# That's not turned out to be useful, so we're now ignoring the
# key.
base.caches.makeCache("getSesame", lambda key="ignored": Sesame())


############## ADQL ufunc

from gavo import adql

@adql.userFunction("ivo_simbadpoint",
	"(identifier TEXT) -> POINT",
	"""
	gavo_simbadpoint queries simbad for an identifier and returns the
	corresponding point.  Note that identifier can only be a literal,
	i.e., as simple string rather than a column name. This is because
	our database cannot query simbad, and we probably wouldn't want
	to fire off millions of simbad queries anyway; use simbad's own
	TAP service for this kind of application.
	""",
	"point", ucd="pos.eq;src",
	additionalNames=["gavo_simbadpoint"])
def _simbadpoint(args):
	# args holds the argument nodes of the ufunc call.  This never returns
	# normally: it either raises an UfuncError or a ReplaceNode carrying
	# the Point node built from the resolved position.
	from gavo.adql import nodes

	if len(args)!=1 or args[0].type!="characterStringLiteral":
		raise adql.UfuncError(
			"gavo_simbadpoint takes exactly one string literal as argument")

	ident = args[0].value
	resolver = base.caches.getSesame("")
	try:
		alpha, delta = resolver.getPositionFor(ident)
	except KeyError:
		raise adql.UfuncError("No simbad position for '%s'"%ident)

	raise nodes.ReplaceNode(nodes.Point(cooSys=None,
		x=nodes.Factor([repr(alpha)]),
		y=nodes.Factor([repr(delta)])))


def _getTestSuite():
	"""returns a unittest suite with the tests for this module.

	Building the suite deletes all simbad entries from dc.metastore;
	running it needs network access to the configured mirrors.
	"""
	import unittest

	with base.getWritableAdminConn() as conn:
		conn.execute("DELETE FROM dc.metastore WHERE key LIKE 'simbad:%%'")
	base.caches.getSesame("anything")

	# NOTE: all these tests assume the cache has been cleared before
	# them, and that the configured mirrors are up.
	# Cache clearing happens a few lines up.

	class QueryTest(testhelpers.VerboseTest):
		def testBasic(self):
			res = getSimbadPositions("Antares")
			self.assertAlmostEqual(res[0], 247.351915, 5)
			self.assertAlmostEqual(res[1], -26.432002, 5)

		def testCaching(self):
			res = getSimbadPositions("M31")
			self.assertAlmostEqual(res[0], 10.684708, 5)
			self.assertAlmostEqual(res[1], 41.26875, 5)
			# Take away all mirrors so the second lookup can only succeed
			# if it is answered from the cache.
			tmp = Sesame.svc_urls
			Sesame.svc_urls = []
			try:
				res = getSimbadPositions("M31")
				self.assertAlmostEqual(res[0], 10.684708, 5)
				self.assertAlmostEqual(res[1], 41.26875, 5)
			finally:
				Sesame.svc_urls = tmp

		def testMirrorFailover(self):
			# Point the first mirror to a port nobody should listen on.
			tmp = Sesame.svc_urls[0]
			Sesame.svc_urls[0] = "http://localhost:39293?"
			try:
				res = getSimbadPositions("epsilon Eri")
				self.assertAlmostEqual(res[0], 53.232687, 5)
				self.assertAlmostEqual(res[1], -9.458258, 5)
			finally:
				Sesame.svc_urls[0] = tmp

		def testCacheInstallation(self):
			res = base.caches.getSesame("anything").getPositionFor("ε Eri")
			self.assertAlmostEqual(res[0], 53.232687, 5)
			self.assertAlmostEqual(res[1], -9.458258, 5)

	# collect all TestCase subclasses defined above
	localNames = locals()
	tests = [localNames[name] for name in localNames
		if isinstance(localNames[name], type)
			and issubclass(localNames[name], unittest.TestCase)]
	loader = unittest.TestLoader()
	suite = unittest.TestSuite([loader.loadTestsFromTestCase(t)
		for t in tests])
	return suite


if __name__=="__main__":
	# we don't want to test this as part of the normal unit tests, as
	# there's little to sensibly test without a live network connection
	# (and we don't want to require that for the unit tests).
	import unittest
	suite = _getTestSuite()
	unittest.TextTestRunner().run(suite)