# Source code for gavo.web.root

"""
The root resource of the data center.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import functools
import os
import re
import time

from twisted.python import threadable
threadable.init()

from twisted.web import resource
from twisted.web import server
from twisted.web import static
from twisted.web.template import tags as T

from gavo import base
from gavo import svcs
from gavo import utils
from gavo.formal import nevowc
from gavo.web import caching
from gavo.web import common
from gavo.web import grend
from gavo.web import ifpages
from gavo.web import metarender
from gavo.web import weberrors

from gavo.svcs import (UnknownURI, WebRedirect)

def _escape(s):
	"""helps formatDefaultLog.
	"""
	if isinstance(s, bytes):
		s = s.decode('ascii', 'ignore')
	return "'%s'"%(s.replace('"', '\\"'))


def formatDefaultLog(timestamp, request):
	"""returns a log line for request in DaCHS' default format.

	The fields for the client IP, the referrer and the user agent are
	always written as "-", which means you should be fine as far as
	processing personal data is concerned.

	The format itself should be compatible with "combined" logs.
	"""
	fields = {
		"ip": "-",
		"timestamp": timestamp,
		"method": _escape(request.method),
		"uri": _escape(request.uri),
		"protocol": _escape(request.clientproto),
		"code": request.code,
		# a falsy sentLength is logged as "-"
		"length": request.sentLength or "-",
		"referrer": "-",
		"agent": "-",
	}
	template = (
		'%(ip)s - - %(timestamp)s "%(method)s %(uri)s %(protocol)s" '
		'%(code)d %(length)s "%(referrer)s" "%(agent)s"')
	return template%fields
def getLogFormatter():
	"""returns a log formatter for this site.

	Right now, this just interprets [web]logformat: "default" selects
	formatDefaultLog, "combined" selects twisted's combinedLogFormatter.
	Any other value raises a NotImplementedError.
	"""
	logFormat = base.getConfig("web", "logformat")

	if logFormat=="combined":
		# only pull in twisted's http module when it is actually needed
		from twisted.web import http
		return http.combinedLogFormatter

	if logFormat=="default":
		return formatDefaultLog

	raise NotImplementedError("No logger %s"%logFormat) # pragma: no cover
@functools.lru_cache(1)
def makeFaviconPNG():
	"""returns a "small" version of the logo as a static resource.

	The image is logo_medium.png from nv_static below webDir if that file
	exists, and the logo shipped with the distribution otherwise.  The
	result is cached, so the scaling only happens on the first call.

	This is used mainly for SAMP logos at the moment.
	"""
	from gavo.utils import imgtools

	sitePath = os.path.join(
		base.getConfig("webDir"), "nv_static", "logo_medium.png")
	logoPath = (sitePath
		if os.path.exists(sitePath)
		else base.getPathForDistFile("web/img/logo_medium.png"))

	pngData = imgtools.getScaledPNG(logoPath, 62)
	return static.Data(pngData, type="image/png")
def _authorizeCORS(request):
	"""adds cross-origin authorisation headers if appropriate.

	The request's Origin header is matched against the regular expression
	in [web]corsoriginpat; if it matches, the origin is echoed back in
	Access-Control-Allow-Origin.  Nothing happens if either the pattern
	or the header is empty.
	"""
	origin = request.getHeader("Origin")
	allowedPat = base.getConfig("web", "corsoriginpat")
	if not (allowedPat and origin):
		return

	# re.match only anchors at the start of origin
	if re.match(allowedPat, origin):
		request.setHeader("Access-Control-Allow-Origin", origin)


class _InsecureRequestUpgrader(resource.Resource):
	"""A resource answering with a 307 redirect to the https version of
	the requested URI.
	"""
	def render(self, request):
		request.setHeader("vary", "upgrade-insecure-requests")

		httpsRoot = base.getHTTPSBase().rstrip("/").encode("ascii")
		target = httpsRoot+b"/"+request.uri.lstrip(b"/")
		request.setHeader("location", target)

		request.setHeader("content-type", "text/plain")
		request.setResponseCode(307)
		request.write(
			b"Your browser asked to be redirected to an https version\r\n"
			b" of this page. That's what I'm trying to do.\r\n")
		request.finish()
		return server.NOT_DONE_YET


# the one upgrader instance handed out by _upgradeInsecureRequests
_UPGRADER = _InsecureRequestUpgrader()


def _upgradeInsecureRequests(request):
	"""returns a resource redirecting to an https version of what request
	asks for if that's possible and makes sense.

	If the function deems it should redirect, it returns a resource that
	arranges for that.  Otherwise, it returns None and the request needs
	to be processed in the normal way.
	"""
	shouldUpgrade = (
		base.LISTENING_TO_HTTPS
		and not base.getConfig("web", "ignore-uir-header")
		# in particular, never upgrade POST, as that doesn't work
		# through redirects
		and request.method==b"GET"
		and base.getConfig("web", "adaptProtocol")
		# broken client? reverse proxying through https? whatever it
		# is, don't bother
		and not request.isSecure()
		and request.getHeader("upgrade-insecure-requests")=="1")

	if shouldUpgrade:
		return _UPGRADER
	return None
class ArchiveService(nevowc.TemplatedPage):
	"""The root resource on the data center.

	It does the main dispatching based on four mechanisms:

	0. redirects -- one-segments fragments that redirect somewhere else.
	   This is for "bad" shortcuts corresponding to input directory name
	   exclusively (since it's so messy).  These will not match if
	   path has more than one segment.
	1. statics -- first segment leads to a resource that gets passed any
	   additional segments.
	2. mappings -- first segment is replaced by something else, processing
	   continues.
	3. resource based -- consisting of an RD id, a service id, a renderer and
	   possibly further segments.

	The first three mechanisms only look at the first segment to determine
	any action (except that redirect is skipped if len(segments)>1).

	The statics and mappings are configured on the class level.
	"""
	# evaluated once, when the class body is executed
	timestampStarted = time.time()
	# first segment -> resource
	statics = {}
	# first segment -> list of segments replacing it
	mappings = {}
	# slash-stripped path -> redirect destination
	redirects = {}

	def __init__(self):
		nevowc.TemplatedPage.__init__(self)
		# while this file exists, getChild answers (almost) everything
		# with ServiceUnavailable
		self.maintFile = os.path.join(base.getConfig("stateDir"), "MAINT")
		# the non-empty path segments of [web]nevowRoot, as a tuple
		self.rootSegments = tuple(s for s in
			base.getConfig("web", "nevowRoot").split("/") if s)
		self.rootLen = len(self.rootSegments)

	@classmethod
	def addRedirect(cls, key, destination):
		"""arranges for the path key to redirect to destination.

		Leading and trailing slashes are stripped from key.
		"""
		cls.redirects[key.strip("/")] = destination

	@classmethod
	def addStatic(cls, key, resource):
		"""arranges for resource to be returned when the first path segment
		is key.
		"""
		cls.statics[key] = resource

	@classmethod
	def addMapping(cls, key, segments):
		"""arranges for a first path segment key to be replaced with the
		sequence segments.
		"""
		cls.mappings[key] = segments

	@classmethod
	def _addVanityRedirect(cls, src, dest, options):
		"""a helper for installVanityMap.

		With '!redirect' in options, src becomes a redirect to dest; a dest
		without "://" is passed through base.makeSitePath first.  Without
		that option, src becomes a mapping to the slash-separated segments
		of dest.
		"""
		if '!redirect' in options:
			if "://" in dest:
				cls.addRedirect(src, dest)
			else:
				cls.addRedirect(src, base.makeSitePath(dest))
		else:
			cls.addMapping(src, dest.split("/"))

	@classmethod
	def installVanityMap(cls):
		"""builds the redirects prescribed by the system-wide vanity map.
		"""
		for src, (dest, options) in svcs.getVanityMap().shortToLong.items():
			cls._addVanityRedirect(src, dest, options)

	def render(self, request):
		"""renders whatever getChild returns for request.
		"""
		# this is only ever executed on the root URL.  For consistency
		# (e.g., caching), we route this through getChild though
		# we know we're going to return RootPage.  getChild must
		# thus *never* return self
		return self.getChild(None, request).render(request)

	def _locateResourceBasedChild(self, request, segments):
		"""returns a standard, resource-based service renderer and any
		unconsumed segments.

		Their URIs look like <rd id>/<service id>{/<anything>}.

		This works by successively trying to use parts of the query path
		of increasing length as RD ids.  If one matches, the next
		segment is the service id, and the following one the renderer.

		The remaining segments are returned unconsumed.

		If no RD matches, an UnknownURI exception is raised.  The same
		happens if the RD has no such service (unless the RD has superseded
		meta, in which case a not-found page showing that meta is returned),
		or if no renderer is given and the service has no default one.
		"""
		for srvInd in range(1, len(segments)):
			try:
				rd = base.caches.getRD("/".join(segments[:srvInd]))
			except base.RDNotFound:
				continue
			else:
				break
		else:
			# the loop ran to completion (or not at all) without finding an RD
			raise UnknownURI("No matching RD")

		try:
			subId, rendName = segments[srvInd], segments[srvInd+1]
		except IndexError:
			# a URL requesting a default renderer
			subId, rendName = segments[srvInd], None

		service = rd.getService(subId)
		if not service:
			if rd.hasMeta("superseded"):
				return weberrors.NotFoundPageWithFancyMessage([
					"This resource is stale and has been superseded.",
					T.div(class_="superseded-message")[
						T.xml(rd.getMeta("superseded").getContent("html"))]
					]), []
			raise UnknownURI("No such service: %s"%subId, rd=rd)

		if not rendName:
			rendName = service.defaultRenderer
			if rendName is None:
				raise UnknownURI("No renderer given and service has no default")

		try:
			rendC = svcs.getRenderer(rendName)
		except Exception as exc:
			# attach the RD to the exception before passing it on
			exc.rd = rd
			raise

		cached = caching.getFromServiceCache(
			request, service, rendC, segments)
		if cached:
			return cached, []
		else:
			return rendC(request, service), [
				utils.bytify(s) for s in segments[srvInd+2:]]

	def getChild(self, name, request):
		"""returns the resource that should handle request.

		name is passed to request.popSegments to obtain the path segments
		to dispatch on.  After some header processing (server, forwarded
		host, CORS, https upgrade) and the maintenance check, this tries,
		in that order, redirects, statics, mappings, and resource-based
		services as described in the class docstring.

		This raises UnknownURI for paths outside of [web]nevowRoot (if set)
		or when no service can be located, and WebRedirect for paths that
		have a redirect.
		"""
		segments = request.popSegments(name)

		# never executes as written; presumably flipped by hand for memory
		# debugging
		if False:
			from gavo.helpers import testtricks
			testtricks.memdebug(common.Request)

		request.setHeader(b"server", base.SERVER_SOFTWARE)
		if request.requestHeaders.hasHeader("x-forwarded-host"):
			# we need the externally visible host in our request even
			# when we're behind a reverse proxy.  This is a dumb attempt
			# at fixing such situations.
			request.setHost(request.requestHeaders.getRawHeaders(
				"x-forwarded-host")[0], 80)

		if request.requestHeaders.hasHeader("origin"):
			_authorizeCORS(request)

		if request.requestHeaders.hasHeader("upgrade-insecure-requests"):
			upgrader = _upgradeInsecureRequests(request)
			if upgrader:
				return upgrader

		# during maintenance, only the static resources (and empty paths)
		# are still served
		if ((segments and segments[0]!='static')
				and os.path.exists(self.maintFile)):
			return ifpages.ServiceUnavailable()

		if self.rootSegments:
			# segments is a list (it is concatenated with a list and
			# slice-assigned below) while rootSegments is a tuple.  A list
			# never compares equal to a tuple, so convert before comparing;
			# otherwise every request would be rejected here.
			if tuple(segments[:self.rootLen])!=self.rootSegments:
				raise UnknownURI("Misconfiguration: Saw a URL outside of the server's"
					" scope")
			segments = segments[self.rootLen:]

		curPath = "/".join(segments)
		# allow // to stand for __system__ like in RDs
		if curPath.startswith("/"):
			segments = ["__system__"]+segments[1:]
			curPath = "/".join(segments).strip("/")
		curPath = curPath.strip("/")

		if curPath=="":
			# the root URL is served from what [web]root points to
			segments = base.getConfig("web", "root").split("/")

		if curPath in self.redirects:
			raise WebRedirect(self.redirects[curPath])

		# re-assign name so it's the mogrified string now
		name = segments[0]

		if name in self.statics:
			request.pushBackSegments(segments[1:])
			return self.statics[segments[0]]

		if name in self.mappings:
			segments[:1] = self.mappings[name]

		try:
			res, postpath = self._locateResourceBasedChild(request, segments)
			request.pushBackSegments(postpath)
			return common.compwrap(res)
		except grend.RDBlocked:
			return ifpages.TemplatedPage(
				request, svcs.loadSystemTemplate("blocked.html"))
# Resources that are served below fixed first path segments.
ArchiveService.addStatic("login", ifpages.LoginPage())
ArchiveService.addStatic("static", ifpages.StaticServer())
ArchiveService.addStatic("schemata", ifpages.SchemaServer())
ArchiveService.addStatic("robots.txt", ifpages.RobotsTxt())
ArchiveService.addStatic("clientcount", ifpages.CurReaders())
ArchiveService.addStatic("teapot", ifpages.Teapot())
ArchiveService.addStatic("3rdparty", ifpages.ThirdPartyCachePage())
# make these self-registering?  Or write them out somewhere?
ArchiveService.addStatic("getRR", metarender.ResourceRecordMaker())
# .well-known right now is only used by ACME
ArchiveService.addStatic(".well-known", ifpages.WellKnown())
ArchiveService.addStatic("fancyroot", ifpages.ROOT_PAGE_HELPERS)

# the test resource is only there when [web]enabletests is set
if base.getConfig("web", "enabletests"):
	from gavo.web import webtests
	ArchiveService.addStatic("test", webtests.Tests())

ArchiveService.addStatic("favicon.png", makeFaviconPNG())

# favicon.ico is only served if [web]favicon names an existing file
if base.getConfig("web", "favicon"):
	if os.path.exists(base.getConfig("web", "favicon")):
		ArchiveService.addStatic("favicon.ico",
			static.File(base.getConfig("web", "favicon")))

ArchiveService.installVanityMap()

# the objects handed to twisted: the root resource and the site around it
root = ArchiveService()
site = server.Site(root, timeout=300, logFormatter=getLogFormatter())
site.requestFactory = common.Request