Source code for gavo.rscdef.regtest

"""
A framework for regression tests within RDs.

The basic idea is that there are small pieces of python almost-declaratively
defining tests for a given piece of data.  These can then be run
while (or rather, after) executing dachs val.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import argparse
import base64
import collections
import functools
import http.client
import io
import os
import pickle
import queue
import random
import re
import sys
import threading
import time
import traceback
import unittest
import urllib.parse

try:
	from urllib3 import filepost as req_filepost
	from urllib3 import fields as req_fields
except ImportError:
	# we need urllib3 to format multipart uploads since python's email
	# package is broken for that purpose (it formats everything as text).
	# I don't want to hard-depend on it, though.
	# TODO: skip tests that want it rather than crashing them
	pass

from lxml import etree as lxtree

from gavo import base
from gavo import votable
from gavo import utils
from gavo.utils import EqualingRE  #noflake: published name
from gavo.rscdef import common
from gavo.rscdef import procdef

################## Utilities

@functools.lru_cache(1)
def _loadCreds():
	"""returns a dictionary of auth keys to user/password pairs from
	~/.gavo/test.creds
	"""
	res = {}
	try:
		with open(os.path.join(os.environ["HOME"], ".gavo", "test.creds"),
				"rb") as f:
			for ln in f:
				authKey, user, pw = ln.strip().split()
				res[authKey.decode("utf-8")] = (user, pw)
	except IOError:
		pass
	return res
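
# A ~/.gavo/test.creds line, as parsed above, is whitespace-separated
# auth key, user name, and password (the values here are, of course,
# made up):
#
#   archive-admin testuser notASecret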


def getAuthFor(authKey):
	"""returns a header dictionary to authenticate for authKey.

	authKey is a key into ~/.gavo/test.creds.
	"""
	try:
		user, pw = _loadCreds()[authKey]
	except KeyError:
		raise base.NotFoundError(authKey, "Authorization info",
			"~/.gavo/test.creds")
	return {'Authorization': b"Basic "+(
		base64.b64encode(b"%s:%s"%(user, pw))).strip()}


def doHTTPRequest(scheme, method, host, path, query, payload, headers,
		timeout):
	"""creates the HTTP request and retrieves the result.
	"""
	try:
		connClass = {
			"http": http.client.HTTPConnection,
			"https": http.client.HTTPSConnection}[scheme]
	except KeyError:
		raise base.ReportableError(
			f"Unsupported scheme for regTest URL: {scheme}")

	conn = connClass(host, timeout=timeout)
	conn.connect()
	try:
		if query:
			path = path+"?"+query
		conn.request(method, path, payload, headers)
		resp = conn.getresponse()
		respHeaders = resp.getheaders()
		content = resp.read()
	finally:
		conn.close()
	return resp.status, respHeaders, content


def getHeaderValue(headers, key):
	"""returns the value for key in the httplib headers.

	Matching is case-insensitive as required by HTTP.  Missing keys
	raise KeyErrors.
	"""
	for hKey, hValue in headers:
		if hKey.lower()==key.lower():
			return hValue
	raise KeyError(key)


class Keywords(argparse.Action):
	"""A class encapsulating test selection keywords.

	There's a match method that takes a string and returns true if
	either no keywords are defined or all keywords are present in
	other (after case folding).

	This doubles as an argparse action and as such is "self-parsing"
	if you will.
	"""
	def __init__(self, *args, **kwargs):
		argparse.Action.__init__(self, *args, **kwargs)
		self.keywords = set()

	def __call__(self, parser, namespace, values, option_string=None):
		self.keywords = self._normalise(values)
		setattr(namespace, self.dest, self)

	def _normalise(self, s):
		return set(re.sub(r"[^\w\s]+", "", s).lower().split())

	def match(self, other):
		if not self.keywords:
			return True
		return not self.keywords-self._normalise(other)

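# Illustration (made-up values): a Keywords action fed "SSA, cutout"
# normalises to {"ssa", "cutout"} and will thus match a test titled
# "SSA cutout smoke test"; matching is case-folded, ignores punctuation,
# and requires all keywords to be present.
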
################## RD elements

class DynamicOpenVocAttribute(base.AttributeDef):
	"""an attribute that collects arbitrary attributes in a sequence
	of pairs.

	The finished sequence is available as a freeAttrs attribute on the
	embedding instance.  No parsing is done, everything is handled as
	a string.
	"""
	typeDesc_ = "any attribute not otherwise used"

	def __init__(self, name, **kwargs):
		base.AttributeDef.__init__(self, name, **kwargs)

	def feedObject(self, instance, value):
		if not hasattr(instance, "freeAttrs"):
			instance.freeAttrs = []
		instance.freeAttrs.append((self.name_, value))

	def feed(self, ctx, instance, value):
		self.feedObject(instance, value)

	def getCopy(self, instance, newParent):
		raise NotImplementedError("This needs some thought")

	def makeUserDoc(self):
		return "(ignore)"

	def iterParentMethods(self):
		def getAttribute(self, name):
			# we need an instance-private attribute dict here:
			if self.managedAttrs is self.__class__.managedAttrs:
				self.managedAttrs = self.managedAttrs.copy()
			try:
				return base.Structure.getAttribute(self, name)
			except base.StructureError:
				# no "real" attribute, it's a macro def
				self.managedAttrs[name] = DynamicOpenVocAttribute(name)
				# that's a decoy to make Struct.validate see a value for
				# the attribute
				setattr(self, name, None)
				return self.managedAttrs[name]
		yield "getAttribute", getAttribute


class _FormData(object):
	"""a container for multipart/form-data encoded messages.

	This is used for file uploads and depends on urllib3 for that.
	"""
	def __init__(self):
		self.fields = []

	def addFile(self, paramName, fileName, data):
		"""attaches the contents of fileName under the http parameter
		name paramName.
		"""
		field = req_fields.RequestField(paramName, data, fileName)
		field.make_multipart(content_type="application/octet-stream")
		self.fields.append(field)

	def addParam(self, paramName, paramVal):
		"""adds a form parameter paramName with the (string) value
		paramVal
		"""
		field = req_fields.RequestField(paramName, paramVal)
		field.make_multipart(content_type=None)
		self.fields.append(field)

	def encode(self):
		"""returns the formatted payload for the upload as bytes, and
		the content-type to use (including the boundary).
		"""
		return req_filepost.encode_multipart_formdata(self.fields)


class Upload(base.Structure):
	"""An upload going with a URL.
	"""
	name_ = "httpUpload"

	_src = common.ResdirRelativeAttribute("source",
		default=base.NotGiven,
		description="Path to a file containing the data to be uploaded.",
		copyable=True)
	_name = base.UnicodeAttribute("name",
		default=base.Undefined,
		description="Name of the upload parameter",
		copyable=True)
	_filename = base.UnicodeAttribute("fileName",
		default="upload.dat",
		description="Remote file name for the uploaded file.",
		copyable=True)
	_content = base.DataContent(description="Inline data to be uploaded"
		" (conflicts with source)")

	@property
	def rd(self):
		return self.parent.rd

	def addToForm(self, form):
		"""sets up a _Form instance to upload the data.
		"""
		if self.content_:
			data = self.content_.encode("utf-8")
		else:
			with open(self.source, "rb") as f:
				data = f.read()
		form.addFile(self.name, self.fileName, data)

	def validate(self):
		if (self.content_ and self.source
				or not (self.content_ or self.source)):
			raise base.StructureError("Exactly one of element content and"
				" source attribute must be given for an upload.")


def _iterInChunks(stuff, chunkSize):
	"""returns a function returning stuff in bits of chunkSize elements
	(of stuff).
	"""
	def iterate():
		offset = 0
		while True:
			chunk = stuff[offset:offset+chunkSize]
			if not chunk:
				return
			else:
				yield chunk
			offset += chunkSize
	return iterate
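
# For illustration (made-up data): _iterInChunks(b"abcdef", 4)() yields
# b"abcd" and then b"ef" before terminating.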


class DataURL(base.Structure):
	"""A source document for a regression test.

	As string URLs, they specify where to get data from, but they
	additionally let you specify uploads, authentication, headers and
	HTTP methods, while at the same time saving you manual escaping of
	parameters.

	The element body is the path to run the test against.  This is
	interpreted as relative to the RD if there's no leading slash,
	relative to the server if there's a leading slash, and absolute if
	there's a scheme.

	The attributes are translated to parameters, except for a few
	pre-defined names.  If you actually need those as URL parameters,
	shout at us and we'll provide some way of escaping these.

	We don't actually parse the URLs coming in here.  GET parameters are
	appended with a & if there's a ? in the existing URL, with a ? if
	not.  Again, shout if this is too dumb for you (but urlparse really
	isn't all that robust either...)
	"""
	name_ = "url"

	# httpURL will be set to the URL actually used in retrieveResource.
	# Only use this to report the source of the data for, e.g., failing
	# tests.
	httpURL = "(not retrieved)"

	_base = base.DataContent(description="Base for URL generation; embedded"
		" whitespace will be removed, so you're free to break those wherever"
		" you like.",
		copyable=True)
	_httpMethod = base.UnicodeAttribute("httpMethod",
		description="Request method; usually one of GET or POST",
		default="GET")
	_httpPost = common.ResdirRelativeAttribute("postPayload",
		default=base.NotGiven,
		description="Path to a file containing material that should go"
			" with a POST request (conflicts with additional parameters).",
		copyable=True)
	_postMediaType = base.UnicodeAttribute("httpPostMediaType",
		default="application/octet-stream",
		description="The media type of postPayload",
		copyable=True)
	_parset = base.EnumeratedUnicodeAttribute("parSet",
		description="Preselect a default parameter set; form gives what"
			" our framework adds to form queries.",
		default=base.NotGiven,
		validValues=["form", "TAP"],
		copyable=True)
	_httpHeaders = base.DictAttribute("httpHeader",
		description="Additional HTTP headers to pass.",
		copyable=True)
	_httpAuthKey = base.UnicodeAttribute("httpAuthKey",
		description="A key into ~/.gavo/test.creds to find a user/password"
			" pair for this request.",
		default=base.NotGiven,
		copyable=True)
	_httpUploads = base.StructListAttribute("uploads",
		childFactory=Upload,
		description='HTTP uploads to add to request'
			' (must have httpMethod="POST")',
		copyable=True)
	_httpHonorRedirects = base.BooleanAttribute("httpHonorRedirects",
		default=False,
		description="Follow 30x redirects instead of just using"
			" status, headers, and payload of the initial request.",
		copyable=True)
	_httpChunkSize = base.IntAttribute("httpChunkSize",
		default=None,
		description="If there are uploads, upload them in chunks of this"
			" many bytes using chunked encoding.",
		copyable=True)
	_rd = common.RDAttribute()
	_open = DynamicOpenVocAttribute("open")

	def getValue(self, serverURL):
		"""returns a pair of full request URL and postable payload for
		this test.
		"""
		urlBase = re.sub(r"\s+", "", self.content_)
		if "://" in urlBase:
			# we believe there's a scheme in there
			pass
		elif urlBase.startswith("/"):
			urlBase = serverURL+urlBase
		else:
			urlBase = serverURL+"/"+self.parent.rd.sourceId+"/"+urlBase

		if self.httpMethod=="POST":
			return urlBase
		else:
			return self._addParams(urlBase,
				urllib.parse.urlencode(self.getParams()))

	def getParams(self):
		"""returns the URL parameters as a sequence of kw, value pairs.
		"""
		params = getattr(self, "freeAttrs", [])
		if self.parSet=="form":
			params.extend([("__nevow_form__", "genForm"), ("submit", "Go"),
				("_charset_", "UTF-8")])
		elif self.parSet=='TAP':
			params.extend([("LANG", "ADQL"), ("REQUEST", "doQuery")])
		return params

	def retrieveResource(self, serverURL, timeout):
		"""returns a triple of status, headers, and content for
		retrieving this URL.
		"""
		self.httpURL, payload = self.getValue(serverURL), None
		headers = {
			"user-agent": "DaCHS regression tester"}
		headers.update(self.httpHeader)

		if self.httpMethod=="POST":
			if self.postPayload:
				headers["content-type"] = self.httpPostMediaType
				with open(self.postPayload, "rb") as f:
					payload = f.read()

			elif self.uploads:
				form = _FormData()
				for key, value in self.getParams():
					form.addParam(key, value)
				for upload in self.uploads:
					upload.addToForm(form)
				payload, ct = form.encode()
				headers["Content-Type"] = ct
				if self.httpChunkSize:
					payload = _iterInChunks(payload, self.httpChunkSize)()

			else:
				payload = urllib.parse.urlencode(self.getParams())
				headers["Content-Type"] = "application/x-www-form-urlencoded"

		scheme, host, path, _, query, _ = urllib.parse.urlparse(
			str(self.httpURL))

		if self.httpAuthKey is not base.NotGiven:
			headers.update(getAuthFor(self.httpAuthKey))

		status, respHeaders, content = doHTTPRequest(
			scheme, str(self.httpMethod), host, path, query,
			payload, headers, timeout)

		while self.httpHonorRedirects and status in [301, 302, 303]:
			scheme, host, path, _, query, _ = urllib.parse.urlparse(
				getHeaderValue(respHeaders, "location"))
			status, respHeaders, content = doHTTPRequest(scheme, "GET",
				host, path, query, None, {}, timeout)

		return status, respHeaders, content

	def _addParams(self, urlBase, params):
		"""a brief hack to add query parameters to GET-style URLs.

		This is a workaround for not trusting urlparse and is fairly
		easy to fool.

		Params must already be fully encoded.
		"""
		if not params:
			return urlBase

		if "?" in urlBase:
			return urlBase+"&"+params
		else:
			return urlBase+"?"+params

	def validate(self):
		if self.postPayload is not base.NotGiven:
			if self.getParams():
				raise base.StructureError("No parameters (or parSets) are"
					" possible with postPayload")
			if self.httpMethod!="POST":
				raise base.StructureError("Only POST is allowed as httpMethod"
					" together with postPayload")

		if self.uploads:
			if self.httpMethod!="POST":
				raise base.StructureError("Only POST is allowed as httpMethod"
					" together with upload")

		super().validate()


class RegTest(procdef.ProcApp, unittest.TestCase):
	"""A regression test.

	Tests are defined through url and code elements.  See `Regression
	Testing`_ for more information.
	"""
	name_ = "regTest"
	requiredType = "regTest"
	formalArgs = "self"

	data = b"<No data retrieved yet>"
	requestTime = None
	runCount = 1

	additionalNamesForProcs = {
		"EqualingRE": EqualingRE}

	_title = base.NWUnicodeAttribute("title",
		default=base.Undefined,
		description="A short, human-readable phrase describing what this"
			" test is exercising.")
	_url = base.StructAttribute("url",
		childFactory=DataURL,
		default=base.NotGiven,
		description="The source from which to fetch the test data.")
	_tags = base.StringSetAttribute("tags",
		description="A list of (free-form) tags for this test. Tagged tests"
			" are only run when the runner is constructed with at least one"
			" of the tags given.  This is mainly for restricting tags to"
			" production or development servers.")
	_rd = common.RDAttribute()

	def __init__(self, *args, **kwargs):
		unittest.TestCase.__init__(self, "fakeForPyUnit")
		procdef.ProcApp.__init__(self, *args, **kwargs)

	def fakeForPyUnit(self):
		raise AssertionError("This is not a pyunit test right now")

	@property
	def description(self):
		source = ""
		if self.rd:
			id = self.rd.sourceId
			source = " (%s)"%id
		return self.title+source

	def retrieveData(self, serverURL, timeout):
		"""returns headers and content when retrieving the resource at
		url.

		Sets the headers and data attributes of the test instance.
		"""
		startTime = time.time()
		if self.url is base.NotGiven:
			self.status, self.headers, self.data = None, None, None
		else:
			self.status, self.headers, self.data = self.url.retrieveResource(
				serverURL, timeout=timeout)
		self.requestTime = time.time()-startTime

	def getDataSource(self):
		"""returns a string pointing people to where data came from.
		"""
		if self.url is base.NotGiven:
			return "(Unconditional)"
		else:
			return self.url.httpURL

	def pointNextToLocation(self, addToPath=""):
		"""arranges for the value of the location header to become the
		base URL of the next test.

		addToPath, if given, is appended to the location header.

		If no location header was provided, the test fails.

		All this of course only works for tests in sequential regSuites.
		"""
		if not hasattr(self, "followUp"):
			raise AssertionError("pointNextToLocation only allowed within"
				" sequential regSuites")

		for key, value in self.headers:
			if key.lower()=='location':
				self.followUp.url.content_ = value+addToPath
				break
		else:
			raise AssertionError("No location header in redirect")

	@utils.document
	def assertHasStrings(self, *strings):
		"""checks that all its arguments are found within content.

		If string arguments are passed, they are utf-8 encoded before
		comparison.  If that's not what you want, pass bytes yourself.
		"""
		for phrase in strings:
			assert utils.bytify(phrase) in self.data, "%s missing"%repr(phrase)

	@utils.document
	def assertLacksStrings(self, *strings):
		"""checks that all its arguments are *not* found within content.
		"""
		for phrase in strings:
			assert utils.bytify(phrase) not in self.data, \
				"Unexpected: '%s'"%repr(phrase)

	@utils.document
	def assertHTTPStatus(self, expectedStatus):
		"""checks whether the request came back with expectedStatus.
		"""
		assert expectedStatus==self.status, ("Bad status received, %s instead"
			" of %s"%(self.status, expectedStatus))

	@utils.document
	def assertValidatesXSD(self):
		"""checks whether the returned data are XSD valid.

		This uses the DaCHS built-in XSD validator with the built-in
		schema files; it hence will in general not retrieve schema files
		from external sources.
		"""
		from gavo.helpers import testtricks
		msgs = testtricks.getXSDErrors(self.data)
		if msgs:
			raise AssertionError("Response not XSD valid. Validator output"
				" starts with\n%s"%(msgs[:160]))

	XPATH_NAMESPACE_MAP = {
		"v": "http://www.ivoa.net/xml/VOTable/v1.3",
		"v2": "http://www.ivoa.net/xml/VOTable/v1.2",
		"v1": "http://www.ivoa.net/xml/VOTable/v1.1",
		"o": "http://www.openarchives.org/OAI/2.0/",
		"h": "http://www.w3.org/1999/xhtml",
		"m": "http://www.ivoa.net/xml/mivot",
	}

	@utils.document
	def assertXpath(self, path, assertions):
		"""checks an xpath assertion.

		path is an xpath (as understood by lxml), with namespace prefixes
		statically mapped; there's currently v2 (VOTable 1.2), v1
		(VOTable 1.1), v (whatever VOTable version is the current DaCHS
		default), h (the namespace of the XHTML elements DaCHS
		generates), m (the provisional MIVOT namespace) and o (OAI-PMH
		2.0).

		If you need more prefixes, hack the source and feed back your
		changes (or just add to self.XPATH_NAMESPACE_MAP locally).

		path must match exactly one element.

		assertions is a dictionary mapping attribute names to their
		expected value.  Use the key None to check the element content,
		and match for None if you expect an empty element.

		To match against a namespaced attribute, you have to give the
		full URI; prefixes are not applied here.  This would look like::

			"{http://www.w3.org/2001/XMLSchema-instance}type": "vg:OAIHTTP"

		If you need an RE match rather than equality, there's EqualingRE
		in your code's namespace.
		"""
		if not hasattr(self, "cached parsed tree"):
			setattr(self, "cached parsed tree", lxtree.fromstring(self.data))
		tree = getattr(self, "cached parsed tree")

		res = tree.xpath(path, namespaces=self.XPATH_NAMESPACE_MAP)
		if len(res)==0:
			raise AssertionError("Element not found: %s"%path)
		elif len(res)!=1:
			raise AssertionError("More than one item matched for %s"%path)

		el = res[0]
		for key, val in assertions.items():
			if key is None:
				try:
					foundVal = el.text
				except AttributeError:
					# assume the expression was for an attribute and just
					# use the value
					foundVal = el
			else:
				foundVal = el.attrib[key]
			assert val==foundVal, "Trouble with %s: %s (%s, %s)"%(
				key or "content", path, repr(val), repr(foundVal))

	@utils.document
	def getXpath(self, path, element=None):
		"""returns the equivalent of tree.xpath(path) for an lxml etree
		of the current document or in element, if passed in.

		This uses the same namespace conventions as assertXpath.
		"""
		if element is None:
			if not hasattr(self, "_parsedTree"):
				self._parsedTree = lxtree.fromstring(self.data)
			element = self._parsedTree
		return element.xpath(path, namespaces=self.XPATH_NAMESPACE_MAP)

	@utils.document
	def assertHeader(self, key, value):
		"""checks that header key has value in the response headers.

		keys are compared case-insensitively, values are compared
		literally.
		"""
		try:
			foundValue = getHeaderValue(self.headers, key)
			self.assertEqual(value, foundValue)
		except KeyError:
			raise AssertionError("Header %s not found in %s"%(
				key, self.headers))

	@utils.document
	def getFirstVOTableRow(self, rejectExtras=True):
		"""interprets data as a VOTable and returns the first row as a
		dictionary.

		It will normally ensure that only one row is returned.  If you
		let it silently discard extra rows (rejectExtras=False), make
		sure the result is sorted, or you will get randomly failing
		tests.  Database-querying cores (which is where order is an
		issue) also honor _DBOPTIONS_ORDER.
		"""
		data, metadata = votable.loads(self.data)
		rows = metadata.iterDicts(data)
		result = next(rows)

		if rejectExtras:
			try:
				secondRow = next(rows)
			except StopIteration:
				pass
			else:
				raise AssertionError(
					f"getFirstVOTableRow swallows a row: {secondRow}")

		return result

	@utils.document
	def getVOTableRows(self):
		"""parses the first table in a result VOTable and returns the
		contents as a sequence of dictionaries.
		"""
		data, metadata = votable.loads(self.data)
		return list(metadata.iterDicts(data))

	@utils.document
	def getUnique(self, seq):
		"""returns seq[0], asserting at the same time that len(seq) is 1.

		The idea is that you can say

			row = self.getUnique(self.getVOTableRows())

		and have a nice test on the side -- and no ugly IndexError on an
		empty response.
		"""
		self.assertEqual(len(seq), 1)
		return seq[0]
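
	# Illustrative only: within a regTest's code, the helpers above are
	# typically combined along these lines (status, strings, and columns
	# are made up):
	#
	#   self.assertHTTPStatus(200)
	#   self.assertHasStrings("RESOURCE", "TABLE")
	#   row = self.getUnique(self.getVOTableRows())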


class RegTestSuite(base.Structure):
	"""A suite of regression tests.
	"""
	name_ = "regSuite"

	_tests = base.StructListAttribute("tests",
		childFactory=RegTest,
		description="Tests making up this suite",
		copyable=False)
	_title = base.NWUnicodeAttribute("title",
		description="A short, human-readable phrase describing what this"
			" suite is about.")
	_sequential = base.BooleanAttribute("sequential",
		description="Set to true if the individual tests need to be run"
			" in sequence.",
		default=False)

	def itertests(self, tags, keywords):
		for test in self.tests:
			if test.tags and not test.tags&tags:
				continue
			if keywords and not keywords.match(test.title):
				continue
			yield test

	def completeElement(self, ctx):
		if self.title is None:
			self.title = "Test suite from %s"%self.parent.sourceId
		super().completeElement(ctx)

	def expand(self, *args, **kwargs):
		"""hand macro expansion to the RD.
		"""
		return self.parent.expand(*args, **kwargs)


#################### Running Tests

class TestStatistics(object):
	"""A statistics gatherer/reporter for the regression tests.
	"""
	def __init__(self, verbose=True):
		self.verbose = False
		self.runs = []
		self.oks, self.fails, self.total = 0, 0, 0
		self.globalStart = time.time()
		self.lastTimestamp = time.time()+1
		self.timeSum = 0

	def add(self, status, runTime, title, payload, srcRD):
		"""adds a test result to the statistics.

		status is either OK, FAIL, or ERROR, runTime is the time spent
		in running the test, title is the test's title, and payload is
		"something" associated with failures that should help diagnosing
		them.
		"""
		if status=="OK":
			self.oks += 1
		else:
			if self.verbose:
				print(">>>>>>>>", status)
			self.fails += 1

		self.total += 1
		self.timeSum += runTime
		# XXX TODO: Payload can use a lot of memory -- I'm nuking it for now
		# -- maybe use an on-disk database to store this and allow later
		# debugging?
		self.runs.append((runTime, status, title,
			None,  # str(payload),
			srcRD))
		self.lastTimestamp = time.time()

	def getReport(self):
		"""returns a string representation of a short report on how the
		tests fared.
		"""
		try:
			return ("%d of %d bad. avg %.2f, min %.2f, max %.2f. %.1f/s, par %.1f"
				)%(self.fails, self.fails+self.oks, self.timeSum/len(self.runs),
				min(self.runs)[0], max(self.runs)[0],
				float(self.total)/(self.lastTimestamp-self.globalStart),
				self.timeSum/(self.lastTimestamp-self.globalStart))
		except ZeroDivisionError:
			return "No tests run (probably did not find any)."

	def getFailures(self):
		"""returns a string containing some moderately verbose info on
		the failures collected.
		"""
		failures = {}
		for runTime, status, title, payload, srcRD in self.runs:
			if status!="OK":
				failures.setdefault(srcRD, []).append("%s %s"%(status, title))

		return "\n".join("From %s:\n %s\n\n"%(srcRD, "\n ".join(badTests))
			for srcRD, badTests in failures.items())

	def save(self, target):
		"""saves the entire test statistics to target.

		This is a pickle of basically what's added with add.  No tools
		for doing something with this are provided so far.
		"""
		with open(target, "wb") as f:
			pickle.dump(self.runs, f)


class TestRunner(object):
	"""A runner for regression tests.

	It is constructed with a sequence of suites (RegTestSuite instances)
	and allows running these in parallel.  It honors the suites' wishes
	as to being executed sequentially.
	"""
	# The real trick here are the test suites with state (sequential=True). For
	# those, the individual tests must be serialized, which happens using the magic
	# followUp attribute on the tests.

	def __init__(self, suites, serverURL=None, verbose=True,
			dumpNegative=False, tags=None, timeout=45, failFile=None,
			nRepeat=1, execDelay=0, nThreads=8, printTitles=False,
			keywords=None):
		self.verbose, self.dumpNegative = verbose, dumpNegative
		self.failFile, self.nRepeat = failFile, nRepeat
		self.printTitles = printTitles
		if tags:
			self.tags = tags
		else:
			self.tags = frozenset()
		self.timeout = timeout
		self.execDelay = execDelay
		self.nThreads = nThreads
		self.keywords = keywords

		self.serverURL = serverURL or base.getConfig("web", "serverurl")
		self.curRunning = {}
		self.threadId = 0
		self._makeTestList(suites)
		self.stats = TestStatistics(verbose=self.verbose)
		self.resultsQueue = queue.Queue()

	@classmethod
	def fromRD(cls, rd, **kwargs):
		"""constructs a TestRunner for a single ResourceDescriptor.
		"""
		return cls(rd.tests, **kwargs)

	@classmethod
	def fromSuite(cls, suite, **kwargs):
		"""constructs a TestRunner for a RegTestSuite suite
		"""
		return cls([suite], **kwargs)

	@classmethod
	def fromTest(cls, test, **kwargs):
		"""constructs a TestRunner for a single RegTest
		"""
		return cls([base.makeStruct(RegTestSuite, tests=[test],
				parent_=test.parent.parent)],
			**kwargs)

	def _makeTestList(self, suites):
		"""puts all individual tests from all test suites in a deque.
		"""
		self.testList = collections.deque()
		for suite in suites:
			if suite.sequential:
				self._makeTestsWithState(suite)
			else:
				self.testList.extend(suite.itertests(self.tags, self.keywords))

	def _makeTestsWithState(self, suite):
		"""helps _makeTestList by putting suite's tests in a way that
		they are executed sequentially.
		"""
		# technically, this is done by just entering the suite's "head"
		# and have that pull all the other tests in the suite behind it.
		tests = list(suite.itertests(self.tags, self.keywords))
		if tests:
			firstTest = tests.pop(0)
			self.testList.append(firstTest)
			for test in tests:
				firstTest.followUp = test
				firstTest = test

	def _spawnThread(self):
		"""starts a new test in a thread of its own.
		"""
		test = self.testList.popleft()
		if self.printTitles:
			sys.stderr.write(" <%s> "%test.title)
			sys.stderr.flush()

		newThread = threading.Thread(target=self.runOneTest,
			args=(test, self.threadId, self.execDelay))
		newThread.description = test.description
		newThread.setDaemon(True)
		self.curRunning[self.threadId] = newThread
		self.threadId += 1
		newThread.start()

		if test.runCount<self.nRepeat:
			test.runCount += 1
			self.testList.append(test)

	def runOneTest(self, test, threadId, execDelay):
		"""runs test and puts the results in the result queue.

		This is usually run in a thread.  However, threadId is only used
		for reporting, so you may run this without threads.

		To support sequential execution, if test has a followUp
		attribute, this followUp is queued after the test has run.

		If the execDelay argument is non-zero, the thread delays its
		execution by that many seconds.
		"""
		if execDelay:
			time.sleep(execDelay)
		startTime = time.time()
		try:
			try:
				test.retrieveData(self.serverURL, timeout=self.timeout)
				test.compile()(test)
				self.resultsQueue.put(("OK", test, None, None,
					time.time()-startTime))

			except KeyboardInterrupt:
				raise

			except AssertionError as ex:
				self.resultsQueue.put(("FAIL", test, ex, None,
					time.time()-startTime))
				# races be damned
				if self.dumpNegative:
					print("Content of failing test:\n%s\n"%test.data)
				if self.failFile:
					with open(self.failFile, "wb") as f:
						f.write(test.data)

			except Exception as ex:
				if self.failFile and getattr(test, "data", None) is not None:
					with open(self.failFile, "wb") as f:
						f.write(test.data)
				f = io.StringIO()
				traceback.print_exc(file=f)
				self.resultsQueue.put(("ERROR", test, ex, f.getvalue(),
					time.time()-startTime))

		finally:
			if hasattr(test, "followUp"):
				self.resultsQueue.put(("addTest", test.followUp, None, None, 0))
			if threadId is not None:
				self.resultsQueue.put(("collectThread", threadId, None, None, 0))

	def _printStat(self, state, test, payload, traceback):
		"""gives feedback to the user about the result of a test.
		"""
		if not self.verbose:
			return
		if state=="FAIL":
			print("**** Test failed: %s -- %s\n"%(
				test.title, test.getDataSource()))
			print(">>>>", payload)
		elif state=="ERROR":
			print("**** Internal Failure: %s -- %s\n"%(test.title,
				test.url.httpURL))
			print(traceback)

	def _runTestsReal(self, showDots=False):
		"""executes the tests, taking tests off the queue and spawning
		threads until the queue is empty.

		showDots, if True, instructs the runner to push one dot to stderr
		per test spawned.
		"""
		while self.testList or self.curRunning:
			while len(self.curRunning)<self.nThreads and self.testList:
				self._spawnThread()

			evType, test, payload, traceback, dt = self.resultsQueue.get(
				timeout=self.timeout)
			if evType=="addTest":
				self.testList.appendleft(test)
			elif evType=="collectThread":
				deadThread = self.curRunning.pop(test)
				deadThread.join()
			else:
				self.stats.add(evType, dt, test.title, "", test.rd.sourceId)
				if showDots:
					if evType=="OK":
						sys.stderr.write(".")
					else:
						sys.stderr.write("E")
					sys.stderr.flush()
				self._printStat(evType, test, payload, traceback)

		if showDots:
			sys.stderr.write("\n")

	def runTests(self, showDots=False):
		"""executes the tests in a random order and in parallel.
		"""
		random.shuffle(self.testList)
		try:
			self._runTestsReal(showDots=showDots)
		except queue.Empty:
			sys.stderr.write("******** Hung jobs\nCurrently executing:\n")
			for thread in list(self.curRunning.values()):
				sys.stderr.write("%s\n"%thread.description)

	def runTestsInOrder(self):
		"""runs all tests sequentially and in the order they were added.
		"""
		for test in self.testList:
			self.runOneTest(test, None, self.execDelay)
			try:
				while True:
					evType, test, payload, traceback, dt = self.resultsQueue.get(
						False)
					if evType=="addTest":
						self.testList.appendleft(test)
					else:
						self.stats.add(evType, dt, test.title, "", test.rd.sourceId)
						self._printStat(evType, test, payload, traceback)
			except queue.Empty:
				pass


################### command line interface

def urlToURL():
	"""converts HTTP (GET) URLs to URL elements.
	"""
	# This is what's invoked by the makeTestURLs command.
	while True:
		parts = urllib.parse.urlparse(input())
		print("<url %s>%s</url>"%(
			" ".join('%s="%s"'%(k, v[0])
				for k, v in urllib.parse.parse_qs(parts.query).items()),
			parts.path))
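
# For illustration (made-up URL): feeding
# http://localhost:8080/myres/q/form?par=23 to urlToURL prints
# <url par="23">/myres/q/form</url>.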


def _getRunnerForAll(runnerArgs, showProgress):
	from gavo.registry import publication
	from gavo import api

	suites = []
	for rdId in publication.findAllRDs():
		if showProgress:
			sys.stdout.write(rdId+" ")
			sys.stdout.flush()
		try:
			rd = api.getRD(rdId, doQueries=False)
		except Exception as msg:
			base.ui.notifyError("Error loading RD %s (%s). Ignoring."%(
				rdId, utils.safe_str(msg)))
			continue
		suites.extend(rd.tests)

	return TestRunner(suites, **runnerArgs)


def _getRunnerForSingle(testId, runnerArgs):
	from gavo import api

	testElement = common.getReferencedElement(testId, doQueries=False)

	if isinstance(testElement, api.RD):
		runner = TestRunner.fromRD(testElement, **runnerArgs)
	elif isinstance(testElement, RegTestSuite):
		runner = TestRunner.fromSuite(testElement, **runnerArgs)
	elif isinstance(testElement, RegTest):
		runner = TestRunner.fromTest(testElement, **runnerArgs)
	else:
		raise base.ReportableError("%s is not a testable element."%testId,
			hint="Only RDs, regSuites, or regTests are eligible for testing.")
	return runner


def parseCommandLine(args=None):
	"""parses the command line for main()
	"""
	parser = argparse.ArgumentParser(description="Run tests embedded in RDs")
	parser.add_argument("id", type=str,
		help="RD id or cross-RD identifier for a testable thing.")
	parser.add_argument("-v", "--verbose", help="Dump info on failed test",
		action="store_true", dest="verbose")
	parser.add_argument("-V", "--titles", help="Write title when starting"
		" a test.",
		action="store_true", dest="printTitles")
	parser.add_argument("-d", "--dump-negative", help="Dump the content of"
		" failing tests to stdout",
		action="store_true", dest="dumpNegative")
	parser.add_argument("-t", "--tag", help="Also run tests tagged with TAG.",
		action="store", dest="tag", default=None, metavar="TAG")
	parser.add_argument("-R", "--n-repeat", help="Run each test N times",
		action="store", dest="nRepeat", type=int, default=1, metavar="N")
	parser.add_argument("-T", "--timeout", help="Abort and fail requests"
		" after inactivity of SECONDS",
		action="store", dest="timeout", type=int, default=15,
		metavar="SECONDS")
	parser.add_argument("-D", "--dump-to", help="Dump the content of"
		" last failing test to FILE", metavar="FILE",
		action="store", type=str, dest="failFile", default=None)
	parser.add_argument("-w", "--wait", help="Wait SECONDS before executing"
		" a request", metavar="SECONDS",
		action="store", dest="execDelay", type=int, default=0)
	parser.add_argument("-u", "--serverURL", help="URL of the DaCHS root"
		" at the server to test",
		action="store", type=str, dest="serverURL",
		default=base.getConfig("web", "serverURL"))
	parser.add_argument("-n", "--number-par", help="Number of requests"
		" to be run in parallel",
		action="store", type=int, dest="nThreads", default=8)
	parser.add_argument("--seed", help="Seed the RNG with this number."
		" Note that this doesn't necessarily make the execution sequence"
		" predictable, just the submission sequence.",
		action="store", type=int, dest="randomSeed", default=None)
	parser.add_argument("-k", "--keywords", help="Only run tests"
		" with descriptions containing all (whitespace-separated) keywords."
		" Sequential tests will be run in full, nevertheless, if their head"
		" test matches.",
		action=Keywords, type=str, dest="keywords")
	parser.add_argument("-p", "--progress", help="Show progress when"
		" parsing RDs.",
		action="store_true", dest="showProgress")

	return parser.parse_args(args)
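
# Illustrative invocations of the command backed by main() (RD ids made up):
#   dachs test myres/q               -- run the regTests in one RD
#   dachs test -k "tap upload" ALL   -- run matching tests from all RDs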


def main(args=None):
	"""user interaction for gavo test.
	"""
	tags = None
	args = parseCommandLine(args)
	if args.randomSeed:
		random.seed(args.randomSeed)
	if args.tag:
		tags = set([args.tag])
	if args.serverURL:
		args.serverURL = args.serverURL.rstrip("/")

	runnerArgs = {
		"verbose": args.verbose,
		"dumpNegative": args.dumpNegative,
		"serverURL": args.serverURL,
		"tags": tags,
		"failFile": args.failFile,
		"nRepeat": args.nRepeat,
		"timeout": args.timeout,
		"execDelay": args.execDelay,
		"nThreads": args.nThreads,
		"printTitles": args.printTitles,
		"keywords": args.keywords,
	}

	if args.id=="ALL":
		runner = _getRunnerForAll(runnerArgs, args.showProgress)
	else:
		runner = _getRunnerForSingle(args.id, runnerArgs)

	runner.runTests(showDots=True)
	print(runner.stats.getReport())
	if runner.stats.fails:
		print(runner.stats.getFailures())
		sys.exit(1)