Source code for gavo.web.common

"""
Common code for the nevow based interface.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import json
import re
from urllib import parse as urlparse

from twisted.python import failure
from twisted.web import resource
from twisted.web import server
from twisted.web import static
from twisted.web import template
from twisted.web.template import tags as T
from twisted.web import http

from gavo import base
from gavo import formal
from gavo import svcs
from gavo import utils
from gavo.base import meta
from gavo.protocols import creds


# monkeypatching static's mime magic
static.File.contentTypes[".ascii"] = "application/octet-stream"
static.File.contentTypes[".f"] = "text/x-fortran"
static.File.contentTypes[".vot"] = base.votableType
static.File.contentTypes[".rd"] = "application/x-gavo-descriptor+xml"
static.File.contentTypes[".f90"] = "text/x-fortran"
static.File.contentTypes[".skyglow"] = "text/plain"
static.File.contentTypes[".fitstable"] = "application/fits"
static.File.contentTypes[".fits"] = "image/fits"
# this one is for co-operation with ifpages
static.File.contentTypes[".shtml"] = "text/nevow-template"


[docs]def escapeForHTML(aString): if isinstance(aString, bytes): aString = aString.decode("utf-8") return aString.replace("&", "&amp;" ).replace("<", "&lt;" ).replace(">", "&gt;")
[docs]def getfirst(request, key, default): """returns the first value of key in the arguments of a twisted request. """ return utils.getfirst(request.strargs, key, default)
GZIP_ENCODER = server.GzipEncoderFactory() # we don't want to waste a lot of CPU on one-shot 10% compression gains. GZIP_ENCODER.compressLevel = 3
[docs]def compwrap(pg): """wraps some twisted resource for compression. """ return resource.EncodingResourceWrapper(pg, [GZIP_ENCODER])
[docs]class HTMLMetaBuilder(meta.MetaBuilder): def __init__(self, macroPackage=None): meta.MetaBuilder.__init__(self) self.resultTree, self.currentAtom = [[]], None self.macroPackage = macroPackage
[docs] def startKey(self, atom): self.resultTree.append([])
[docs] def enterValue(self, value): val = value.getContent("html", self.macroPackage) if val: if not hasattr(val, "render"): # it's hopefully something sufficiently string-like self.resultTree[-1].append(T.xml(val)) else: self.resultTree[-1].append(val) # for meta items rendering their children themselves (which return # in IncludesChildren object), do not fold in rendered children # (the None sentinel is handled in endKey) if isinstance(val, meta.IncludesChildren): self.resultTree[-1].append(None)
[docs] def endKey(self, atom): children = [c for c in self.resultTree.pop() if c] # see enterValue on why we're doing this if (self.resultTree[-1] and self.resultTree[-1][-1] is None): return if len(children)>1: childElements = [] for c in children: childElements.append(T.li(class_="metaItem")[c]) self.resultTree[-1].append(T.ul(class_="metaEnum")[childElements]) elif len(children)==1: self.resultTree[-1].append(children[0])
[docs] def getResult(self): return self.resultTree[0]
[docs] def clear(self): self.resultTree = [[]]
[docs]def runAuthenticated(request, reqGroup, fun, *args): """returns the value of ``fun(*args)`` if the logged in user is in reqGroup, requests authentication otherwise. """ if creds.hasCredentials(request.getUser(), request.getPassword(), reqGroup): return fun(*args) else: raise svcs.Authenticate()
[docs]class doctypedStan(template.TagLoader): """is the TagLoader loader that arranges for a an XHTML doctype and namespace. As this relies on a magic attribute gavo_useDoctype, this will only work if the template will be rendered through nevowc.TemplatedPage. """ DOCTYPE = ('<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"' ' "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">') def __init__(self, rootEl): rootEl(xmlns="http://www.w3.org/1999/xhtml") rootEl.gavo_useDoctype = self.DOCTYPE template.TagLoader.__init__(self, rootEl)
def produceErrorDocument(failure, request): # pragma: no cover """writes some representation of failure to request. This is overwritten from weberrors in normal operation. """ request.setHeader("content-type", "text/plain") request.write("Yikes -- there's an error but no handler. Complain.\r\n") request.finish() JSEXT = ".js"
[docs]class CommonRenderers(object): """A base for renderer (python) mixins within the DC. Including standard stylesheets/js/whatever: <head n:render="commonhead">...</head> Rendering internal links (important for off-root operation): * <tag href|src="/foo" n:render="rootlink"/> """
[docs] @template.renderer def unicode(self, request, tag): """returns unicode(data); this is deprecated in favour of string. """ return tag[str(tag.slotData)]
[docs] @template.renderer def commonhead(self, request, tag): # we do not want to blindly append these things to the tag since user- # provided items in the header should have access to this environment, # in particular to jquery; thus, this stuff must be as early as possible. originalChildren = tag.children[:] tag.clear() res = tag[ T.meta(**{"http-equiv": "Content-type", "content": "text/html;charset=UTF-8"}), T.link(rel="stylesheet", href=base.makeSitePath("/static/css/formal.css"), type="text/css"), T.link(rel="stylesheet", href=base.makeSitePath( "/static/css/gavo_dc.css"), type="text/css"), T.script(src=base.makeSitePath("/static/js/jquery-gavo.js"), type="text/javascript"), T.script(src=base.makeSitePath("/static/js/gavo"+JSEXT), type="text/javascript"), originalChildren, ] if base.getConfig("web", "operatorCSS"): res[ T.link(rel="stylesheet", type="text/css", href=base.getConfig("web", "operatorCSS"))] return res
[docs] @template.renderer def urlescape(self, request, tag): """renders data as a url-escaped string. """ data = tag.slotData return urlparse.quote(data)
[docs] @template.renderer def getconfig(self, request, tag): """looks up the text child in the DaCHS configuration and inserts the value as a (unicode) string. The config key is either [section]item or just item for something in [general]. Behaviour for undefined config items is undefined. """ configKey = tag.children[0].strip() mat = re.match("\[([^]]*)\](.*)", configKey) if mat: content = base.getConfig(mat.group(1), mat.group(2)) else: content = base.getConfig(configKey) tag.clear() return tag[content]
[docs]class TypedData(resource.Resource): """A simple resource just producing bytes passed in during construction, declared as a special content type. """ def __init__(self, data, mime, modificationStamp=None): resource.Resource.__init__(self) self.data, self.mime = data, mime self.modificationStamp = modificationStamp
[docs] def render(self, request): request.setHeader("content-type", self.mime) if self.modificationStamp is not None: if request.setLastModified(self.modificationStamp)==http.CACHED: return b"" request.write(self.data) request.finish() return server.NOT_DONE_YET
[docs]class JSONQuery(resource.Resource): """A resource returning JSON for a database query. Have the query in a query attribute to the class. Use %(whatever)s to put in the first match of that in request.strargs. TODO: we should do some more sensible error handling. """
[docs] def doQuery(self, queryArgs): """returns a list of dictionaries for the query result. The default just executes self.query. """ with base.getTableConn() as conn: return list(conn.queryToDicts( self.query, queryArgs))
[docs] def render(self, request): queryArgs = dict((key, value[0]) for key, value in request.strargs.items()) res = self.doQuery(queryArgs) request.setHeader("content-type", "text/json") return json.dumps(res).encode("utf-8")
[docs]class BinaryItem: """A sentinel marking request.args items that couldn't be utf-8 decoded Whatever it was (most likely a file upload) is found in the original attribute. """ def __init__(self, original): self.original = original def __getnewargs_ex__(self): # we don't want file uploads in the pickled stuff that TAP and # UWS make of their parameters (it's a waste of CPU and space, as # the files already reside as plain files on disk). If this # ever fires, just make sure you're popping file uploads from # strargs before pickling. raise ValueError("You cannot (and should not) pickle BinaryItems") def __str__(self): return ("<A piece of data that is no utf-8 encoded string: %s>"% utils.makeEllipsis(repr(self.original)))
[docs]class Request(formal.Request): """a custom request class used in DaCHS' application server. Overridden behaviour: * you can write str; it's going to be utf-8 encoded automatically, which out to be about the right thing almost always in DaCHS. * There's a popSegments method for use during resource dispatch (it gets all remaining segments and pushes them to prepath at the same time). popSegments also returns (utf-8-decoded) strings rather than bytes. * There's strargs, which is a dict mapping strings to lists of strings. This is being created at first call can then doesn't follow request.args any more. DaCHS code should only use strargs unless you're actually dealing with non-utf-8 data (and then you should probably use request.files). """ def __init__(self, *args, **kwargs): server.Request.__init__(self, *args, **kwargs) self.files, self.args = {}, {}
[docs] def gotLength(self, length): server.Request.gotLength(self, length) # we'd like to distinguish between sync and async here, but # the request is largely unpopulated at this point. As far as I # can see, the only thing I can do is look at a piece of # private twisted API isSync = True try: path = self.channel._path.split(b"?")[0] isSync = path.endswith(b"/sync") except Exception as msg: base.ui.notifyError("In gotLength: %s"%msg) if isSync: self.maxUploadSize = base.getConfig("web", "maxSyncUploadSize") else: self.maxUploadSize = base.getConfig("web", "maxUploadSize") # The following is a shortcut to break out as soon as possible # when the uploading program declares a content length (though # by twisted's architecture we can't exit in this place anyway) if length and length>self.maxUploadSize: self.maxUploadSize = 0
[docs] def write(self, payload): if self.finished or getattr(self, "channel", None) is None: # request processing should really have stopped when channel # has gone away. But regrettably there are code paths when # that doesn't happen (in particular, we're a bit lazy when # reporting errors). Let's not spew scary traceback # to the log in such cases. if not hasattr(self, "dead_warning_given"): base.ui.notifyWarning("Ignoring attempt to write" " to a dead connection") self.dead_warning_given = True else: server.Request.write(self, utils.bytify(payload))
# def finish(self): # import pdb;pdb.Pdb(nosigint=True).set_trace() # server.Request.finish(self)
[docs] def connectionLost(self, reason): # standard twisted (at least as of 17.5) doesn't log these, so # we try do it manually. # While we're at it, we're setting client to None, which thus # can be used as a clear indicator that the connection won't # work any more. # helper to still let us log when request is damaged. if not hasattr(self, "client"): self.client = None self.code = 400 # If the request couldn't be parsed, things didn't work out well, # and we shouldn't pretend they did (but request.code defaults to 200) if self.method=='(no method yet)': self.code = 400 try: self.channel.factory.log(self) except Exception as exc: base.ui.notifyError( "Request with lost connection wasn't logged: %s"%exc) try: return server.Request.connectionLost(self, reason) finally: self.client = None
[docs] def popSegments(self, name): """returns [name]+postpath and clears the current postpath, where all elements are utf-8-decoded strings. This facilitates nevow-style full-path resource resolution. """ if name is None: res = self.postpath else: res = [utils.bytify(name)]+self.postpath self.prepath.extend(self.postpath) self.postpath = [] return [s.decode("utf-8", "replace") for s in res]
[docs] def pushBackSegments(self, segments): """tries to put segments back to the prepath from the postpath. This will remove existing segments from postpath as long as they match segments. If you're devious, you might exhaust the prepath in this way; that's your fault, then. segments may contain strings or bytes and will be destroyed in the process. """ postSegments = [] for seg in reversed(segments): seg = utils.bytify(seg) if seg==self.prepath[-1]: self.prepath.pop() postSegments.append(seg) self.postpath = list(reversed(postSegments))
[docs] def getUser(self): """returns a username as a string -- or None if no user info has been passed. We're overriding this because we really don't want to get any bytes in here. If anyone manages to put in non-utf, this will return None. Note that getUser does *not* authenticate; it just returns whatever twisted deems to be the user name. Use getAuthUser for a user name validated through some credential. """ try: user = utils.debytify(server.Request.getUser(self)) except UnicodeDecodeError: return None return user or None
[docs] def getPassword(self): """returns the password utf-8-decoded. See getUser on the rationale and on limitations. """ try: return utils.debytify(server.Request.getPassword(self)) except UnicodeDecodeError: return None
[docs] def getAuthUser(self): """returns a username backed by credentials as a string. This will return None if either no user was passed in or if the credentials are wrong. This is for optional-auth scenarios. For anything else, use the runAuthenticated function from this module. """ user = self.getUser() if (user and creds.hasCredentials(user, self.getPassword(), None)): return user return None
def _makeStrargs(self): """returns self.args decoded into strings. This assumes that everything is in utf-8. For keys, we fail if that's not true (actually, we should fail if it's not ASCII, if you ask me). For values, there's a sentinel BinaryItem class in the lists if they don't utf-8 decode. Versus python3 cgi, this is different in that we don't dodge non-utf-8 parameters with surrogates but instead pack them into BinaryItems. Use the strargs property to get this. """ res = utils.CaseSemisensitiveDict() for key, values in self.args.items(): res[key.decode("latin-1")] = mapped = [] for value in values: try: if isinstance(value, str): mapped.append(value) elif isinstance(value, bytes): mapped.append(value.decode("utf-8")) except (UnicodeDecodeError, AttributeError): # binary data or non-string (uploads, most likely). This is # mainly in here so contextgrammar can parse from a single dict. mapped.append(BinaryItem(value)) return res @property def strargs(self): if not hasattr(self, "_strargs"): self._strargs = self._makeStrargs() return self._strargs
[docs] def finishCallback(self, perhapsFailure): """A convenience method that finishes the request and perhaps handles and error This is mainly here as a shortcut to .addBoth(lambda _: request.finish()); It also is a last-resort check that catches escaping, unhandled errors; but, at least for now, you shouldn't rely on getting sensible behaviour (e.g., error reporting to users) from here. """ if isinstance(perhapsFailure, failure.Failure): base.ui.notifyFailure( perhapsFailure, "A failure escaped to finishCallback. This probably means no" " error was reported to the requester. ") self.finish()
[docs] def processingFailed(self, failure): return produceErrorDocument(failure, self)