Package gavo :: Package helpers :: Module testhelpers
[frames] | no frames]

Source Code for Module gavo.helpers.testhelpers

  1  """ 
  2  Helper classes for the DaCHS' unit tests. 
  3   
  4  WARNING: This messes up some global state.  DO NOT import into modules 
  5  doing regular work.  testtricks is the module for that kind of stuff. 
  6  """ 
  7   
  8  #c Copyright 2008-2019, the GAVO project 
  9  #c 
 10  #c This program is free software, covered by the GNU GPL.  See the 
 11  #c COPYING file in the source distribution. 
 12   
 13   
 14  import BaseHTTPServer 
 15  import contextlib 
 16  import gc 
 17  import os 
 18  import pickle 
 19  import re 
 20  import subprocess 
 21  import sys 
 22  import threading 
 23  import traceback 
 24  import unittest 
 25  import warnings 
 26  from cStringIO import StringIO 
 27   
 28  from nevow import inevow 
 29  from nevow.testutil import FakeRequest 
 30  from twisted.python.components import registerAdapter 
# This sets up a test environment of the DaCHS software.
#
# To make this work, the current user must be allowed to run
# createdb; in practice, you should have done something like
#
# sudo -u postgres -s `id -nu`
#
# You should be able to tear both ~/_gavo_test and the database
# down, and this should automatically recreate everything. That's
# an integration test for DaCHS, too.
#
# This must be run before anything else from gavo is imported because
# it manipulates the config stuff; this, in turn, runs as soon as
# base is imported.

# The following forces tests to be run from the tests directory.
# Reasonable, I'd say.
#
# All the custom setup can be suppressed by setting a GAVO_OOTTEST
# env var before importing this. That's for "out of tree test"
# and is used by the relational registry "unit" tests (and possibly
# others later).

def ensureResources():
    """Do nothing.

    This is the default; the setup code further down in this module
    rebinds the name when a test environment has to be created.
    """
# Import-time bootstrap of the test environment.  Everything here runs as
# a side effect of importing this module.
if "GAVO_OOTTEST" in os.environ:
    # out-of-tree mode: skip all of the custom setup in the else branch
    from gavo import base

else:
    # test data is looked up relative to the directory the tests run from
    TEST_BASE = os.getcwd()
    # snapshot of the environment taken before the overrides below
    originalEnvironment = os.environ.copy()
    os.environ["GAVOCUSTOM"] = "/invalid"
    os.environ["GAVOSETTINGS"] = os.path.join(TEST_BASE,
        "test_data", "test-gavorc")
    if not os.path.exists(os.environ["GAVOSETTINGS"]):
        warnings.warn("testhelpers imported from non-test directory. This"
            " is almost certainly not what you want (or set GAVO_OOTTEST).")

    # must come after the environment manipulation above (see the comment
    # at the top of this section: config is read as soon as base is imported)
    from gavo import base #noflake: import above is conditional

    if not os.path.exists(base.getConfig("rootDir")):
        # No root directory yet: create the file system side now; the
        # database side is deferred to ensureResources() below.
        from gavo.user import initdachs
        dbname = "dachstest"
        dsn = initdachs.DSN(dbname)
        initdachs.createFSHierarchy(dsn, "test")
        with open(os.path.join(
                base.getConfig("configDir"), "defaultmeta.txt"), "a") as f:
            f.write("!organization.description: Mein w\xc3\xbcster Club\n")
            f.write("!contact.email: invalid@whereever.else\n")
        from gavo.base import config
        config.makeFallbackMeta(reload=True)
        # make the tests' test_data directory available as inputs/data
        os.symlink(os.path.join(TEST_BASE, "test_data"),
            os.path.join(base.getConfig("inputsDir"), "data"))
        # replace the freshly created __system directory by a link to the
        # one in test_data
        os.rmdir(os.path.join(base.getConfig("inputsDir"), "__system"))
        os.symlink(os.path.join(TEST_BASE, "test_data", "__system"),
            os.path.join(base.getConfig("inputsDir"), "__system"))
        os.mkdir(os.path.join(base.getConfig("inputsDir"), "test"))

        # NOTE(review): the listing this was recovered from lost its
        # indentation.  The override is nested here because it uses dbname,
        # dsn and initdachs, which only exist in this branch -- confirm.
        def ensureResources(): #noflake: deliberate override
            """Create and initialise the test database.

            On any failure, the traceback and a hint are printed and the
            process exits with status 1.
            """
            # delay everything that needs DB connectivity until
            # after the import; otherwise, we may create a conn pool
            # and that, as used in DaCHS, may create a thread. That means
            # risking a deadlock while python is importing.
            try:
                subprocess.check_call(["createdb", "--template=template0",
                    "--encoding=UTF-8", "--locale=C", dbname])

                initdachs.initDB(dsn)

                from gavo.registry import publication
                from gavo import rsc
                from gavo import rscdesc #noflake: caches registration
                from gavo.protocols import tap
                publication.updateServiceList([base.caches.getRD("//services")])
                publication.updateServiceList([base.caches.getRD("//tap")])

                # Import some resources necessary in trial tests
                with base.getWritableAdminConn() as conn:
                    rsc.makeData(base.resolveCrossId("//obscore#makeSources"),
                        connection=conn)
                    rsc.makeData(base.resolveCrossId("//obscore#create"),
                        connection=conn)
                    tap.publishToTAP(base.resolveCrossId("//obscore"),
                        conn)
                    rsc.makeData(base.resolveCrossId("//uws#enable_useruws"),
                        connection=conn)
            except:
                # deliberately catch-all: whatever went wrong, report it
                # and stop the test run
                traceback.print_exc()
                sys.stderr.write("Creation of test environment failed. Remove %s\n"
                    " before trying again.\n"%(base.getConfig("rootDir")))
                sys.exit(1)


    # NOTE(review): placed inside the else branch on the strength of the
    # "all the custom setup can be suppressed" remark above; the original
    # indentation is not recoverable from the listing -- confirm.
    # we really don't want to send mail from the test suite, so override
    # osinter.sendMail with a no-op
    from gavo.base import osinter
    osinter.sendMail = lambda *args, **kwargs: None
class FakeSimbad(object):
    """A canned replacement for the Simbad resolver.

    simbadinterface is monkeypatched with this so tests never query
    Simbad.  Cached Simbad responses are not persisted either, which,
    sadly, means that functionality does not get exercised.
    """
    simbadData = {
        'Aldebaran': {
            'RA': 68.98016279,
            'dec': 16.50930235,
            'oname': 'Aldebaran',
            'otype': 'LP?'},
        u'M1': {
            'RA': 83.63308333,
            'dec': 22.0145,
            'oname': 'M1',
            'otype': 'SNR'},
        'Wozzlfoo7xx': None}

    def __init__(self, *args, **kwargs):
        """Accept and ignore any constructor arguments."""

    def query(self, ident):
        """Return the canned record for ident, or None if there is none."""
        canned = self.simbadData
        if ident in canned:
            return canned[ident]
        return None
152 153 154 # Here's the deal on TestResource: When setting up complicated stuff for 155 # tests (like, a DB table), define a TestResource for it. Override 156 # the make(dependents) method returning something and the clean(res) 157 # method to destroy whatever you created in make(). 158 # 159 # Then, in VerboseTests, have a class attribute 160 # resource = [(name1, res1), (name2, res2)] 161 # giving attribute names and resource *instances*. 162 # There's an example in adqltest.py 163 # 164 # If you use this and you have a setUp of your own, you *must* call 165 # the superclass's setUp method. 166 167 import testresources
class ResourceInstance(object):
    """A helper class for TestResource.

    This is a thin proxy around an object that does not accept arbitrary
    attributes: attribute access, indexing, iteration, len(), containment
    and string conversion are forwarded to the wrapped object, while
    attributes set on the proxy stay on the proxy.

    In case you encounter one of these but need the real thing, just pull
    .original.
    """
    def __init__(self, original):
        self.original = original

    def __getattr__(self, name):
        # __getattr__ is only consulted when normal lookup fails.  If
        # "original" itself is missing (e.g. on an instance made by
        # copy/pickle through __new__), delegating would look up
        # self.original again and recurse without bound.
        if name == "original":
            raise AttributeError(name)
        return getattr(self.original, name)

    def __getitem__(self, index):
        return self.original[index]

    def __iter__(self):
        return iter(self.original)

    def __str__(self):
        return str(self.original)

    def __unicode__(self):
        return unicode(self.original)

    def __len__(self):
        return len(self.original)

    def __contains__(self, other):
        return other in self.original
199
class TestResource(testresources.TestResource):
    """A wrapper for testresources maintaining some backward compatibility.

    testresources 2.0.1 writes into the namespace of whatever make()
    returns.  To avoid major surgery on existing code, objects returned
    from make() that do not accept arbitrary attributes are wrapped into
    a ResourceInstance.

    For that, code from testresources itself is overridden here.  In case
    this breaks: take all methods that call .make and replace make with
    _make_and_wrap.

    Caution: when you implement reset(), you'll have to wrap the
    result with testhelpers.ResourceInstance manually; but then you'd
    have to copy dependencies manually, which is crazy, so manual
    reset is probably broken at the moment.
    """
    def _make_and_wrap(self, deps):
        """Return self.make(deps), wrapped if it cannot take attributes."""
        made = self.make(deps)
        try:
            # probe whether attributes can be set (and removed again)
            made.improbably_named_attribute = None
            del made.improbably_named_attribute
        except AttributeError:
            return ResourceInstance(made)
        return made

    ##### Methods adapted from testresources

    def _make_all(self, result):
        """Make the dependencies of this resource and this resource."""
        self._call_result_method_if_exists(result, "startMakeResource", self)
        dependencies = dict(
            (depName, manager.getResource())
            for depName, manager in self.resources)
        made = self._make_and_wrap(dependencies)
        for depName, depValue in dependencies.items():
            setattr(made, depName, depValue)
        self._call_result_method_if_exists(result, "stopMakeResource", self)
        return made

    def _reset(self, resource, dependency_resources):
        """Override this to reset resources other than via clean+make.

        This method should reset the self._dirty flag (assuming the manager
        can ever be clean) and return either the old resource cleaned or a
        fresh one.  This implementation cleans the old resource and makes
        (and possibly wraps) a new one.

        :param resource: The resource to reset.
        :param dependency_resources: A dict mapping name -> resource instance
            for the resources specified as dependencies.
        """
        self.clean(resource)
        fresh = self._make_and_wrap(dependency_resources)
        return fresh
258 259 260 from gavo.helpers.testtricks import ( #noflake: exported names 261 XSDTestMixin, testFile, getMemDiffer, getXMLTree, collectedEvents)
class ForkingSubprocess(subprocess.Popen):
    """A subprocess that doesn't exec but fork.

    Instead of exec-ing a program, the forked child calls executable as a
    zero-argument python function and exits with its SystemExit code (or 0
    if it returns normally).

    This overrides a private method of Python 2.7's subprocess.Popen, so
    the signature below has to match that implementation.
    """
    def _execute_child(self, args, executable, preexec_fn, close_fds,
            cwd, env, universal_newlines,
            startupinfo, creationflags, shell, to_close,
            p2cread, p2cwrite,
            c2pread, c2pwrite,
            errread, errwrite):
        # stolen from 2.7 subprocess. Unfortunately, I can't just override the
        # exec action.

        # NOTE(review): this assignment happens before the fork, so the
        # parent's sys.argv is replaced as well -- confirm that is intended.
        sys.argv = args
        if executable is None:
            executable = args[0]

        def _close_in_parent(fd):
            os.close(fd)
            to_close.remove(fd)

        # For transferring possible exec failure from child to parent
        # The first char specifies the exception type: 0 means
        # OSError, 1 means some other error.
        errpipe_read, errpipe_write = self.pipe_cloexec()
        try:
            try:
                gc_was_enabled = gc.isenabled()
                # Disable gc to avoid bug where gc -> file_dealloc ->
                # write to stderr -> hang. http://bugs.python.org/issue1336
                gc.disable()
                try:
                    self.pid = os.fork()
                except:
                    if gc_was_enabled:
                        gc.enable()
                    raise
                self._child_created = True
                if self.pid == 0:
                    # Child
                    try:
                        # Close parent's pipe ends
                        if p2cwrite is not None:
                            os.close(p2cwrite)
                        if c2pread is not None:
                            os.close(c2pread)
                        if errread is not None:
                            os.close(errread)
                        os.close(errpipe_read)

                        # When duping fds, if there arises a situation
                        # where one of the fds is either 0, 1 or 2, it
                        # is possible that it is overwritten (#12607).
                        if c2pwrite == 0:
                            c2pwrite = os.dup(c2pwrite)
                        if errwrite == 0 or errwrite == 1:
                            errwrite = os.dup(errwrite)

                        # Dup fds for child
                        def _dup2(a, b):
                            # dup2() removes the CLOEXEC flag but
                            # we must do it ourselves if dup2()
                            # would be a no-op (issue #10806).
                            if a == b:
                                self._set_cloexec_flag(a, False)
                            elif a is not None:
                                os.dup2(a, b)
                        _dup2(p2cread, 0)
                        _dup2(c2pwrite, 1)
                        _dup2(errwrite, 2)

                        # Close pipe fds. Make sure we don't close the
                        # same fd more than once, or standard fds.
                        closed = set([None])
                        for fd in [p2cread, c2pwrite, errwrite]:
                            if fd not in closed and fd > 2:
                                os.close(fd)
                                closed.add(fd)

                        if cwd is not None:
                            os.chdir(cwd)

                        if preexec_fn:
                            preexec_fn()

                        # Close all other fds, if asked for - after
                        # preexec_fn(), which may open FDs.
                        if close_fds:
                            self._close_fds(but=errpipe_write)

                        # This is where stock subprocess would exec; here,
                        # the python callable is run in the forked child.
                        exitcode = 0
                        try:
                            executable()
                        except SystemExit as ex:
                            # NOTE(review): ex.code is None for a bare
                            # sys.exit() and may be a string; os._exit
                            # below would then raise TypeError, which the
                            # handler further down turns into exit status
                            # 255 -- confirm this is acceptable.
                            exitcode = ex.code

                        sys.stderr.close()
                        sys.stdout.close()
                        os._exit(exitcode)

                    except:
                        exc_type, exc_value, tb = sys.exc_info()
                        # Save the traceback and attach it to the exception object
                        exc_lines = traceback.format_exception(exc_type,
                            exc_value,
                            tb)
                        exc_value.child_traceback = ''.join(exc_lines)
                        os.write(errpipe_write, pickle.dumps(exc_value))

                    # only reached when the child failed above
                    os._exit(255)

                # Parent
                if gc_was_enabled:
                    gc.enable()
            finally:
                # be sure the FD is closed no matter what
                os.close(errpipe_write)

        finally:
            if p2cread is not None and p2cwrite is not None:
                _close_in_parent(p2cread)
            if c2pwrite is not None and c2pread is not None:
                _close_in_parent(c2pwrite)
            if errwrite is not None and errread is not None:
                _close_in_parent(errwrite)

            # be sure the FD is closed no matter what
            os.close(errpipe_read)
class VerboseTest(testresources.ResourcedTestCase):
    """A TestCase with a couple of convenient assert methods.
    """
    def assertEqualForArgs(self, callable, result, *args):
        """asserts that callable(*args) equals result.

        callable is only invoked once; the value found is reused for
        the failure message.
        """
        found = callable(*args)
        self.assertEqual(found, result,
            "Failed for arguments %s. Expected result is: %s, result found"
            " was: %s"%(str(args), repr(result), repr(found)))

    def assertRaisesVerbose(self, exception, callable, args, msg):
        """asserts that callable(*args) raises exception, failing with msg
        if nothing is raised.

        Exceptions of other types propagate unchanged.
        """
        try:
            callable(*args)
        except exception:
            return
        else:
            raise self.failureException(msg)

    def assertRaisesWithMsg(self, exception, errMsg, callable, args, msg=None,
            **kwargs):
        """asserts that callable(*args, **kwargs) raises exception and that
        the stringified exception equals errMsg.

        The exception caught is returned so callers can inspect it
        further.  msg, if given, replaces the default message used when
        nothing is raised.
        """
        try:
            value = callable(*args, **kwargs)
        except exception as ex:
            # keep errMsg on the left: we want to decide on errMsg's __eq__
            # for EqualingRE's sake
            if not errMsg==str(ex):
                raise self.failureException(
                    "Expected %r, got %r as exception message"%(errMsg, str(ex)))
            return ex
        else:
            raise self.failureException(msg or
                "%s not raised (function returned %s)"%(
                str(exception), repr(value)))

    def assertRuns(self, callable, args, msg=None):
        """asserts that callable(*args) does not raise an Exception.

        msg, if given, replaces the default failure message.
        """
        try:
            callable(*args)
        except Exception as ex:
            raise self.failureException(msg or
                "Should run, but raises %s (%s) exception"%(
                ex.__class__.__name__, str(ex)))

    def assertAlmostEqualVector(self, first, second, places=7, msg=None):
        """asserts that first and second have the same length and are
        element-wise equal to within places decimal places.
        """
        firstItems, secondItems = list(first), list(second)
        try:
            # zip would silently stop at the shorter sequence, so a length
            # mismatch has to be checked for explicitly
            if len(firstItems)!=len(secondItems):
                raise AssertionError("vectors differ in length")
            for f, s in zip(firstItems, secondItems):
                self.assertAlmostEqual(f, s, places)
        except AssertionError:
            if msg:
                raise AssertionError(msg)
            else:
                raise AssertionError("%s != %s within %d places"%(
                    first, second, places))

    def assertEqualToWithin(self, a, b, ratio=1e-7, msg=None):
        """asserts that abs(a-b)/abs(a+b)<ratio.

        If a+b is zero, this raises a ZeroDivisionError right away.
        """
        # true division regardless of operand types; plain / would floor
        # for integer arguments on python 2
        from operator import truediv

        if msg is None:
            msg = "%s != %s to within %s of the sum"%(a, b, ratio)
        denom = abs(a+b)
        self.assertTrue(truediv(abs(a-b), denom)<ratio, msg)

    def assertOutput(self, toExec, argList, expectedStdout=None,
            expectedStderr="", expectedRetcode=0, input=None,
            stdoutStrings=None):
        """checks that toExec called with argList has the given output and
        return value.

        expectedStdout and expectedStderr can be functions. In that case,
        the output is passed to the function, and an AssertionError is raised
        if the functions do not return true.

        stdoutStrings, if given, is a sequence of strings that all must
        occur in the standard output.

        The 0th argument in argList is automatically added, only pass "real"
        command line arguments.

        toExec may also be a zero-argument python function. In that case, the
        process is forked and the function is called, with sys.argv according to
        argList. This helps to save startup time for python main functions.

        On failure, the output captured is left in the files output.stdout
        and/or output.stderr in the current directory.
        """
        # remove leftovers from previous failures
        for name in ["output.stderr", "output.stdout"]:
            try:
                os.unlink(name)
            except os.error:
                pass

        if isinstance(toExec, basestring):
            p = subprocess.Popen([toExec]+argList, executable=toExec,
                stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        else:
            p = ForkingSubprocess(["test harness"]+argList, executable=toExec,
                stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = p.communicate(input=input)
        retcode = p.wait()

        try:
            self.assertEqual(expectedRetcode, retcode)

            if isinstance(expectedStderr, basestring):
                self.assertEqual(err, expectedStderr)
            else:
                self.assertTrue(expectedStderr(err))
        except AssertionError:
            with open("output.stdout", "w") as f:
                f.write(out)
            with open("output.stderr", "w") as f:
                f.write(err)
            raise

        try:
            if isinstance(expectedStdout, basestring):
                self.assertEqual(out, expectedStdout)
            elif expectedStdout is not None:
                self.assertTrue(expectedStdout(out))
            if stdoutStrings:
                for s in stdoutStrings:
                    self.assertFalse(s not in out, "%s missing"%s)
        except AssertionError:
            with open("output.stdout", "w") as f:
                f.write(out)
            raise

    def assertEqualIgnoringAliases(self, result, expectation):
        """asserts that result equals expectation, where each ASWHATEVER
        in expectation stands for "AS" followed by a lowercase alias.
        """
        pat = re.escape(expectation).replace("ASWHATEVER", "AS [a-z]+")+"$"
        if not re.match(pat, result):
            raise AssertionError("%r != %r"%(result, expectation))
# Raw strings, so the \s escapes reach the regex engine untouched rather
# than relying on python passing unknown string escapes through.
_xmlJunkPat = re.compile("|".join([
    r'(xmlns(:[a-z0-9]+)?="[^"]*"\s*)',
    r'((frame_|coord_system_)?id="[^"]*")',
    r'(xsi:schemaLocation="[^"]*"\s*)']))


def cleanXML(aString):
    """removes IDs and some other detritus from XML literals.

    Namespace declarations, id attributes (including frame_id and
    coord_system_id) and xsi:schemaLocation attributes are dropped,
    runs of whitespace are collapsed to single blanks, and blanks in
    front of tag ends are removed.

    The result will be invalid XML, and all this assumes the fixed-prefix
    logic of the DC software.

    For new tests, you should just getXMLTree and XPath tests.
    """
    return re.sub(r"\s+", " ", _xmlJunkPat.sub('', aString)).strip(
        ).replace(" />", "/>").replace(" >", ">")
539
def printFormattedXML(xmlString):
    """pretty-prints xmlString by feeding it to "xmlstarlet fo".

    The formatter inherits this process' standard output; this function
    waits for it to finish.
    """
    formatter = subprocess.Popen("xmlstarlet fo".split(),
        stdin=subprocess.PIPE)
    sink = formatter.stdin
    sink.write(xmlString)
    sink.close()
    formatter.wait()
548
class SamplesBasedAutoTest(type):
    """A metaclass that builds tests out of a samples attribute of a class.

    To use this, give the class a samples attribute containing a sequence
    of anything, and a _runTest(sample) method receiving one item of
    that sequence.

    The metaclass will create one test<n> method for each sample; n is
    the zero-padded, two-digit index of the sample.
    """
    def __new__(cls, name, bases, dict):
        sampleSeq = dict.get("samples", ())
        for sampInd, sample in enumerate(sampleSeq):
            # the default argument freezes the current sample; a plain
            # closure would see the last sample in all test methods
            def testFun(self, sample=sample):
                self._runTest(sample)
            methodName = "test%02d"%sampInd
            dict[methodName] = testFun
        return type.__new__(cls, name, bases, dict)
565
class SimpleSampleComparisonTest(VerboseTest):
    """A base class for tests that simply run a function and compare
    for equality.

    The function to be called is in the functionToRun attribute (wrap
    it in a staticmethod).

    The samples are pairs of (input, output). Output may be an
    exception (or just the serialised form of the exception); in that
    case, the test passes if the function raises an exception with the
    same string representation.
    """
    __metaclass__ = SamplesBasedAutoTest

    def _runTest(self, sample):
        val, expected = sample
        try:
            found = self.functionToRun(val)
            self.assertEqual(found, expected)
        except AssertionError:
            # a failed comparison is a plain test failure
            raise
        except Exception as ex:
            # anything else is fine if it is what the sample announced
            if str(ex)!=str(expected):
                raise
589
def computeWCSKeys(pos, size, cutCrap=False):
    """returns a dictionary containing a 2D WCS structure for an image
    centered at pos with angular size. Both are 2-tuples in degrees.

    The image is always 1000x1000 pixels.  Unless cutCrap is true, the
    dictionary also contains a set of non-WCS keys (title, bandpass,
    accref, ...) computed from pos and size.
    """
    imgPix = (1000., 1000.)
    wcs = {
        "CRVAL1": pos[0],
        "CRVAL2": pos[1],
        "CRPIX1": imgPix[0]/2.,
        "CRPIX2": imgPix[1]/2.,
        "CUNIT1": "deg",
        "CUNIT2": "deg",
        "CD1_1": size[0]/imgPix[0],
        "CD1_2": 0,
        "CD2_2": size[1]/imgPix[1],
        "CD2_1": 0,
        "NAXIS1": imgPix[0],
        "NAXIS2": imgPix[1],
        "NAXIS": 2,
        "CTYPE1": 'RA---TAN-SIP',
        "CTYPE2": 'DEC--TAN-SIP',
        "LONPOLE": 180.}
    if cutCrap:
        return wcs

    # product metadata derived from the position and size
    wcs["imageTitle"] = "test image at %s"%repr(pos)
    wcs["instId"] = None
    wcs["dateObs"] = 55300+pos[0]
    wcs["refFrame"] = None
    wcs["wcs_equinox"] = None
    wcs["bandpassId"] = None
    wcs["bandpassUnit"] = None
    wcs["bandpassRefval"] = None
    wcs["bandpassLo"] = pos[0]
    wcs["bandpassHi"] = pos[0]+size[0]
    wcs["pixflags"] = None
    wcs["accref"] = "image/%s/%s"%(pos, size)
    wcs["accsize"] = (30+int(pos[0]+pos[1]+size[0]+size[1]))*1024
    wcs["embargo"] = None
    wcs["owner"] = None
    return wcs
631
class StandIn(object):
    """An object whose attributes are the keyword arguments passed to
    the constructor.
    """
    def __init__(self, **kwargs):
        for attName in kwargs:
            setattr(self, attName, kwargs[attName])
639
def getTestRD(id="test.rd"):
    """returns the RD data/<id> as handed out by base.caches.getRD."""
    from gavo import rscdesc #noflake: import above is conditional
    from gavo import base
    rdId = "data/%s"%id
    return base.caches.getRD(rdId)
645
def getTestTable(tableName, id="test.rd"):
    """returns the table definition tableName from the test RD id."""
    rd = getTestRD(id)
    return rd.getTableDefById(tableName)
649
def getTestData(dataId):
    """returns the data item dataId from the default test RD."""
    rd = getTestRD()
    return rd.getDataById(dataId)
653
def captureOutput(callable, args=(), kwargs=None):
    """runs callable(*args, **kwargs) and captures the output.

    The function returns a tuple of return value, stdout output, stderr output.

    If callable raises SystemExit, that is swallowed and 2 is returned
    as the return value.  Other exceptions propagate; sys.stdout and
    sys.stderr are restored in either case.
    """
    if kwargs is None:
        kwargs = {}

    realOut, realErr = sys.stdout, sys.stderr
    # hold on to the buffers ourselves so we can still read them (and
    # restore the real streams) if callable rebinds sys.stdout or sys.stderr
    outBuffer, errBuffer = StringIO(), StringIO()
    sys.stdout, sys.stderr = outBuffer, errBuffer
    try:
        retVal = 2 # in case the callable sys.exits
        try:
            retVal = callable(*args, **kwargs)
        except SystemExit:
            # don't terminate just because someone thinks it's a good idea
            pass
    finally:
        sys.stdout, sys.stderr = realOut, realErr
        outCont, errCont = outBuffer.getvalue(), errBuffer.getvalue()
    return retVal, outCont, errCont
673
class FakeContext(object):
    """A scaffolding class for testing renderers.

    This will in general not be enough as most actions renderers do will
    require a running reactor, so you need trial. But sometimes it's
    all synchronous and this will do.

    You can simulate a simple locateChild/renderHTTP cycle with
    runRequest(resource, path). This will, in particular, use
    a renderSync method on the page(s) if available. It will hand
    out exceptions as usual. It will return what's written to context
    and returned by renderHTTP.
    """
    def __init__(self, **kwargs):
        self.args = kwargs
        self.data = []
        self.request = FakeRequest(args=kwargs)

    def runRequest(self, resource, path):
        """Resolve path on resource, render the result and return
        whatever the fake request has accumulated.
        """
        segments = path.split("/") if path else ()

        # walk down until all path segments are consumed
        while segments:
            resource, segments = resource.locateChild(self, segments)

        render = getattr(resource, "renderSync", resource.renderHTTP)
        res = render(self)
        if res:
            # a result attribute means we got the outcome of an
            # unnecessary maybeDeferred
            payload = res.result if hasattr(res, "result") else res
            self.request.write(payload)

        return self.request.accumulator


# let nevow find the request for a FakeContext
registerAdapter(lambda ctx: ctx.request, FakeContext, inevow.IRequest)
class CatchallUI(object):
    """A replacement for base.ui, collecting the messages being sent.

    This is to write tests against producing UI events. Use it with
    the messageCollector context manager.

    Any attribute named notify<Something> is a function recording
    (<Something>, args, kwargs) in the events list.  All other unknown
    attributes raise an AttributeError.
    """
    def __init__(self):
        self.events = []

    def record(self, evType, args, kwargs):
        """Append an (evType, args, kwargs) triple to events."""
        self.events.append((evType, args, kwargs))

    def __getattr__(self, attName):
        # honour the __getattr__ contract: names we do not synthesise must
        # raise AttributeError rather than silently evaluate to None
        if not attName.startswith("notify"):
            raise AttributeError(attName)

        evType = attName[len("notify"):]

        def notifier(*args, **kwargs):
            return self.record(evType, args, kwargs)
        return notifier
730
@contextlib.contextmanager
def messageCollector():
    """A context manager recording UI events.

    While the controlled block runs, base.ui is replaced by a CatchallUI,
    which is also what the context manager returns; the events recorded
    are in its events attribute.  The previous base.ui is put back on
    exit.
    """
    collector = CatchallUI()
    previousUI = base.ui
    base.ui = collector
    try:
        yield collector
    finally:
        base.ui = previousUI
746
def trialMain(testClass):
    """Run tests through twisted trial.

    With command line arguments, each is taken as the name of a test
    method on testClass.  Without, trial is asked for the suite of the
    running script itself.
    """
    from twisted.trial import runner
    from twisted.scripts import trial as script

    options = script.Options()
    options.parseOptions()
    trialRunner = script._makeRunner(options)

    methodNames = sys.argv[1:]
    if methodNames:
        suite = runner.TestSuite()
        for methodName in methodNames:
            suite.addTest(testClass(methodName))
    else:
        # no arguments: hand our own script name to trial
        sys.argv.append(sys.argv[0])
        options.parseOptions()
        suite = script._getSuite(options)

    trialRunner.run(suite)
763
def getServerInThread(data, onlyOnce=False, port=34000):
    """runs a server in a thread and returns server, thread, and base url.

    The server answers every GET and POST by writing data to the
    connection as-is; the handler adds no status line or headers.

    onlyOnce will configure the server such that it only handles a
    single request.  The thread would still need to be joined.

    port is the TCP port to listen on; pass 0 to let the operating
    system pick a free one.  The base URL returned always has the port
    actually bound.

    So, better use the DataServer context manager.
    """
    class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
        def do_GET(self):
            self.wfile.write(data)
        do_POST = do_GET

    httpd = BaseHTTPServer.HTTPServer(('', port), Handler)

    if onlyOnce:
        serve = httpd.handle_request
    else:
        serve = httpd.serve_forever

    t = threading.Thread(target=serve)
    # daemon thread: a forgotten server must not keep the interpreter alive
    t.daemon = True
    t.start()
    return httpd, t, "http://localhost:%s"%httpd.server_port
@contextlib.contextmanager
def DataServer(data):
    """a context manager for briefly running a web server returning data.

    This yields the base URL the server is listening on.  The server is
    shut down and its socket closed when the controlled block is left,
    also when that happens through an exception.
    """
    httpd, t, baseURL = getServerInThread(data)
    try:
        yield baseURL
    finally:
        # without the finally, an exception in the controlled block would
        # leave the server running and its port taken
        httpd.shutdown()
        httpd.server_close()
        t.join(10)
805
@contextlib.contextmanager
def userconfigContent(content):
    """a context manager to temporarily set some content to userconfig.

    This writes userconfig.rd into the config directory, clearing the
    cache entry for it before writing.  On exit, the file is removed
    and the cache entry is cleared once more.

    content are RD elements without the root (resource) tag.
    """
    userConfigPath = os.path.join(
        base.getConfig("configDir"), "userconfig.rd")
    # the caches are addressed without the .rd extension
    cacheName = userConfigPath[:-3]

    base.caches.clearForName(cacheName)
    with open(userConfigPath, "w") as f:
        f.write("".join([
            '<resource schema="__system">\n',
            content,
            '\n</resource>\n']))
    try:
        yield
    finally:
        os.unlink(userConfigPath)
        base.caches.clearForName(cacheName)
828
def main(testClass, methodPrefix=None):
    """Run unit tests from the calling script, depending on sys.argv.

    Without command line arguments, all tests in the __main__ module are
    run.  With one argument, the tests of testClass with methods starting
    with methodPrefix (or, if that is not given, the argument) are run.
    With two or more, the second-to-last argument names the test class in
    __main__; anything up to the last dot in it is ignored.

    Logging is set up unless the environment variable GAVO_LOG is "no";
    TEST_VERBOSITY sets the verbosity of the test runner (default 1).

    Exceptions other than SystemExit and KeyboardInterrupt are reported,
    after which the process exits with status 1.
    """
    # make sure the test environment exists (a no-op unless this module
    # had to create it when it was imported)
    ensureResources()

    if os.environ.get("GAVO_LOG")!="no":
        base.DEBUG = True
        from gavo.user import logui
        logui.LoggingUI(base.ui)

    if "GAVO_OOTTEST" not in os.environ:
        # run any pending upgrades (that's a test for them, too... of sorts)
        from gavo.user import upgrade
        upgrade.upgrade()

    try:
        # two args: first one is class name, locate it in caller's globals
        # and ignore anything before any dot for cut'n'paste convenience
        if len(sys.argv)>2:
            className = sys.argv[-2].split(".")[-1]
            testClass = getattr(sys.modules["__main__"], className)

        # one arg: test method prefix on testClass
        if len(sys.argv)>1:
            suite = unittest.makeSuite(testClass, methodPrefix or sys.argv[-1],
                suiteClass=testresources.OptimisingTestSuite)
        else: # Zero args, emulate unittest.run behaviour
            suite = testresources.TestLoader().loadTestsFromModule(
                sys.modules["__main__"])

        runner = unittest.TextTestRunner(
            verbosity=int(os.environ.get("TEST_VERBOSITY", 1)))
        # NOTE(review): the result of the run is discarded, so failing
        # tests do not influence the exit status -- confirm that callers
        # rely on that before changing it.
        runner.run(suite)
    except (SystemExit, KeyboardInterrupt):
        raise
    except:
        # deliberately catch-all: report whatever broke the test run
        base.showHints = True
        from gavo.user import errhandle
        traceback.print_exc()
        errhandle.raiseAndCatch(base)
        sys.exit(1)
869