1 """
2 Helpers for trial-based tests, in particular retrieving pages.
3 """
4
5
6
7
8
9
10
11 import os
12 import urlparse
13 import warnings
14 import weakref
15 from cStringIO import StringIO
16
17 from nevow import context
18 from nevow import inevow
19 from nevow import util
20 from nevow import testutil
21 from nevow import url
22 from twisted.trial.unittest import TestCase as TrialTest
23 from twisted.python import failure
24 from twisted.internet import defer
25 from twisted.web.http_headers import Headers
26
27 from gavo.helpers import testhelpers
28 from gavo.helpers import testtricks
29
30 from gavo import base
31 from gavo import rsc
32 from gavo import utils
33
34 base.setConfig("web", "enabletests", "True")
35 from gavo.web import weberrors
36 from gavo.web import root
52
55 return _renderException(flr, ctx)
56
61
64 request = inevow.IRequest(ctx)
65 if not hasattr(page, "renderHTTP"):
66 return _requestDone(page, request, ctx)
67
68 d = util.maybeDeferred(page.renderHTTP,
69 context.PageContext(
70 tag=page, parent=context.RequestContext(tag=request)))
71
72 d.addCallback(_requestDone, request, ctx)
73 if formatExcs:
74 d.addErrback(_renderCrashAndBurn, ctx)
75 return d
76
79 page, segments = res
80 if segments:
81 return util.maybeDeferred(page.locateChild,
82 ctx, segments
83 ).addCallback(_deferredRender, ctx
84 ).addErrback(_renderException, ctx)
85
86 else:
87 try:
88 return _doRender(page, ctx)
89 except:
90 assert False, "Exceptions should not escape from _doRender"
91
94 filename = None
97
99 return iter(self.args)
100
102 return self.args[key][0]
103
105 return self.args[key][0]
106
108 return self.args.keys()
109
110 @property
112 return StringIO(self.args[0])
113
116 """A helper for simulating the old request.headers attribute in
117 twisted > jessie
118 """
121
123 try:
124 return self.request.responseHeaders.getRawHeaders(name)[0]
125 except IndexError:
126 raise KeyError(name)
127
129 return self.request.responseHeaders.hasHeader(name)
130
131
132 -class FakeRequest(testutil.AccumulatingFakeRequest):
133 """A Request for testing purposes.
134
135 We have a version of our own for this since nevow's has a
136 registerProducer that produces an endless loop with push
137 producers (which is what we have).
138 """
140 self.finishDeferred = defer.Deferred()
141 testutil.AccumulatingFakeRequest.__init__(self, *args, **kwargs)
142 self.headers_out = _HeaderFaker(self)
143
144 if not hasattr(self, "responseHeaders"):
145 self.responseHeaders = Headers()
146 def _(key, val):
147 self.responseHeaders.setRawHeaders(key, [val])
148 testutil.AccumulatingFakeRequest.setHeader(self, key, val)
149 self.setHeader = _
150
152 self.channel = 1
153 self.producer = producer
154 if not isPush:
155 testutil.AccumulatingFakeRequest.registerProducer(
156 self, producer, isPush)
157
159 del self.channel
160 del self.producer
161
163 return self.finishDeferred
164
167 args = {}
168 for k, v in rawArgs.iteritems():
169 if isinstance(v, list):
170 args[k] = v
171 else:
172 args[k] = [v]
173 if path.startswith("http://"):
174 path = urlparse.urlparse(path).path
175 req = requestClass(uri="/"+path, args=args)
176
177 req.fields = FakeFieldStorage(args)
178 req.headers = {}
179 req.method = method
180 return req
181
182
def getRequestContext(path, method="GET", args=None,
        requestMogrifier=None, requestClass=FakeRequest):
    """returns a nevow WovenContext remembering a fake request for path.

    path is prefixed with http://localhost and handed, together with
    method and args, to _buildRequest; requestClass is passed through
    as the class of the request to build.  When args is None, an empty
    dictionary is used instead.

    If requestMogrifier is given, it is called with the freshly built
    request before the request is remembered in the context, so it can
    manipulate the request in place.
    """
    request = _buildRequest(
        method,
        "http://localhost"+path,
        {} if args is None else args,
        requestClass=requestClass)

    if requestMogrifier is not None:
        requestMogrifier(request)

    wovenCtx = context.WovenContext()
    wovenCtx.remember(request)
    return wovenCtx
194
195
def runQuery(page, method, path, args, requestMogrifier=None,
        requestClass=FakeRequest):
    """runs a query on a page.

    The query should look like it's coming from localhost.

    The request context is built through getRequestContext from path,
    method, args, requestMogrifier and requestClass.  The path segments
    passed to page.locateChild are the slash-separated parts of path
    with the first one (what precedes the first slash) dropped.

    The thing returns a deferred firing a pair of the result (a string)
    and the request (from which you can glean headers and such).
    """
    ctx = getRequestContext(
        path, method, args, requestMogrifier, requestClass=requestClass)

    # drop the first split item -- it is whatever is in front of the
    # first slash of path.
    segments = tuple(path.split("/")[1:])

    located = util.maybeDeferred(page.locateChild, ctx, segments)
    return located.addCallback(_deferredRender, ctx)
211
214 """a base class for tests of twisted web resources.
215 """
216 renderer = None
217
def assertStringsIn(self, result, strings, inverse=False,
        customTest=None):
    """checks that all of strings occur in the first item of result.

    With inverse=True, the check is reversed, i.e., none of strings may
    occur in result[0].  If customTest is given, it is called with
    result[0] after the string checks; it is expected to raise an
    AssertionError to signal a failure.

    When any of the checks raises an AssertionError, result[0] is
    dumped to the file remote.data in the current directory for later
    inspection, and the error is re-raised.

    On success, result is returned unchanged, so this can be used as a
    callback in a deferred chain.
    """
    body = result[0]
    if inverse:
        template = "'%s' in remote.data"
    else:
        template = "'%s' not in remote.data"

    try:
        for needle in strings:
            found = needle in body
            # fail on presence when inverse, on absence otherwise
            self.failIf(found if inverse else not found, template%needle)

        if customTest is not None:
            customTest(body)
    except AssertionError:
        # keep the offending document around for debugging
        with open("remote.data", "w") as dumpFile:
            dumpFile.write(body)
        raise

    return result
235
241
246
250
def assertPOSTHasStrings(self, path, args, strings, rm=None):
    """checks the result of POSTing args to path for strings.

    This simply delegates to self.assertResultHasStrings with the
    method fixed to "POST" and returns whatever that returns; rm is
    passed through unchanged (presumably a request mogrifier as
    accepted by runQuery -- confirm against assertResultHasStrings).
    """
    checkResult = self.assertResultHasStrings
    return checkResult("POST", path, args, strings, rm)
254
256 def check(res):
257 self.assertEqual(res[1].code, status)
258 return res
259 return runQuery(self.renderer, "GET", path, args).addCallback(
260 check)
261
263 def cb(res):
264 raise AssertionError("%s not raised (returned %s instead)"%(
265 exc, res))
266 def eb(flr):
267 flr.trap(exc)
268 if alsoCheck is not None:
269 alsoCheck(flr)
270
271 return runQuery(self.renderer, "GET", path, args
272 ).addCallback(cb
273 ).addErrback(eb)
274
281
285
289
295
298 """makes ddId from rdName and returns a cleanup function.
299
300 This is for creating temporary data for tests; it's supposed to be used
301 as in::
302
303 atexit.register(provideRDData("test", "import_fitsprod"))
304
305 This keeps track of (rdName, ddId) pairs it's already loaded and
306 doesn't import them again.
307 """
308 if (rdName, ddId) in _imported:
309 return lambda: None
310
311 dd = testhelpers.getTestRD(rdName).getById(ddId)
312 conn = getImportConnection()
313 dataCreated = rsc.makeData(dd, connection=conn)
314 conn.commit()
315 _imported.add((rdName, ddId))
316
317
318 nvArg = rsc.parseNonValidating
319
320 def cleanup():
321 dataCreated.dropTables(nvArg)
322
323 return cleanup
324
325
# Hook a LoggingUI up to base.ui on import unless the environment
# variable GAVO_LOG is set to exactly "no".
if os.getenv("GAVO_LOG") != "no":
    from gavo.user import logui
    logui.LoggingUI(base.ui)
329