1 """
2 Helper functions and classes for unit tests and similar.
3
4 Whatever is useful to unit tests from here should be imported into
5 testhelpers, too. Unit test modules should not be forced to import
6 this.
7 """
8
9
10
11
12
13
14
15 from __future__ import print_function
16
17 import contextlib
18 import gzip
19 import os
20 import re
21 import subprocess
22 import tempfile
23 import unittest
24
25 from lxml import etree
26
27 from gavo import base
28 from gavo import utils
29 from gavo.utils import stanxml
33 nsCleaner = re.compile('^(</?)(?:[a-z0-9]+:)')
34 return re.sub("(?s)<[^>]*>",
35 lambda mat: nsCleaner.sub(r"\1", mat.group()),
36 re.sub('xmlns="[^"]*"', "", xmlString))
37
40 """returns an ``libxml2`` etree for ``xmlString``, where, for convenience,
41 all namespaces on elements are nuked.
42
43 The libxml2 etree lets you do xpath searching using the ``xpath`` method.
44
45 Nuking namespaces is of course not a good idea in general, so you
46 might want to think again before you use this in production code.
47 """
48 tree = etree.fromstring(_nukeNamespaces(xmlString))
49
50 if debug:
51 etree.dump(tree)
52 return tree
53
56 """returns Xerces error messages for XSD validation of data, or None
57 if data is valid.
58
59 See the docstring of XSDTestMixin for how to make this work.
60
61 This raises a unittest.SkipTest exception if the validator cannot be
62 found.
63 """
64
65 validatorDir = base.getConfig("cacheDir")
66 if not os.path.exists(os.path.join(validatorDir, "xsdval.class")):
67 raise unittest.SkipTest("java XSD valdiator not found -- run"
68 " schemata/makeValidator.py")
69
70 classpath = ":".join([validatorDir]+base.getConfig("xsdclasspath"))
71 handle, inName = tempfile.mkstemp("xerctest", "rm")
72 try:
73 with os.fdopen(handle, "w") as f:
74 f.write(data)
75 args = ["java", "-cp", classpath, "xsdval",
76 "-n", "-v", "-s", "-f", inName]
77
78 f = subprocess.Popen(args,
79 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
80 xercMsgs = f.stdout.read()
81 status = f.wait()
82 if status or "Error]" in xercMsgs:
83 if leaveOffending:
84 with open("badDocument.xml", "w") as of:
85 of.write(data)
86 return xercMsgs
87 finally:
88 os.unlink(inName)
89 return None
90
93 """A resolver for external entities only returning in-tree files.
94 """
96 self.basePath = "schemata"
97
102
def resolve(self, url, pubid, context):
    """tries to resolve url to a local schema file.

    url is first passed through stanxml.NSRegistry.getSchemaForNS; if
    that raises base.NotFoundError, url is used unchanged.  The result is
    turned into a path with self.getPathForName and handed to
    self.resolve_filename.

    Returns whatever resolve_filename returned if that is not None.
    Otherwise (including when any step failed), an error is reported
    through base.ui.notifyError and None is returned implicitly.

    pubid is accepted for interface compatibility but not used here.
    """
    try:
        try:
            url = stanxml.NSRegistry.getSchemaForNS(url)
        except base.NotFoundError:
            # url is not registered there; carry on with url as given.
            pass

        path = self.getPathForName(url)
        res = self.resolve_filename(path, context)
        if res is not None:
            return res
    except Exception:
        # Deliberately best-effort: any failure just means "no local
        # file".  Exception rather than a bare except so that
        # KeyboardInterrupt and SystemExit still propagate.
        pass

    base.ui.notifyError("Did not find local file for schema %s --"
        " this will fall back to network resources and thus probably"
        " be slow"%url)
121
122
123 RESOLVER = XSDResolver()
124 XSD_PARSER = etree.XMLParser()
125 XSD_PARSER.resolvers.add(RESOLVER)
126
127
@contextlib.contextmanager
def MyParser():
    """a context manager making XSD_PARSER lxml's default parser within
    the controlled block.

    If XSD_PARSER already is the default parser (as in nested use),
    nothing is changed on entry and nothing is reset on exit.  Otherwise,
    XSD_PARSER is installed and, on exit, etree.set_default_parser() is
    called without arguments; a different custom default parser that was
    active before is therefore not restored.
    """
    # get_default_parser has to be *called*: comparing the function object
    # itself against XSD_PARSER is always false, which made a nested use
    # reset the default parser while the outer block was still running.
    if etree.get_default_parser() is XSD_PARSER:
        yield
        return

    etree.set_default_parser(XSD_PARSER)
    try:
        yield
    finally:
        etree.set_default_parser()
138
140 """A hack that generates QNames through getattr.
141
142 Construct with the desired namespace.
143 """
146
148 return etree.QName(self.ns, name.strip("_"))
149
150 XS = QNamer("http://www.w3.org/2001/XMLSchema")
151
152
# File names of the schemas that are concatenated with caller-supplied
# extra schemata and passed to getJointValidator, which resolves each name
# through RESOLVER.getPathForName.  This must stay a list, as it is
# combined with other lists using +.
VO_SCHEMATA = [
    "Characterisation-v1.11.xsd",
    "ConeSearch-v1.1.xsd",
    "DataModel-v1.0.xsd",
    "DocRegExt-v1.0.xsd",
    "oai_dc.xsd",
    "OAI-PMH.xsd",
    "RegistryInterface-v1.0.xsd",
    "SIA-v1.2.xsd",
    "SLAP-v1.1.xsd",
    "SSA-v1.2.xsd",
    "StandardsRegExt-1.0.xsd",
    "stc-v1.30.xsd",
    "stc-v1.20.xsd",
    "coords-v1.20.xsd",
    "region-v1.20.xsd",
    "TAPRegExt-v1.0.xsd",
    "UWS-v1.1.xsd",
    "VODataService-v1.1.xsd",
    "VOEvent-1.0.xsd",
    "VORegistry-v1.0.xsd",
    "VOResource-v1.1.xsd",
    "VOSIAvailability-v1.0.xsd",
    "VOSICapabilities-v1.0.xsd",
    "VOSITables-v1.0.xsd",
    "VOTable-1.1.xsd",
    "VOTable-1.2.xsd",
    "VOTable-1.3.xsd",
    "vo-dml-v1.0.xsd",
    "xlink.xsd",
    "XMLSchema.xsd",
    "xml.xsd",
]
188 """returns an lxml validator containing the schemas in schemaPaths.
189
190 schemaPaths must be actual file paths, absolute or
191 trunk/schema-relative.
192 """
193 with MyParser():
194 subordinates = []
195 for fName in schemaPaths:
196 fPath = RESOLVER.getPathForName(fName)
197 root = etree.parse(fPath).getroot()
198 subordinates.append((
199 "http://vo.ari.uni-heidelberg.de/docs/schemata/"+fName,
200 root.get("targetNamespace")))
201
202 root = etree.Element(
203 XS.schema, attrib={"targetNamespace": "urn:combiner"})
204 for schemaLocation, tns in subordinates:
205 etree.SubElement(root, XS.import_, attrib={
206 "namespace": tns, "schemaLocation": schemaLocation})
207
208 doc = etree.ElementTree(root)
209 return etree.XMLSchema(doc)
210
213 """returns a validator that knows the schemata typically useful within
214 the VO.
215
216 This will currently only work if DaCHS is installed from an SVN
217 checkout with setup.py develop.
218
219 What's returned has a method assertValid(et) that raises an exception
220 if the elementtree et is not valid. You can simply call it to
221 get back True for valid and False for invalid.
222 """
223 return getJointValidator(VO_SCHEMATA+extraSchemata)
224
227 """returns an lxml-based schema validating function for the VO XSDs
228
229 This is not happening at import time as it is time-consuming, and the
230 DaCHS server probably doesn't even validate anything.
231
232 This is used below to build getXSDErrorsLXML.
233 """
234 VALIDATOR = getDefaultValidator()
235
236 def getErrors(data, leaveOffending=False):
237 """returns error messages for the XSD validation of the string in data.
238 """
239 try:
240 with MyParser():
241 if hasattr(data, "xpath"):
242
243 tree = data
244 else:
245 tree = etree.fromstring(data)
246
247 if VALIDATOR.validate(tree):
248 return None
249 else:
250 if leaveOffending:
251 if hasattr(data, "xpath"):
252 data = etree.tostring(data, encoding="utf-8")
253 with open("badDocument.xml", "w") as of:
254 of.write(data)
255 return str(VALIDATOR.error_log)
256 except Exception as msg:
257 return str(msg)
258
259 return getErrors
260
263 """returns error messages for the XSD validation of the string in data.
264
265 This is the lxml-based implementation, much less disruptive than the
266 xerces-based one.
267 """
268 if not hasattr(getXSDErrorsLXML, "validate"):
269 getXSDErrorsLXML.validate = _makeLXMLValidator()
270 return getXSDErrorsLXML.validate(data, leaveOffending)
271
272
273 getXSDErrors = getXSDErrorsLXML
277 """provides a assertValidates method doing XSD validation.
278
279 assertValidates raises an assertion error with the validator's
280 messages on an error. You can optionally pass a leaveOffending
281 argument to make the method store the offending document in
282 badDocument.xml.
283
284 The whole thing needs Xerces-J in the form of xsdval.class in the
285 current directory.
286
287 The validator itself is a java class xsdval.class built by
288 ../schemata/makeValidator.py. If you have java installed, calling
289 that in the schemata directory should just work (TM). With that
290 validator and the schemata in place, no network connection should
291 be necessary to run validation tests.
292 """
294 xercMsgs = getXSDErrors(xmlSource, leaveOffending)
295 if xercMsgs:
296 raise AssertionError(xercMsgs)
297
303
306 """returns a function to call that returns a list of new DaCHS structures
307 since this was called.
308
309 If you watch everything, things get hairy because of course the state
310 of this function (for instance) also creates references. Hence, pass
311 ofClass to choose what the function will track.
312
313 This will call a gc.collect itself (and wouldn't make sense without that)
314 """
315 import gc
316 gc.collect()
317 seen_ids = set(id(ob) for ob in gc.get_objects()
318 if isinstance(ob, ofClass))
319 seen_ids.add(id(seen_ids))
320
321 def getNewObjects():
322 newObjects = []
323 for ob in gc.get_objects():
324 if id(ob) not in seen_ids and isinstance(ob, ofClass):
325 newObjects.append(ob)
326 return newObjects
327
328 return getNewObjects
329
332 """returns a list of elements in items that do not have a reference
333 from any other in items.
334 """
335 import gc
336 itemids = set(id(i) for i in items)
337 unreferenced = []
338 for i in items:
339 intrefs = set(id(r) for r in gc.get_referrers(i)) & itemids
340 if not intrefs:
341 unreferenced.append(i)
342 return unreferenced
343
346 """a sort-of-interactive way to investigate where ob is referenced.
347 """
348 import gc
349 while True:
350 print(repr(ob))
351 refs = gc.get_referrers(ob)
352 while refs:
353 nob = refs.pop()
354 print(len(refs), utils.makeEllipsis(repr(nob)))
355 res = raw_input()
356
357 if res=="h":
358 print("d,x,<empty>")
359
360 elif res=="d":
361 import pdb;pdb.set_trace()
362
363 elif res=="x":
364 return
365
366 elif res:
367 ob = nob
368 break
369
370 NEWIDS = set()
371
372 from twisted.web.http import HTTPChannel
375 """a debug method to track memory usage after some code has run.
376
377 This is typically run from ArchiveService.locateChild, since request
378 processing should be idempotent wrt memory after initial caching.
379
380 This is for editing in place by DaCHS plumbers; accordingly, you're
381 not supposed to make sense of this.
382 """
383 import gc
384 print(">>>>>> total managed:", len(gc.get_objects()))
385
386 if hasattr(base, "getNewStructs"):
387 ns = base.getNewStructs()
388 print(">>>>>> new objects:", len(ns))
389 if len(ns)<110:
390 ur = getUnreferenced(ns)
391 print(">>>>>> new externally referenced:", len(ur))
392 del ur
393 print(set(getattr(getattr(ob, "__class__", None), "__name__", "whatsit") for ob in ns))
394 for ob in ns:
395 if getattr(getattr(ob, "__class__", None), "__name__", "whatsit"
396 ).endswith("cellxx"):
397 debugReferenceChain(ob)
398 break
399
400 base.getNewStructs = getMemDiffer(ofClass=HTTPChannel)
401
402
@contextlib.contextmanager
def testFile(name, content, writeGz=False, inDir=base.getConfig("tempDir")):
    """a context manager that creates a file name with content in inDir.

    The full path name is what the context manager yields.

    With writeGz=True, content is gzipped on the fly (don't do this if
    the data already is gzipped).

    You can pass in name=None to get a temporary file name if you don't care
    about the name.

    inDir will be created as a side effect if it doesn't exist but (right
    now, at least), not be removed.

    The file is removed when the controlled block is left.  It is also
    removed if writing content fails.  Errors while removing the file are
    ignored.
    """
    if not os.path.isdir(inDir):
        os.makedirs(inDir)

    if name is None:
        # mkstemp is only used to obtain a unique name; the file is
        # reopened below in the mode that is actually needed.
        handle, destName = tempfile.mkstemp(dir=inDir)
        os.close(handle)
    else:
        destName = os.path.join(inDir, name)

    if writeGz:
        f = gzip.GzipFile(destName, mode="wb")
    else:
        f = open(destName, "w")

    # From here on the file exists.  Writing happens under the cleanup
    # guard so that a failing write neither leaks the open handle nor
    # leaves the file behind.
    try:
        with f:
            f.write(content)
        yield destName
    finally:
        try:
            os.unlink(destName)
        except os.error:
            # e.g., the controlled block already removed the file itself
            pass
441
446 """a context manager collecting event arguments for a while.
447
448 The yielded thing is a list that contains tuples of event name and
449 the event arguments.
450 """
451 collected = []
452
453 def makeHandler(evType):
454 def handler(*args):
455 collected.append((evType,)+args)
456 return handler
457
458 handlers = [(kind, makeHandler(kind)) for kind in kinds]
459 for kind, handler in handlers:
460 base.ui.subscribe(kind, handler)
461
462 try:
463 yield collected
464 finally:
465 for kind, handler in handlers:
466 base.ui.unsubscribe(kind, handler)
467