# Source code for gavo.helpers.testhelpers

# -*- coding: utf-8 -*-
"""
Helper classes for the DaCHS' unit tests.

WARNING: This messes up some global state.  DO NOT import into modules
doing regular work.  testtricks is the module for that kind of stuff.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import http.server
import contextlib
import gc
import io
import importlib
import os
import pickle
import re
import subprocess
import sys
import threading
import time
import traceback
import unittest
import warnings

try:
	import coverage
except ImportError:
	# we only need this if someone set COVERAGE_FILE; don't do that
	# if you don't have coverage.py installed.
	pass


# This sets up a test environment of the DaCHS software.
#
# To make this work, the current user must be allowed to run
# createdb (in practice, you should have done something like
#
# sudo -u postgres -s `id -nu`
#
# ). You should be able to tear both ~/_gavo_test and the database
# down, and this should automatically recreate everything.  That's
# an integration test for DaCHS, too.
#
# This must be run before anything else from gavo is imported because
# it manipulates the config stuff; this, in turn, runs as soon as
# base is imported.

# The following forces tests to be run from the tests directory.
# Reasonable, I'd say.
#
# All the custom setup can be suppressed by setting a GAVO_OOTTEST
# env var before importing this.  That's for "out of tree test"
# and is used by the relational registry "unit" tests (and possibly
# others later).

def ensureResources():
	"""does nothing.

	This is the default; the environment setup below replaces it with
	a function doing actual work when the test environment still has to
	be created.
	"""
	return None

# Import-time setup of the test environment.  With GAVO_OOTTEST set,
# nothing is customised and gavo.base is just imported.  Otherwise, the
# environment is pointed to the test configuration below the current
# directory before gavo.base is imported for the first time.
if "GAVO_OOTTEST" in os.environ:
	from gavo import base

else:
	TEST_BASE = os.getcwd()
	# copy of the environment as it was before the overrides below
	originalEnvironment = os.environ.copy()
	os.environ["GAVOCUSTOM"] = "/invalid"
	os.environ["GAVOSETTINGS"] = os.path.join(TEST_BASE,
		"test_data", "test-gavorc")
	if not os.path.exists(os.environ["GAVOSETTINGS"]):
		warnings.warn("testhelpers imported from non-test directory.  This"
			" is almost certainly not what you want (or set GAVO_OOTTEST).")

	os.environ["GAVO_INIT_RUNNING"] = "1"
	from gavo import base #noflake: import above is conditional

	if os.path.exists(base.getConfig("rootDir")):
		# we're assuming the test environment is complete then
		# (i.e., including the test database)
		from gavo.base import sqlsupport
		sqlsupport.initPsycopg()

	else:
		# No root directory yet: create the file system hierarchy now;
		# everything needing the database is deferred to ensureResources.
		from gavo.user import initdachs
		dbname = "dachstest"
		dsn = initdachs.DSN(dbname)
		initdachs.createFSHierarchy(dsn, "test")

		with open(os.path.join(
				base.getConfig("configDir"), "defaultmeta.txt"), "a",
				encoding="utf-8") as f:
			f.write("!organization.description: Mein wüster Club\n")
			f.write("!contact.email: invalid@wherever.else\n")

		with open(os.path.join(
				base.getConfig("configDir"), "vanitynames.txt"), "w") as f:
			f.write('http://sofo-hd.de sofo !redirect\n'
				'data/cores/scs/scs.xml fromvanitynames\n')

		from gavo.base import config, meta
		meta.makeFallbackMeta(reload=True)
		# make the test data (and the test __system directory) visible
		# below inputsDir through symbolic links
		os.symlink(os.path.join(TEST_BASE, "test_data"),
			os.path.join(base.getConfig("inputsDir"), "data"))
		os.rmdir(os.path.join(base.getConfig("inputsDir"), "__system"))
		os.symlink(os.path.join(TEST_BASE, "test_data", "__system"),
			os.path.join(base.getConfig("inputsDir"), "__system"))
		os.mkdir(os.path.join(base.getConfig("inputsDir"), "test"))

		def ensureResources(): #noflake: deliberate override
			# delay everything that needs DB connectivity until
			# after the import; otherwise, we may create a conn pool
			# and that, as used in DaCHS, may create a thread.  That means
			# risking a deadlock while python is importing.
			#
			# On any failure, this prints the traceback and a hint on what
			# to remove and then exits the process with status 1.
			try:
				subprocess.check_call(["createdb", "--template=template0",
					"--encoding=UTF-8", "--locale=C", dbname])

				initdachs.initDB(dsn)

				from gavo.base import sqlsupport
				# update the type bindings
				importlib.reload(sqlsupport)
				from gavo.base import sqlsupport
				sqlsupport.initPsycopg()
				# update the UDFs
				importlib.reload(ufunctions)

				from gavo.registry import publication
				from gavo import rsc
				from gavo import rscdesc #noflake: caches registration
				from gavo.protocols import creds
				publication.updateServiceList([base.caches.getRD("//rds")])
				publication.updateServiceList([base.caches.getRD("//services")])
				publication.updateServiceList([base.caches.getRD("//tap")])

				# Import some resources necessary in trial tests
				from gavo.user import importing
				importing.process(rsc.getParseOptions(), ["//obscore"])
				with base.getWritableAdminConn() as conn:
					rsc.makeData(base.resolveCrossId("//uws#enable_useruws"),
						connection=conn)
					creds.addUser(conn, "testing", "testing", "powerless test user")
					creds.addUser(conn, "other", "other", "another test user")
			except:
				traceback.print_exc()
				sys.stderr.write("Creation of test environment failed. Remove %s\n"
					" before trying again.\n"%(base.getConfig("rootDir")))
				sys.exit(1)
# we really don't want to send mail from the test suite, so override # osinter.sendMail with a no-op from gavo.base import osinter osinter.sendMail = lambda *args, **kwargs: None from gavo import utils from gavo.base import config #noflake: previous import conditional from gavo.base import sqlsupport from gavo.adql import ufunctions
class FakeSimbad:
	"""a stand-in for the Simbad interface answering from a fixed table.

	The original intent, per the author: simbadinterface is monkeypatched
	with this so tests do not query Simbad and cached Simbad responses are
	not persisted (which means that functionality is not exercised by the
	tests).

	Constructor arguments are accepted and ignored.
	"""
	# identifier -> canned result; None stands for "not resolved"
	simbadData = {
		'Aldebaran': {
			'RA': 68.98016279,
			'dec': 16.50930235,
			'oname': 'Aldebaran',
			'otype': 'LP?'},
		'M1': {
			'RA': 83.63308333,
			'dec': 22.0145,
			'oname': 'M1',
			'otype': 'SNR'},
		'Wozzlfoo7xx': None}

	def __init__(self, *args, **kwargs):
		pass

	def query(self, ident):
		"""returns the canned record for ident, or None if there is none.
		"""
		try:
			return self.simbadData[ident]
		except KeyError:
			return None
# Here's the deal on TestResource: When setting up complicated stuff for # tests (like, a DB table), define a TestResource for it. Override # the make(dependents) method returning something and the clean(res) # method to destroy whatever you created in make(). # # Then, in VerboseTests, have a class attribute # resource = [(name1, res1), (name2, res2)] # giving attribute names and resource *instances*. # There's an example in adqltest.py # # If you use this and you have a setUp of your own, you *must* call # the superclass's setUp method. import testresources
class ResourceInstance:
	"""a thin proxy around an object that cannot take extra attributes.

	This is a helper for TestResource.  Item access, iteration, len, str,
	containment, and any attribute not found on the proxy are forwarded
	to the wrapped object.  If you have one of these but need the real
	thing, use the original attribute.
	"""
	def __init__(self, original):
		self.original = original

	def __getattr__(self, attName):
		# only called for names not found on the proxy itself
		return getattr(self.original, attName)

	def __contains__(self, item):
		return item in self.original

	def __len__(self):
		return len(self.original)

	def __iter__(self):
		return iter(self.original)

	def __getitem__(self, key):
		return self.original[key]

	def __str__(self):
		return str(self.original)
class TestResource(testresources.TestResource):
	"""A wrapper for testresources maintaining some backward compatibility.

	testresources 2.0.1 sets attributes on whatever make returns.  To
	avoid major surgery on existing code, results of make that do not
	accept arbitrary attributes are wrapped into a ResourceInstance here.

	For this, code from testresources itself is overridden.  In case this
	breaks: Take all methods of testresources' TestResource that call
	.make and have them call _make_and_wrap instead.

	Caution: when you implement reset(), you'll have to wrap the result
	with testhelpers.ResourceInstance manually; but then you'd have to copy
	dependencies manually, too, so manual reset probably is broken at
	the moment.
	"""
	def _make_and_wrap(self, deps):
		"""returns self.make(deps), wrapped in a ResourceInstance if the
		result refuses attribute assignment.
		"""
		made = self.make(deps)
		try:
			# probe whether arbitrary attributes can be set (and removed)
			made.improbably_named_attribute = None
			del made.improbably_named_attribute
		except AttributeError:
			return ResourceInstance(made)
		return made

##### Methods adapted from testresources

	def _make_all(self, result):
		"""Make the dependencies of this resource and this resource."""
		self._call_result_method_if_exists(result, "startMakeResource", self)
		dependency_resources = {
			name: dependency.getResource()
			for name, dependency in self.resources}
		made = self._make_and_wrap(dependency_resources)
		# make the dependencies available as attributes of the new resource
		for name, value in dependency_resources.items():
			setattr(made, name, value)
		self._call_result_method_if_exists(result, "stopMakeResource", self)
		return made

	def _reset(self, resource, dependency_resources):
		"""Override this to reset resources other than via clean+make.

		This method should reset the self._dirty flag (assuming the manager can
		ever be clean) and return either the old resource cleaned or a fresh
		one.

		:param resource: The resource to reset.
		:param dependency_resources: A dict mapping name -> resource instance
			for the resources specified as dependencies.
		"""
		self.clean(resource)
		fresh = self._make_and_wrap(dependency_resources)
		return fresh
# re-exported so test modules can get these names from testhelpers
from gavo.helpers.testtricks import ( #noflake: exported names
	XSDTestMixin, testFile, getMemDiffer, getXMLTree, collectedEvents)
class ForkingSubprocess(subprocess.Popen):
	"""A subprocess that doesn't exec but fork.

	Here, executable is a python callable taking no arguments.  It is
	called in a forked child, with sys.argv set to args.
	"""
	def _execute_child(self, args, executable, preexec_fn, close_fds,
			pass_fds, cwd, env,
			startupinfo, creationflags, shell,
			p2cread, p2cwrite,
			c2pread, c2pwrite,
			errread, errwrite,
			restore_signals,
			# post-3.7, some additional parameters were added, which
			# fortunately we don't need.
			*ignored_args):
# stolen largely from 2.7 subprocess.  Unfortunately, I can't just override the
# exec action; so, I'm replacing the whole exec shebang with a simple
# call to executable().
#
# Also, I'm doing some extra hack to collect coverage info if it looks
# like we're collecting coverage.
#
# The signature (and a few inside parts) is from 3.7 subprocess.  We're
# ignoring everything that 2.7 hasn't known under the bold assumption that our
# simple testing forks that doesn't matter (which ought to be true for
# start_new_session at least:-).

		# NOTE: this runs before the fork, so the calling process' sys.argv
		# is replaced as well.
		sys.argv = args
		if executable is None:
			executable = args[0]

		def _close_in_parent(fd):
			os.close(fd)

		# For transferring a possible failure during child setup from child
		# to parent.  The child writes the pickled exception to this pipe
		# (see the except clause in the child branch below).
		errpipe_read, errpipe_write = os.pipe()

		try:
			gc_was_enabled = gc.isenabled()
			# Disable gc to avoid bug where gc -> file_dealloc ->
			# write to stderr -> hang.  http://bugs.python.org/issue1336
			gc.disable()
			try:
				self.pid = os.fork()
			except:
				if gc_was_enabled:
					gc.enable()
				raise
			self._child_created = True
			if self.pid == 0:
				# Child
				try:
					# Close parent's pipe ends
					if p2cwrite!=-1:
						os.close(p2cwrite)
					if c2pread!=-1:
						os.close(c2pread)
					if errread!=-1:
						os.close(errread)
					os.close(errpipe_read)

					# When duping fds, if there arises a situation
					# where one of the fds is either 0, 1 or 2, it
					# is possible that it is overwritten (#12607).
					if c2pwrite == 0:
						c2pwrite = os.dup(c2pwrite)
					if errwrite == 0 or errwrite == 1:
						errwrite = os.dup(errwrite)

					# Dup fds for child
					def _dup2(a, b):
						# dup2() removes the CLOEXEC flag but
						# we must do it ourselves if dup2()
						# would be a no-op (issue #10806).
						# NOTE(review): _set_cloexec_flag is not defined in
						# this class; presumably inherited from the 2.7
						# Popen -- confirm it exists in the python in use.
						if a == b:
							self._set_cloexec_flag(a, False)
						elif a!=-1:
							os.dup2(a, b)
					_dup2(p2cread, 0)
					_dup2(c2pwrite, 1)
					_dup2(errwrite, 2)

					# Close pipe fds.  Make sure we don't close the
					# same fd more than once, or standard fds.
					closed = set([-1])
					for fd in [p2cread, c2pwrite, errwrite]:
						if fd not in closed and fd > 2:
							os.close(fd)
							closed.add(fd)

					if cwd is not None:
						os.chdir(cwd)

					# coverage is only imported at module level if it is
					# installed; COVERAGE_FILE must not be set otherwise.
					if "COVERAGE_FILE" in os.environ:
						cov = coverage.Coverage(data_file="forked.cov",
							source=["gavo"],
							auto_data=True)
						cov.config.disable_warnings = ["module-not-measured"]
						cov.start()

					if preexec_fn:
						preexec_fn()

					# exit status of the child unless executable sys.exits
					exitcode = 0

				except:
					exc_type, exc_value, tb = sys.exc_info()
					# Save the traceback and attach it to the exception object
					exc_lines = traceback.format_exception(exc_type,
						exc_value,
						tb)
					exc_value.child_traceback = ''.join(exc_lines)
					os.write(errpipe_write, pickle.dumps(exc_value))
					os.close(errpipe_write)
					os._exit(255)

				# setup is complete: closing the write end tells the parent
				# (which reads until EOF) that nothing went wrong so far.
				os.close(errpipe_write)
				try:
					executable()
				except SystemExit as ex:
					exitcode = ex.code

				if "COVERAGE_FILE" in os.environ:
					cov.stop()
					cov.save()

				sys.stderr.close()
				sys.stdout.close()
				os._exit(exitcode)

			# Parent
			os.close(errpipe_write)
			if gc_was_enabled:
				gc.enable()

			# Now wait for the child to come up (at which point it will
			# close its error pipe)
			errpipe_data = bytearray()
			while True:
				part = os.read(errpipe_read, 50000)
				errpipe_data += part
				if not part or len(errpipe_data)>50000:
					break
			if errpipe_data:
				# the data was written by our own forked child (see above)
				child_exception = pickle.loads(errpipe_data)
				raise child_exception

		finally:
			# close the FDs used by the child if they were opened
			if p2cread!=-1 and p2cwrite!=-1:
				_close_in_parent(p2cread)
			if c2pwrite!=-1 and c2pread!=-1:
				_close_in_parent(c2pwrite)
			if errwrite!=-1 and errread!=-1:
				_close_in_parent(errwrite)

			# be sure the error pipe FD is eventually closed no matter what
			os.close(errpipe_read)
class VerboseTest(testresources.ResourcedTestCase):
	"""A TestCase with a couple of convenient assert methods.
	"""
	def assertEqualForArgs(self, callable, result, *args):
		"""asserts that callable(*args) equals result.

		callable is evaluated exactly once; the value found is what is
		reported in the failure message.
		"""
		found = callable(*args)
		self.assertEqual(found, result,
			"Failed for arguments %s. Expected result is: %s, result found"
			" was: %s"%(str(args), repr(result), repr(found)))

	def assertRaisesVerbose(self, exception, callable, args, msg):
		"""asserts that callable(*args) raises exception, failing with msg
		if nothing is raised.

		Other exceptions propagate unchanged.
		"""
		try:
			callable(*args)
		except exception:
			return
		raise self.failureException(msg)

	def assertRaisesWithMsg(self, exception, errMsg, callable, args, msg=None,
			**kwargs):
		"""asserts that callable(*args, **kwargs) raises exception and that
		errMsg equals the stringified exception.

		The exception caught is returned for further inspection.  When nothing
		is raised, the test fails with msg (or a default message including the
		function's return value).
		"""
		try:
			value = callable(*args, **kwargs)
		except exception as ex:
			# errMsg must be the left operand so its __eq__ decides
			# (for EqualingRE's sake)
			if not (errMsg==str(ex)):
				raise self.failureException(
					"Expected %r, got %r as exception message"%(errMsg, str(ex)))
			return ex
		raise self.failureException(msg or "%s not raised (function returned %s)"%(
			str(exception), repr(value)))

	def assertRuns(self, callable, args, msg=None):
		"""asserts that callable(*args) does not raise an Exception.

		On failure, msg is used if given, else a message naming the exception.
		"""
		try:
			callable(*args)
		except Exception as ex:
			raise self.failureException(
				msg or "Should run, but raises %s (%s) exception"%(
					ex.__class__.__name__, str(ex)))

	def assertAlmostEqualVector(self, first, second, places=7, msg=None):
		"""asserts that the items of first and second are pairwise almost equal
		to places decimal places.

		NOTE(review): items are paired with zip, so surplus items of the
		longer sequence are not compared -- confirm this is intended.
		"""
		try:
			for f, s in zip(first, second):
				self.assertAlmostEqual(f, s, places)
		except AssertionError:
			if msg:
				raise AssertionError(msg)
			else:
				raise AssertionError("%s != %s within %d places"%(
					first, second, places))

	def assertEqualToWithin(self, a, b, ratio=1e-7, msg=None):
		"""asserts that abs(a-b)/abs(a+b)<ratio.

		No precautions are taken against a+b being zero; the division will
		raise an exception in that case.
		"""
		if msg is None:
			msg = "%s != %s to within %s of the sum"%(a, b, ratio)
		denom = abs(a+b)
		self.assertTrue(abs(a-b)/denom<ratio, msg)

	def assertOutput(self, toExec, argList, expectedStdout=None,
			expectedStderr="", expectedRetcode=0, input=None,
			stdoutStrings=None):
		"""checks that toExec called with argList has the given output and return
		value.

		expectedStdout and expectedStderr can be strings or bytes (compared for
		equality), lists (every item must be contained in the output), or
		functions.  In the last case, the output is passed to the function,
		and an AssertionError is raised if the function does not return
		something true.  An expectedStdout of None disables the stdout check.

		The 0th argument in argList is automatically added, only pass "real"
		command line arguments.

		toExec may also be a zero-argument python function.  In that case, the
		process is forked and the function is called, with sys.argv according to
		argList.  This helps to save startup time for python main functions.

		On failure, what the process produced is left in output.stdout and
		(for stderr or return code failures) output.stderr in the current
		directory.

		stdoutStrings is currently ignored.
		"""
		# remove debris from previous failures
		for name in ["output.stderr", "output.stdout"]:
			try:
				os.unlink(name)
			except OSError:
				pass

		if isinstance(toExec, str):
			p = subprocess.Popen([toExec]+argList, executable=toExec,
				stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
		else:
			p = ForkingSubprocess(["test harness"]+argList, executable=toExec,
				stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = p.communicate(input=utils.bytify(input))
		retcode = p.wait()

		try:
			if isinstance(expectedStderr, (bytes, str)):
				self.assertEqual(err, utils.bytify(expectedStderr))

			elif isinstance(expectedStderr, list):
				for sample in expectedStderr:
					self.assertTrue(utils.bytify(sample) in err,
						"%s missing in stderr"%repr(sample))

			else:
				# "replace" is a registered codec error handler; an unknown
				# handler name would raise a LookupError as soon as err
				# contains non-ASCII bytes.
				self.assertTrue(expectedStderr(err),
					"Stderr didn't match functional"
					" expectation: %s"%err.decode("ascii", "replace"))

			self.assertEqual(expectedRetcode, retcode)
		except AssertionError:
			with open("output.stdout", "wb") as f:
				f.write(out)
			with open("output.stderr", "wb") as f:
				f.write(err)
			raise

		try:
			if isinstance(expectedStdout, (bytes, str)):
				self.assertEqual(out, utils.bytify(expectedStdout))

			elif isinstance(expectedStdout, list):
				for sample in expectedStdout:
					self.assertTrue(utils.bytify(sample) in out,
						"%s missing in stdout"%repr(sample))

			elif expectedStdout is not None:
				self.assertTrue(expectedStdout(out))
		except AssertionError:
			with open("output.stdout", "wb") as f:
				f.write(out)
			raise

	def assertEqualIgnoringAliases(self, result, expectation):
		"""asserts that result matches expectation up to whitespace runs, where
		ASWHATEVER in expectation matches AS followed by lowercase letters.
		"""
		pat = re.escape(re.sub(r"\s+", " ", expectation)
			).replace("ASWHATEVER", "AS [a-z]+")+"$"
		if not re.match(pat, re.sub(r"\s+", " ", result)):
			raise AssertionError("%r != %r"%(result, expectation))
# Matches what cleanXML removes: namespace declarations, id attributes
# (including frame_id and coord_system_id), and xsi:schemaLocation.
# Raw strings are used so \s reaches the regex engine without python
# complaining about an invalid escape sequence.
_xmlJunkPat = re.compile("|".join([
	r'(xmlns(:[a-z0-9]+)?="[^"]*"\s*)',
	r'((frame_|coord_system_)?id="[^"]*")',
	r'(xsi:schemaLocation="[^"]*"\s*)']))


def cleanXML(aString):
	"""removes IDs and some other detritus from XML literals.

	The result will be invalid XML, and all this assumes the fixed-prefix
	logic of the DC software.

	For new tests, you should just getXMLTree and XPath tests.

	aString is passed through utils.debytify before processing (according
	to the original documentation, bytes are interpreted as ASCII and a
	string is returned in any case).  Whitespace runs are collapsed to a
	single blank, and blanks in front of "/>" and ">" are removed.
	"""
	return re.sub(r"\s+", " ", _xmlJunkPat.sub('', utils.debytify(aString))).strip(
		).replace(" />", "/>").replace(" >", ">")
class SamplesBasedAutoTest(type):
	"""A metaclass that builds tests out of a samples attribute of a class.

	To use this, give the class a samples attribute containing a sequence
	of anything, and a _runTest(sample) method receiving one item of
	that sequence.

	For each sample, a method named test<nn> (two-digit index of the sample)
	is added to the class; it calls _runTest with that sample.
	"""
	def __new__(cls, name, bases, dict):
		def makeTest(sample):
			# sample is bound as a default argument so each generated
			# method keeps its own sample
			def testFun(self, sample=sample):
				self._runTest(sample)
			return testFun

		dict.update(
			("test%02d"%sampInd, makeTest(sample))
			for sampInd, sample in enumerate(dict.get("samples", ())))
		return type.__new__(cls, name, bases, dict)
class SimpleSampleComparisonTest(VerboseTest, metaclass=SamplesBasedAutoTest):
	"""A base class for tests that simply run a function and compare
	for equality.

	The function to be called is in the functionToRun attribute (wrap
	it in a staticmethod).

	The samples are pairs of (input, output).  Output may be an exception
	(or just the serialised form of the exception); an exception raised
	by the function is accepted if its string form equals the string form
	of the expected output.
	"""
	def _runTest(self, sample):
		argument, expected = sample
		try:
			self.assertEqual(self.functionToRun(argument), expected)
		except Exception as ex:
			# assertion failures always propagate; other exceptions only
			# when they don't stringify to what was expected
			if isinstance(ex, AssertionError) or str(ex)!=str(expected):
				raise
def computeWCSKeys(pos, size, cutCrap=False):
	"""returns a dictionary containing a 2D WCS structure for an image
	centered at pos with angular size.  Both are 2-tuples in degrees.

	The image is assumed to have 1000x1000 pixels.  Unless cutCrap is true,
	some further made-up, image-related keys (title, access reference,
	bandpass limits, etc.) are added.
	"""
	pixelsX, pixelsY = 1000., 1000.
	lon, lat = pos[0], pos[1]
	width, height = size[0], size[1]

	wcsKeys = {
		"CRVAL1": lon,
		"CRVAL2": lat,
		"CRPIX1": pixelsX/2.,
		"CRPIX2": pixelsY/2.,
		"CUNIT1": "deg",
		"CUNIT2": "deg",
		"CD1_1": width/pixelsX,
		"CD1_2": 0,
		"CD2_2": height/pixelsY,
		"CD2_1": 0,
		"NAXIS1": pixelsX,
		"NAXIS2": pixelsY,
		"NAXIS": 2,
		"CTYPE1": 'RA---TAN-SIP',
		"CTYPE2": 'DEC--TAN-SIP',
		"LONPOLE": 180.}
	if cutCrap:
		return wcsKeys

	wcsKeys.update({
		"imageTitle": "test image at %s"%repr(pos),
		"instId": None,
		"dateObs": 55300+lon,
		"refFrame": None,
		"wcs_equinox": None,
		"bandpassId": None,
		"bandpassUnit": None,
		"bandpassRefval": None,
		"bandpassLo": lon+0.0001,
		"bandpassHi": lon+width,
		"pixflags": None,
		"accref": "image/%s/%s"%(pos, size),
		"accsize": (30+int(lon+lat+width+height))*1024,
		"embargo": None,
		"owner": None,
	})
	return wcsKeys
class StandIn:
	"""An object the attributes of which are the keyword arguments passed
	to the constructor.
	"""
	def __init__(self, **kwargs):
		for attName in kwargs:
			setattr(self, attName, kwargs[attName])
def getTestRD(id="test.rd"):
	"""returns what base.caches.getRD has for "data/<id>".
	"""
	from gavo import rscdesc #noflake: import above is conditional
	from gavo import base
	rdId = "data/%s"%id
	return base.caches.getRD(rdId)
def getTestTable(tableName, id="test.rd"):
	"""returns the table definition tableName from the test RD id
	(see getTestRD).
	"""
	rd = getTestRD(id)
	return rd.getTableDefById(tableName)
def getTestData(dataId):
	"""returns the data item dataId from the default test RD
	(see getTestRD).
	"""
	rd = getTestRD()
	return rd.getDataById(dataId)
def hasUDF(udfName):
	"""returns true if this system has an ADQL user defined function
	named udfName (i.e., if udfName is in ufunctions.UFUNC_REGISTRY).

	This is intended to be used as in::

		@unittest.skipUnless(testhelpers.hasUDF("GAVO_FOO"), "...")

	Caution: UDF names are upper case here.
	"""
	registry = ufunctions.UFUNC_REGISTRY
	return udfName in registry
def captureOutput(callable, args=(), kwargs=None):
	"""runs ``callable(*args, **kwargs)`` and captures the output.

	The function returns a tuple of return value, stdout output, stderr
	output.

	If callable raises a SystemExit, that is swallowed, and 2 is returned
	as return value.  Other exceptions propagate; sys.stdout and sys.stderr
	are restored in any case.
	"""
	if kwargs is None:
		kwargs = {}

	# we keep our own references to the buffers so the captured text
	# can be retrieved even if callable rebinds sys.stdout or sys.stderr.
	outBuf, errBuf = io.StringIO(), io.StringIO()
	retVal = 2 # in case the callable sys.exits
	with contextlib.redirect_stdout(outBuf), contextlib.redirect_stderr(errBuf):
		try:
			retVal = callable(*args, **kwargs)
		except SystemExit:
			# don't terminate just because someone thinks it's a good idea
			pass

	return retVal, outBuf.getvalue(), errBuf.getvalue()
class CatchallUI(object):
	"""A replacement for base.ui, collecting the messages being sent.

	This is to write tests against producing UI events.  Use it with
	the messageCollector context manager below.

	Calling any method whose name starts with "notify" appends a tuple
	(name without the notify prefix, args, kwargs) to the events attribute.
	"""
	def __init__(self):
		self.events = []

	def record(self, evType, args, kwargs):
		"""appends (evType, args, kwargs) to events.
		"""
		self.events.append((evType, args, kwargs))

	def __getattr__(self, attName):
		if attName.startswith("notify"):
			# 6 is len("notify")
			return lambda *args, **kwargs: self.record(attName[6:], args, kwargs)
		# without this, every unknown attribute would silently be None,
		# which confuses hasattr and anything probing for special methods.
		raise AttributeError(attName)
@contextlib.contextmanager
def messageCollector():
	"""A context manager recording UI events.

	Within the controlled block, base.ui is a CatchallUI, which is also
	what the context manager returns; the events accumulated are in its
	events attribute.  The previous base.ui is put back on exit.
	"""
	collector = CatchallUI()
	previousUI, base.ui = base.ui, collector
	try:
		yield collector
	finally:
		base.ui = previousUI
@contextlib.contextmanager
def fakeTime(t):
	"""makes time.time() return t within the controlled block.

	The replacement ignores any positional arguments; the previous
	time.time is put back when the block is left.
	"""
	def frozenClock(*args):
		return t

	realClock, time.time = time.time, frozenClock
	try:
		yield
	finally:
		time.time = realClock
def getServerInThread(data, onlyOnce=False, contentType="text/plain",
		port=34000):
	"""runs a server in a thread and returns a tuple of server, thread,
	and base url.

	The server answers GET and POST requests with data (a str is encoded
	to UTF-8) declared as contentType, except when the request path ends
	with does-not-exist, which yields a 404.

	With onlyOnce, the server thread ends after having handled one request.
	The thread would still need to be joined.  So, better use the DataServer
	context manager.

	port is the port to listen on; pass 0 to have the operating system
	choose a free one (the base url returned reflects the actual port).
	"""
	if isinstance(data, str):
		data = data.encode("utf-8")

	class Handler(http.server.BaseHTTPRequestHandler):
		def log_request(self, *args):
			# keep the test output clean
			pass

		def do_404(self):
			self.send_response(404, "Not Found")
			self.send_header("Content-Type", "text/plain")
			self.end_headers()
			self.wfile.write(b"You know this wouldn't be here")

		def do_GET(self):
			if self.path.endswith("does-not-exist"):
				return self.do_404()
			self.send_response(200, "Ok")
			self.send_header("Content-Type", contentType)
			self.end_headers()
			self.wfile.write(data)

		do_POST = do_GET

	httpd = http.server.HTTPServer(('', port), Handler)
	serve = httpd.handle_request if onlyOnce else httpd.serve_forever

	# daemon=True replaces the deprecated Thread.setDaemon
	t = threading.Thread(target=serve, daemon=True)
	t.start()
	return httpd, t, "http://localhost:%s"%httpd.server_port
@contextlib.contextmanager
def DataServer(data):
	"""a context manager for briefly running a web server returning data.

	This yields the base URL the server is listening on.  On exit, the
	server is shut down, its thread is joined (waiting at most 10 seconds),
	and the listening socket is closed.
	"""
	httpd, t, baseURL = getServerInThread(data)
	try:
		yield baseURL
	finally:
		httpd.shutdown()
		t.join(10)
		# shutdown only stops the serve loop; without this, the port
		# would stay bound until the server object is garbage collected.
		httpd.server_close()
@contextlib.contextmanager
def userconfigContent(content):
	"""a context manager to temporarily set some content to userconfig.

	This writes content, wrapped in a resource element with the schema
	__system, to userconfig.rd in configDir.  It cleans up after itself
	(i.e., removes that file again) and clears any userconfig cache before
	it sets to work and when it is done.

	content are RD elements without the root (resource) tag.
	"""
	userConfigPath = os.path.join(
		base.getConfig("configDir"), "userconfig.rd")
	# the cache is cleared for the path without the .rd extension
	base.caches.clearForName(userConfigPath[:-3])
	# the XML written has no encoding declaration, so it must be UTF-8
	# regardless of what the locale says.
	with open(userConfigPath, "w", encoding="utf-8") as f:
		f.write('<resource schema="__system">\n'
			+content
			+'\n</resource>\n')
	try:
		yield
	finally:
		os.unlink(userConfigPath)
		base.caches.clearForName(userConfigPath[:-3])
@contextlib.contextmanager
def tempConfig(*items):
	"""sets (sect, key, value) pairs in the config temporarily and re-sets
	the original values after the controlled block.

	Original values are restored in reverse order of setting, so an item
	given more than once ends up with the value it had at the start.  If
	setting one of the items fails, the items already changed are restored
	before the exception propagates.
	"""
	origValues = []
	try:
		for sect, key, val in items:
			origValues.append((sect, key, config.get(sect, key)))
			config.set(sect, key, val)
		yield
	finally:
		# the value is written to the config item directly, as config.get
		# returned it
		for sect, key, val in reversed(origValues):
			config.getitem(sect, key).value = val
@contextlib.contextmanager
def forceConnection(conn):
	"""makes the sqlsupport connection context managers return conn in
	the controlled block.

	It is up to the programmer to make sure that actually makes sense in
	a given test.

	As opposed to the originals, there will be no automatic commits here.
	Caveat emptor.

	On exit, the original functions are put back into sqlsupport.
	"""
	symbols = ["getUntrustedConn", "getTableConn", "getAdminConn",
		"getWritableUntrustedConn", "getWritableTableConn",
		"getWritableAdminConn"]

	def _():
		yield conn
	cm = contextlib.contextmanager(_)

	# (name, original function) pairs for everything actually replaced
	originals = []
	try:
		for sym in symbols:
			originals.append((sym, getattr(sqlsupport, sym)))
			setattr(sqlsupport, sym, cm)
		yield
	finally:
		for sym, original in originals:
			setattr(sqlsupport, sym, original)
def main(testClass, methodPrefix=None):
	"""runs tests from the calling script and exits the process.

	What is run depends on the command line:

	* no argument: all tests of the __main__ module
	* one argument: a class name, class.prefix, or module.class.prefix
	* two arguments: a class name (anything before the last dot is ignored)
	  and a method prefix

	When a class name is given, the class is looked up in __main__ (testClass
	is only a placeholder then), and only methods starting with the method
	prefix are run; the prefix from the command line wins over the
	methodPrefix argument, and "test" is used if neither is given.

	The exit status is 1 if there were errors or failures, 0 otherwise.
	"""
	ensureResources()

	if os.environ.get("GAVO_LOG")!="no":
		base.DEBUG = True
		from gavo.user import logui
		logui.LoggingUI(base.ui)

	if "GAVO_OOTTEST" not in os.environ:
		# run any pending upgrades (that's a test for them, too... of sorts)
		from gavo.user import upgrade
		upgrade.upgrade()

	className = None

	try:
		# two args: first one is class name, locate it in caller's globals
		# and ignore anything before any dot for cut'n'paste convenience
		if len(sys.argv)==3:
			className = sys.argv[1].split(".")[-1]
			methodPrefix = sys.argv[2]

		# one arg: dissect dots and decide what to do then
		elif len(sys.argv)==2:
			parts = sys.argv[1].split(".")
			if len(parts)==3:
				className, methodPrefix = parts[1:]
			elif len(parts)==2:
				className, methodPrefix = parts
			elif len(parts)==1:
				className = parts[0]
			else:
				# report what the user actually passed in
				sys.exit(f"{sys.argv[1]}?")

		elif len(sys.argv)==1:
			pass

		else:
			sys.exit("usage: test-id or class-name test-prefix")

		if className is None:
			suite = testresources.TestLoader().loadTestsFromModule(
				sys.modules["__main__"])

		else:
			testClass = getattr(sys.modules["__main__"], className)
			# this is what unittest.makeSuite (gone in python 3.13) did
			loader = unittest.TestLoader()
			loader.testMethodPrefix = methodPrefix or "test"
			loader.suiteClass = testresources.OptimisingTestSuite
			suite = loader.loadTestsFromTestCase(testClass)

		runner = unittest.TextTestRunner(
			verbosity=int(os.environ.get("TEST_VERBOSITY", 1)))
		result = runner.run(suite)
		if result.errors or result.failures:
			sys.exit(1)
		else:
			sys.exit(0)

	except (SystemExit, KeyboardInterrupt):
		raise
	except Exception:
		base.showHints = True
		from gavo.user import errhandle
		traceback.print_exc()
		errhandle.raiseAndCatch(base)
		sys.exit(1)