Source code for gavo.web.webtests

"""
A special renderer for testish things requiring the full server to be up
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from twisted.internet import reactor
from twisted.web import resource
from twisted.web import template
from twisted.web.template import tags as T

from gavo import base
from gavo.formal import nevowc
from gavo.svcs import streaming
from gavo.web import common


class FooPage(resource.Resource):
    """The simplest page imaginable.

    All it defines is a static template (``loader``) consisting of a
    title and a single paragraph.
    """

    loader = common.doctypedStan(
        T.html[
            T.head[
                T.title["A page"],
            ],
            T.body[
                T.p["If you see this, you had better know why."]
            ]
        ]
    )
class StreamerPage(resource.Resource):
    """A page delivering meaningless but possibly huge streams of data.

    The payload is pushed out through ``streaming.streamOut``.
    """

    def render(self, request):
        """Stream ``size`` bytes of filler, written in ``chunksize`` pieces.

        Both numbers are read from ``request.strargs``; they default to
        300 and 1000, respectively.  The return value is whatever
        ``streaming.streamOut`` returns.
        """
        args = request.strargs
        dataSize = int(args.get("size", [300])[0])
        chunkSize = int(args.get("chunksize", [1000])[0])

        def writeNoise(f):
            # all arithmetic stays in here so bad parameters only fail
            # once the stream is actually being produced
            fullChunks, toSend = divmod(dataSize, chunkSize)
            chunk = b"x" * chunkSize
            for _ in range(fullChunks):
                f.write(chunk)

            # the remainder is made up of digits, cut to the exact length
            lastPayload = b"1234567890"
            repeats, tail = divmod(toSend, 10)
            f.write(lastPayload * repeats + lastPayload[:tail])

        return streaming.streamOut(writeNoise, request)
class Delay(resource.Resource):
    """A page that hands out some data only after a pause."""

    def render(self, request):
        """Schedule a greeting to be delivered after ``seconds`` seconds.

        ``seconds`` is read from ``request.strargs`` and defaults to 10.
        The return value is the result of twisted's ``deferLater``.
        """
        from twisted.internet.task import deferLater

        seconds = int(request.strargs.get("seconds", [10])[0])
        request.setHeader("content-type", "text/plain")

        def greet():
            return b"Hello world\n"

        return deferLater(reactor, seconds, greet)
class Block(resource.Resource):
    """A page that stalls the entire server, signals included, for a while."""

    def render(self, request):
        """Run ``sleep 20`` synchronously through the shell, then answer."""
        from os import system

        request.setHeader("content-type", "text/plain")
        # Blocking is the whole point here: this call does not return
        # before the child process has finished.
        system("sleep 20")
        return b"Living again\n"
class StreamerCrashPage(resource.Resource):
    """A page that begins to stream out data and then crashes."""

    def render(self, request):
        """Stream a single line of text, then raise from within the writer."""

        def writeNoise(f):
            # presumably the small chunk size makes the write below
            # trigger a flush (cf. the payload text) -- TODO confirm
            f.buffer.chunkSize = 30
            f.write("Here is some data. (and some more, just to cause a flush)\n")
            raise Exception

        request.setHeader("content-type", "text/plain")
        return streaming.streamOut(writeNoise, request)
class ExitPage(resource.Resource):
    """A page that makes the server exit (handy for profiling)."""

    def render(self, request):
        """Schedule a reactor stop one second from now and say so."""

        def stopServer():
            return reactor.stop()

        request.setHeader("content-type", "text/plain")
        # the delay leaves time to deliver the response first
        reactor.callLater(1, stopServer)
        return b"The server is now exiting in 1 second."
class RenderCrashPage(resource.Resource):
    """A page that crashes during render.

    The template in ``loader`` refers to the ``crash`` renderer, which
    always raises.
    """

    @template.renderer
    def crash(self, ctx, data):
        """Raise an exception after printing its traceback.

        The arguments are ignored.  The traceback is written through
        ``traceback.print_exc`` before the exception propagates.
        """
        import traceback

        try:
            raise Exception("Wanton crash")
        except Exception:
            # Only Exception is ever raised above; avoid a bare except
            # so KeyboardInterrupt and friends are not intercepted.
            traceback.print_exc()
            raise

    loader = common.doctypedStan(
        T.html[
            T.head[
                T.title["A page"],
            ],
            T.body[
                T.p(render="crash")["You should not see this"]
            ]
        ]
    )
class BadGatewayPage(resource.Resource):
    """A page that returns a 502 error."""

    def render(self, request):
        """Answer with status 502 and a short plain-text body."""
        # Twisted's Request.setResponseCode requires the reason phrase to
        # be bytes and raises a TypeError when handed a str.
        request.setResponseCode(502, message=b"Bad Gateway")
        request.setHeader("content-type", "text/plain")
        return b"Bad Gateway"
class ServiceUnloadPage(resource.Resource):
    """A page that clears the services RD."""

    def render(self, request):
        """Clear the cache entry named ``__system__/services`` and confirm."""
        servicesRdId = "__system__/services"
        request.setHeader("content-type", "text/plain")
        base.caches.clearForName(servicesRdId)
        return b"Cleared the services RD"
class _UnrenderableException(Exception):
    """An exception that cannot be turned into a string.

    ``__str__`` deliberately refers to a name that is not defined, so
    stringifying an instance fails.
    """

    def __str__(self):
        ddt #noflake: intended f'up.
class BlowUpErrorPage(resource.Resource):
    """A page producing an unrenderable error when rendering."""

    def render_GET(self, request):
        """Always raise an ``_UnrenderableException``; nothing is returned."""
        failure = _UnrenderableException()
        raise failure
class Tests(nevowc.TemplatedPage):
    """The parent of all test pages.

    The individual test pages are registered as children in the
    constructor; the template in ``loader`` tells visitors there is
    nothing here.
    """

    def __init__(self):
        resource.Resource.__init__(self)
        # path segment -> page class; a fresh instance is created for
        # each, in the order given here
        childPages = (
            (b"foo", FooPage),
            (b"stream", StreamerPage),
            (b"streamcrash", StreamerCrashPage),
            (b"rendercrash", RenderCrashPage),
            (b"badgateway", BadGatewayPage),
            (b"block", Block),
            (b"exit", ExitPage),
            (b"clearservice", ServiceUnloadPage),
            (b"delay", Delay),
            (b"blowuperror", BlowUpErrorPage),
        )
        for segment, pageClass in childPages:
            self.putChild(segment, pageClass())

    loader = common.doctypedStan(
        T.html[
            T.head[
                T.title["Wrong way"],
            ],
            T.body[
                T.p["There is nothing here.  Trust me."]
            ]
        ]
    )