Package gavo :: Package helpers :: Module testtricks
[frames | no frames]

Source Code for Module gavo.helpers.testtricks

  1  """ 
  2  Helper functions and classes for unit tests and similar. 
  3   
  4  Whatever is useful to unit tests from here should be imported into 
  5  testhelpers, too.  Unit test modules should not be forced to import 
  6  this. 
  7  """ 
  8   
  9  #c Copyright 2008-2019, the GAVO project 
 10  #c 
 11  #c This program is free software, covered by the GNU GPL.  See the 
 12  #c COPYING file in the source distribution. 
 13   
 14   
 15  from __future__ import print_function 
 16   
 17  import contextlib 
 18  import gzip 
 19  import os 
 20  import re 
 21  import subprocess 
 22  import tempfile 
 23  import unittest 
 24   
 25  from lxml import etree 
 26   
 27  from gavo import base 
 28  from gavo import utils 
 29  from gavo.utils import stanxml 
def _nukeNamespaces(xmlString):
    """returns xmlString with namespace prefixes removed from tag names.

    Every occurrence of ``xmlns="..."`` is dropped first.  After that,
    a prefix made of lower-case letters and digits (as in ``foo:``) is
    removed from the start of each opening or closing tag.  Attribute names
    and prefixed declarations like ``xmlns:foo="..."`` are left untouched.
    """
    prefixPat = re.compile('^(</?)(?:[a-z0-9]+:)')

    def stripPrefix(tagMatch):
        # the pattern is anchored at the start of the tag, so only the
        # element name can lose its prefix
        return prefixPat.sub(r"\1", tagMatch.group())

    withoutDefaultNS = re.sub('xmlns="[^"]*"', "", xmlString)
    return re.sub("(?s)<[^>]*>", stripPrefix, withoutDefaultNS)
37
def getXMLTree(xmlString, debug=False):
    """returns what lxml's ``etree.fromstring`` makes of ``xmlString``
    after, for convenience, the namespaces on elements have been nuked.

    The result lets you do xpath searching using its ``xpath`` method.

    With debug=True, the parsed tree is additionally dumped through
    ``etree.dump``.

    Nuking namespaces is of course not a good idea in general, so you
    might want to think again before you use this in production code.
    """
    stripped = _nukeNamespaces(xmlString)
    tree = etree.fromstring(stripped)
    if debug:
        etree.dump(tree)
    return tree
53
def getXSDErrorsXerces(data, leaveOffending=False):
    """returns Xerces error messages for XSD validation of data, or None
    if data is valid.

    data is written to a temporary file that is then passed to the java
    class xsdval; the temporary file is removed again in any case.

    With leaveOffending=True, data is additionally written to
    badDocument.xml in the current directory when validation fails.

    See the docstring of XSDTestMixin for how to make this work.

    This raises a unittest.SkipTest exception if the validator cannot be
    found.
    """
    # schemata/makeValidator.py dumps its validator class in the cacheDir
    validatorDir = base.getConfig("cacheDir")
    if not os.path.exists(os.path.join(validatorDir, "xsdval.class")):
        raise unittest.SkipTest("java XSD valdiator not found -- run"
            " schemata/makeValidator.py")

    # java separates classpath items with the platform's path separator
    classpath = os.pathsep.join(
        [validatorDir]+base.getConfig("xsdclasspath"))
    handle, inName = tempfile.mkstemp("xerctest", "rm")
    try:
        with os.fdopen(handle, "w") as f:
            f.write(data)
        args = ["java", "-cp", classpath, "xsdval",
            "-n", "-v", "-s", "-f", inName]

        validator = subprocess.Popen(args,
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # communicate() reads all output, closes the pipe and waits for
        # the child, so no file descriptor is left to the garbage collector
        xercMsgs, _ = validator.communicate()
        if validator.returncode or "Error]" in xercMsgs:
            if leaveOffending:
                with open("badDocument.xml", "w") as of:
                    of.write(data)
            return xercMsgs
    finally:
        os.unlink(inName)
    return None
90
class XSDResolver(etree.Resolver):
    """A resolver for external entities only returning in-tree files.

    Schema files are looked up by their base name in the ``schemata``
    directory of the distribution (via base.getPathForDistFile).
    """
    def __init__(self):
        # directory the schema files are looked up in
        self.basePath = "schemata"

    def getPathForName(self, name):
        """returns the path of the distribution file for the schema name.

        Only the part of name behind the last slash is used, so name
        may be a URL or a path.
        """
        xsdName = name.split("/")[-1]
        return base.getPathForDistFile(
            os.path.join(self.basePath, xsdName))

    def resolve(self, url, pubid, context):
        """returns the local file for url if there is one.

        url may also be a namespace URI known to stanxml.NSRegistry.

        If no local file can be delivered, an error is reported through
        base.ui.notifyError and None is returned.
        """
        try:
            # resolve namespace URIs, too
            try:
                url = stanxml.NSRegistry.getSchemaForNS(url)
            except base.NotFoundError:
                # it's not a (known) namespace URI, try on
                pass

            path = self.getPathForName(url)
            res = self.resolve_filename(path, context)
            if res is not None:
                return res
        except Exception:
            # deliberately best-effort: fall through to error message.
            # KeyboardInterrupt and SystemExit are no longer swallowed.
            pass
        base.ui.notifyError("Did not find local file for schema %s --"
            " this will fall back to network resources and thus probably"
            " be slow"%url)
        return None
# the resolver instance shared by this module
RESOLVER = XSDResolver()

# an lxml parser that has RESOLVER registered as a resolver
XSD_PARSER = etree.XMLParser()
XSD_PARSER.resolvers.add(RESOLVER)
@contextlib.contextmanager
def MyParser():
    """a context manager making XSD_PARSER lxml's default parser within
    the controlled block.

    If XSD_PARSER already is the default parser (e.g., because of
    a nested use), nothing is changed.  Otherwise, XSD_PARSER is installed
    and the default parser is reset through a parameterless
    ``etree.set_default_parser()`` on exit.
    """
    # get_default_parser must be *called*; comparing the function object
    # itself to XSD_PARSER would always be false.
    if etree.get_default_parser() is XSD_PARSER:
        yield
    else:
        etree.set_default_parser(XSD_PARSER)
        try:
            yield
        finally:
            etree.set_default_parser()
138
class QNamer(object):
    """A hack that generates QNames through getattr.

    Construct with the desired namespace.  Any attribute that is not
    found otherwise yields an etree.QName in that namespace; underscores
    at either end of the attribute name are stripped, so names clashing
    with python keywords can be written with a trailing underscore.
    """
    def __init__(self, ns):
        self.ns = ns

    def __getattr__(self, name):
        localName = name.strip("_")
        return etree.QName(self.ns, localName)
# QNames in the XML Schema namespace, as in XS.schema
XS = QNamer("http://www.w3.org/2001/XMLSchema")


# names of the schema files getDefaultValidator passes to getJointValidator
VO_SCHEMATA = [
    "Characterisation-v1.11.xsd",
    "ConeSearch-v1.1.xsd",
    "DataModel-v1.0.xsd",
    "DocRegExt-v1.0.xsd",
    "oai_dc.xsd",
    "OAI-PMH.xsd",
    "RegistryInterface-v1.0.xsd",
    "SIA-v1.2.xsd",
    "SLAP-v1.1.xsd",
    "SSA-v1.2.xsd",
    "StandardsRegExt-1.0.xsd",
    "stc-v1.30.xsd",
    "stc-v1.20.xsd",
    "coords-v1.20.xsd",
    "region-v1.20.xsd",
    "TAPRegExt-v1.0.xsd",
    "UWS-v1.1.xsd",
    "VODataService-v1.1.xsd",
    "VOEvent-1.0.xsd",
    "VORegistry-v1.0.xsd",
    "VOResource-v1.1.xsd",
    "VOSIAvailability-v1.0.xsd",
    "VOSICapabilities-v1.0.xsd",
    "VOSITables-v1.0.xsd",
    "VOTable-1.1.xsd",
    "VOTable-1.2.xsd",
    "VOTable-1.3.xsd",
    "vo-dml-v1.0.xsd",
    "xlink.xsd",
    "XMLSchema.xsd",
    "xml.xsd",
]
def getJointValidator(schemaPaths):
    """returns an lxml validator containing the schemas in schemaPaths.

    schemaPaths must be actual file paths, absolute or
    trunk/schema-relative.

    This works by building a throwaway schema (target namespace
    urn:combiner) that has one xs:import per schema; the target namespace
    for each import is read from the respective schema file.
    """
    locationPrefix = "http://vo.ari.uni-heidelberg.de/docs/schemata/"

    with MyParser():
        imports = []
        for fName in schemaPaths:
            schemaRoot = etree.parse(
                RESOLVER.getPathForName(fName)).getroot()
            imports.append(
                (locationPrefix+fName, schemaRoot.get("targetNamespace")))

        combiner = etree.Element(
            XS.schema, attrib={"targetNamespace": "urn:combiner"})
        for schemaLocation, tns in imports:
            etree.SubElement(combiner, XS.import_, attrib={
                "namespace": tns, "schemaLocation": schemaLocation})

        return etree.XMLSchema(etree.ElementTree(combiner))
210
def getDefaultValidator(extraSchemata=()):
    """returns a validator that knows the schemata typically useful within
    the VO.

    extraSchemata is an iterable of further schema names to include in
    addition to VO_SCHEMATA (lists work as before; tuples and other
    iterables are accepted, too).

    This will currently only work if DaCHS is installed from an SVN
    checkout with setup.py develop.

    What's returned has a method assertValid(et) that raises an exception
    if the elementtree et is not valid.  You can simply call it to
    get back True for valid and False for invalid.
    """
    # the default is an immutable empty tuple so no mutable object is
    # shared between calls; list() makes the concatenation work for
    # any iterable.
    return getJointValidator(VO_SCHEMATA+list(extraSchemata))
224
def _makeLXMLValidator():
    """returns an lxml-based schema validating function for the VO XSDs

    This is not happening at import time as it is time-consuming, and the
    DaCHS server probably doesn't even validate anything.

    This is used below to build getXSDErrorsLXML.
    """
    validator = getDefaultValidator()

    def getErrors(data, leaveOffending=False):
        """returns error messages for the XSD validation of the string in data.

        data may also be something already parsed (which is assumed when
        it has an xpath attribute).  None is returned for valid documents.
        Exceptions raised while parsing or validating are returned as
        their string representation rather than propagated.
        """
        # NOTE(review): nesting reconstructed from a flattened listing;
        # validation is assumed to happen inside the MyParser block -- confirm.
        try:
            with MyParser():
                # we believe things with xpath are already parsed stuff
                isParsed = hasattr(data, "xpath")
                tree = data if isParsed else etree.fromstring(data)

                if validator.validate(tree):
                    return None

                if leaveOffending:
                    if isParsed:
                        data = etree.tostring(data, encoding="utf-8")
                    with open("badDocument.xml", "w") as of:
                        of.write(data)
                return str(validator.error_log)
        except Exception as msg:
            return str(msg)

    return getErrors
def getXSDErrorsLXML(data, leaveOffending=False):
    """returns error messages for the XSD validation of the string in data.

    This is the lxml-based implementation, much less disruptive than the
    xerces-based one.

    The validating function is built on the first call and then kept
    in the validate attribute of this function.
    """
    try:
        validate = getXSDErrorsLXML.validate
    except AttributeError:
        validate = getXSDErrorsLXML.validate = _makeLXMLValidator()
    return validate(data, leaveOffending)
# the implementation used when callers just ask for getXSDErrors
getXSDErrors = getXSDErrorsLXML
class XSDTestMixin(object):
    """provides a assertValidates method doing XSD validation.

    assertValidates raises an assertion error with the validator's
    messages on an error.  You can optionally pass a leaveOffending
    argument to make the method store the offending document in
    badDocument.xml.

    Validation is done by whatever the module-level getXSDErrors is bound
    to.  The remainder of this docstring applies to the Xerces-based
    implementation, getXSDErrorsXerces:

    The whole thing needs Xerces-J in the form of xsdval.class in the
    current directory.

    The validator itself is a java class xsdval.class built by
    ../schemata/makeValidator.py.  If you have java installed, calling
    that in the schemata directory should just work (TM).  With that
    validator and the schemata in place, no network connection should
    be necessary to run validation tests.
    """
    def assertValidates(self, xmlSource, leaveOffending=False):
        """raises an AssertionError carrying the validator messages if
        xmlSource is not valid.
        """
        xercMsgs = getXSDErrors(xmlSource, leaveOffending)
        if xercMsgs:
            raise AssertionError(xercMsgs)

    def assertWellformed(self, xmlSource):
        """raises an AssertionError if etree.fromstring fails on xmlSource.
        """
        try:
            etree.fromstring(xmlSource)
        except Exception as msg:
            raise AssertionError("XML not well-formed (%s)"%msg)
303
def getMemDiffer(ofClass=base.Structure):
    """returns a function that, when called, returns a list of the
    instances of ofClass that have appeared since getMemDiffer was called.

    If you watch everything, things get hairy because of course the state
    of this function (for instance) also creates references.  Hence, pass
    ofClass to choose what the function will track.

    This will call a gc.collect itself (and wouldn't make sense without that)
    """
    import gc
    gc.collect()
    # ids of all instances of ofClass the garbage collector knows right now
    seen_ids = {id(known) for known in gc.get_objects()
        if isinstance(known, ofClass)}
    seen_ids.add(id(seen_ids))

    def getNewObjects():
        fresh = []
        for candidate in gc.get_objects():
            if id(candidate) in seen_ids:
                continue
            if isinstance(candidate, ofClass):
                fresh.append(candidate)
        return fresh

    return getNewObjects
def getUnreferenced(items):
    """returns a list of the elements of items that no other element
    of items refers to (as far as gc.get_referrers can tell).

    The order of items is preserved.
    """
    import gc
    knownIds = frozenset(map(id, items))

    def hasInternalReferrer(item):
        # true if some referrer of item is itself one of items
        return any(id(referrer) in knownIds
            for referrer in gc.get_referrers(item))

    return [item for item in items if not hasInternalReferrer(item)]
343
def debugReferenceChain(ob):
    """a sort-of-interactive way to investigate where ob is referenced.

    This prints ob and then, one by one, the objects referring to it, each
    time waiting for a line of input:

    * h prints the available commands
    * d enters pdb
    * x leaves the function
    * empty input moves on to the next referrer
    * anything else continues the investigation with the referrer shown
    """
    import gc
    while True:
        print(repr(ob))
        pending = gc.get_referrers(ob)
        while pending:
            referrer = pending.pop()
            print(len(pending), utils.makeEllipsis(repr(referrer)))
            answer = raw_input()

            if not answer:
                # on to the next referrer
                continue

            if answer=="h":
                print("d,x,<empty>")
            elif answer=="d":
                import pdb;pdb.set_trace()
            elif answer=="x":
                return
            else:
                # follow the chain: look at who refers to the referrer
                ob = referrer
                break
# NOTE(review): NEWIDS is not used by any code visible here -- confirm
# it is still needed.
NEWIDS = set()

# HTTPChannel is the class memdebug has getMemDiffer watch
from twisted.web.http import HTTPChannel
def memdebug():
    """a debug method to track memory usage after some code has run.

    This is typically run from ArchiveService.locateChild, since request
    processing should be idempotent wrt memory after initial caching.

    This is for editing in place by DaCHS plumbers; accordingly, you're
    not supposed to make sense of this.
    """
    # NOTE(review): nesting reconstructed from a flattened listing -- confirm
    # against the actual source before relying on it.
    import gc
    print(">>>>>> total managed:", len(gc.get_objects()))

    def classNameOf(ob):
        return getattr(getattr(ob, "__class__", None), "__name__", "whatsit")

    if hasattr(base, "getNewStructs"):
        newStructs = base.getNewStructs()
        print(">>>>>> new objects:", len(newStructs))
        if len(newStructs)<110:
            externallyReferenced = getUnreferenced(newStructs)
            print(">>>>>> new externally referenced:",
                len(externallyReferenced))
            del externallyReferenced
            print(set(classNameOf(ob) for ob in newStructs))
            for ob in newStructs:
                if classNameOf(ob).endswith("cellxx"):
                    debugReferenceChain(ob)
                    break

    # start a new watch so the next call reports what was created since now
    base.getNewStructs = getMemDiffer(ofClass=HTTPChannel)
401
@contextlib.contextmanager
def testFile(name, content, writeGz=False, inDir=base.getConfig("tempDir")):
    """a context manager that creates a file called name with content
    in inDir.

    The full path name is returned.  The file is removed again when the
    controlled block is left, and also when writing content fails.

    With writeGz=True, content is gzipped on the fly (don't do this if
    the data already is gzipped).

    You can pass in name=None to get a temporary file name if you don't care
    about the name.

    inDir will be created as a side effect if it doesn't exist but (right
    now, at least), not be removed.
    """
    if not os.path.isdir(inDir):
        os.makedirs(inDir)

    if name is None:
        handle, destName = tempfile.mkstemp(dir=inDir)
        os.close(handle)
    else:
        destName = os.path.join(inDir, name)

    if writeGz:
        f = gzip.GzipFile(destName, mode="wb")
    else:
        f = open(destName, "w")

    # from here on, the file is ours, so everything is within the try block
    # that cleans up.
    try:
        try:
            f.write(content)
        finally:
            # don't leak the handle when writing fails
            f.close()
        yield destName
    finally:
        try:
            os.unlink(destName)
        except os.error:
            # the controlled block may already have removed the file
            pass
441
@contextlib.contextmanager
def collectedEvents(*kinds):
    """a context manager collecting event arguments for a while.

    kinds are the event names to subscribe to on base.ui.  The yielded
    thing is a list that contains tuples of event name and
    the event arguments.  The subscriptions are removed when the
    controlled block is left.
    """
    events = []

    def recorderFor(kind):
        # each event kind gets a handler of its own so the kind can
        # be prepended to the arguments
        def record(*args):
            events.append((kind,)+args)
        return record

    subscriptions = []
    for kind in kinds:
        subscriptions.append((kind, recorderFor(kind)))

    for kind, recorder in subscriptions:
        base.ui.subscribe(kind, recorder)

    try:
        yield events
    finally:
        for kind, recorder in subscriptions:
            base.ui.unsubscribe(kind, recorder)