Source code for gavo.svcs.common

"""
Common functions and classes for services and cores.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import re
import os

import pkg_resources

from gavo import base
from gavo import utils
from gavo.formal import nevowc


class Error(base.ExecutiveAction):
    """Common base class of the exceptions defined in this module.

    Instances keep the message, an optional resource descriptor and an
    optional hint as attributes.

    :param msg: the message; also handed on to base.ExecutiveAction.
    :param rd: stored as self.rd (presumably the resource descriptor the
      problem occurred in -- confirm against callers).
    :param hint: stored as self.hint.
    :param htmlMessage: when not None, stored as self.htmlMessage; the
      attribute is not created on the instance otherwise.
    """
    def __init__(self, msg, rd=None, hint=None, htmlMessage=None):
        self.msg, self.rd, self.hint = msg, rd, hint
        if htmlMessage is not None:
            self.htmlMessage = htmlMessage
        # Deliberately an explicit base call (not super()): subclasses
        # in this module use multiple inheritance.
        base.ExecutiveAction.__init__(self, msg)
class BadMethod(Error):
    """raised to generate an HTTP 405 response.

    The message passed at construction is interpolated into the string
    representation as the name of the offending HTTP method.
    """
    def __str__(self):
        template = "This resource cannot respond to the HTTP '%s' method"
        return template%self.msg
class UnknownURI(Error, base.NotFoundError):
    """raised to generate an HTTP 404 response.

    This is both a service Error and a base.NotFoundError, so handlers
    for either will catch it.
    """
    def __str__(self):
        # Explicitly use the __str__ reachable from Error rather than
        # whatever this class's own MRO would select.
        return Error.__str__(self)
class ForbiddenURI(Error):
    """raised to generate an HTTP 403 response.

    Construction is as for Error; no behaviour is added here.
    """
class RequestEntityTooLarge(Error):
    """raised to generate an HTTP 413 response.

    Arguments are as for Error.  Unless the caller provides a hint
    (by keyword or as the third positional argument), a default hint
    suggesting async operation or a smaller upload is used.
    """
    def __init__(self, *args, **kwargs):
        # Error.__init__ takes (msg, rd, hint, htmlMessage); with three or
        # more positional arguments the hint has already been passed, and
        # adding it as a keyword as well would raise a TypeError.
        hintPassedPositionally = len(args) >= 3
        if not hintPassedPositionally:
            kwargs.setdefault("hint", (
                "If you receive this error using sync, try again"
                " with async (the limits are generally more generous there). If you"
                " receive this message with async, see if the upload can be"
                " reduced by transmitting only columns absolutely necessary."))
        Error.__init__(self, *args, **kwargs)
class Authenticate(Error):
    """raised to initiate an authentication request.

    Authenticates are optionally constructed with the realm the user
    shall authenticate in.  If you leave the realm out, the DC-wide
    default ([web]realm from the configuration) will be used.

    Note that this default is looked up once, when the class is defined,
    not on each instantiation.

    The realm is available as the realm attribute.
    """
    def __init__(self, realm=base.getConfig("web", "realm"), hint=None):
        self.realm = realm
        message = "This is a request to authenticate against %s"%realm
        Error.__init__(self, message, hint=hint)
class RedirectBase(Error):
    """Common base class of the redirecting exceptions.

    These are constructed with a destination, which may be bytes (decoded
    as utf-8) or anything str() can work on.  Destinations not starting
    with "http" are prefixed with [web]serverURL after being run through
    base.makeSitePath.

    Attributes:

    * rawDest -- the destination as passed in (after decoding bytes)
    * dest -- the destination as a string, made absolute as described above
    """
    def __init__(self, dest, hint=None):
        rawDest = dest.decode("utf-8") if isinstance(dest, bytes) else dest
        self.rawDest = rawDest

        target = str(rawDest)
        if not target.startswith("http"):
            serverURL = base.getConfig("web", "serverURL")
            target = serverURL+base.makeSitePath(target)
        self.dest = target

        Error.__init__(self,
            "This is supposed to redirect to %s"%target,
            hint=hint)
class WebRedirect(RedirectBase):
    """raised to redirect a user agent to a different resource (HTTP 301).

    WebRedirects are constructed with the destination URL, which can be
    relative (to webRoot) or absolute (starting with http).
    """
class SeeOther(RedirectBase):
    """raised to redirect a user agent to a different resource (HTTP 303).

    SeeOthers are constructed with the destination URL, which can be
    relative (to webRoot) or absolute (starting with http).

    They are essentially like WebRedirect, except they put out a 303
    instead of a 301.
    """
class Found(RedirectBase):
    """raised to redirect a user agent to a different resource (HTTP 302).

    Found instances are constructed with the destination URL, which can
    be relative (to webRoot) or absolute (starting with http).

    They are essentially like WebRedirect, except they put out a 302
    instead of a 301.
    """
def parseServicePath(serviceParts):
    """returns a tuple of resource descriptor path and service name.

    serviceParts is a sequence of path segments; the last one is taken
    as the name of a service, everything before it is joined with
    slashes to give the (inputsDir-relative) path of the resource
    descriptor.

    Neither the format of the segments nor the existence of the resource
    or the service is checked.  An empty serviceParts results in an
    IndexError.
    """
    serviceName = serviceParts[-1]
    rdPath = "/".join(serviceParts[:-1])
    return rdPath, serviceName
class QueryMeta(dict):
    """A class keeping information on the query environment.

    It is constructed with a plain dictionary (alternative constructors
    for t.w requests are below) mapping certain keys (you'll currently
    have to figure out which from the source) to values, mostly strings,
    except for the keys listed in listKeys, which should be sequences of
    strings.

    If you pass an empty dict, some sane defaults will be used.  You can
    get that "empty" query meta as common.emptyQueryMeta, but make sure
    you don't mutate it.

    QueryMetas constructed from request will have the user and password
    items filled out.

    If you're using formal, you should set the formal_data item to the
    dictionary created by formal.  This will let people use the parsed
    parameters in templates.

    Note: You cannot trust qm["user"] -- it is not validated against any
    credentials.
    """

    # a set of keys that have sequences as values (needed for construction
    # from t.w request.strargs)
    listKeys = {"_ADDITEM", "_DBOPTIONS_ORDER", "_SET"}

    def __init__(self, initArgs=None, defaultLimit=None):
        """
        :param initArgs: a dictionary of raw arguments; None is the
          same as an empty dictionary.  It is kept, wrapped into a
          utils.CaseSemisensitiveDict, in self.ctxArgs.
        :param defaultLimit: the match limit used when MAXREC is missing
          or malformed; defaults to [db]defaultLimit from the config.
        """
        if initArgs is None:
            initArgs = {}
        self.ctxArgs = utils.CaseSemisensitiveDict(initArgs)

        if defaultLimit is None:
            self.defaultLimit = base.getConfig("db", "defaultLimit")
        else:
            self.defaultLimit = defaultLimit

        self["formal_data"] = {}
        self["user"] = self["password"] = None
        self["accept"] = {}
        self["inputTable"] = None

        self._fillOutput(self.ctxArgs)
        self._fillDbOptions(self.ctxArgs)
        self._fillSet(self.ctxArgs)

    @classmethod
    def fromRequestArgs(cls, inArgs, **kwargs):
        """constructs a QueryMeta from a gavo.web.common.Request.strargs

        Values for keys in listKeys are passed on as lists; for all other
        keys, only the first value is used, and keys with empty value
        lists are dropped.  kwargs are handed through to the constructor.
        """
        args = {}
        for key, value in inArgs.items():
            # defense against broken legacy code: listify if necessary
            if not isinstance(value, list):
                value = [value]

            if key in cls.listKeys:
                args[key] = value
            elif value:
                args[key] = value[0]

        return cls(args, **kwargs)

    @classmethod
    def fromRequest(cls, request, **kwargs):
        """constructs a QueryMeta from a gavo.web.common.Request.

        In addition to getting information from the arguments, this
        also sets accept (from the accept header), user and password.
        Empty users and passwords become None.
        """
        res = cls.fromRequestArgs(request.strargs, **kwargs)
        res["accept"] = utils.parseAccept(request.getHeader("accept"))
        res["user"] = request.getUser() or None
        res["password"] = utils.debytify(
            request.getPassword(), "utf-8") or None
        return res

    def _fillOutput(self, args):
        """interprets RESPONSEFORMAT and _FORMAT, as well as _VERB/VERB

        We will have no format key if neither is given, with
        RESPONSEFORMAT taking precedence.  Services must then inspect the
        current renderer's defaultOutputFormat attribute.

        verbosity will default to 20.  A verbosity argument is taken
        as-is, _VERB (internal) and VERB (SCS-compliant) are multiplied
        by 10.  If the argument chosen is not an integer literal,
        verbosity becomes the string "HTML".

        This also reads the legacy _TDENC and _VOTABLE_VERSION keys that
        the ancient HTML format selector still produces.  Let's move that
        to using RESPONSEFORMAT one of these days.
        """
        if "RESPONSEFORMAT" in args:
            self["format"] = args["RESPONSEFORMAT"]
        elif "_FORMAT" in args:
            self["format"] = args["_FORMAT"]

        try:
            # prefer fine-grained "verbosity" over _VERB or VERB
            # Hack: malformed values result in "HTML" verbosity, which is
            # taken to mean about "use fields of HTML".  Absent _VERB or
            # VERB, on the other hand, means VERB=2, i.e., a sane default
            if "verbosity" in args:
                self["verbosity"] = int(args["verbosity"])
            elif "_VERB" in args:  # internal verb parameter
                self["verbosity"] = int(args["_VERB"])*10
            elif "VERB" in args:  # verb parameter for SCS and such
                self["verbosity"] = int(args["VERB"])*10
            else:
                self["verbosity"] = 20
        except ValueError:
            self["verbosity"] = "HTML"  # VERB given, but not an int.

        self["tdEnc"] = base.getConfig("ivoa", "votDefaultEncoding")=="td"
        if "_TDENC" in args:
            try:
                self["tdEnc"] = base.parseBooleanLiteral(args["_TDENC"])
            except ValueError:
                # keep the configured default on malformed literals
                pass

        try:
            self["VOTableVersion"] = tuple(int(v) for v in
                args["_VOTABLE_VERSION"].split("."))
        except (KeyError, AttributeError, TypeError, ValueError):
            # simply ignore missing or malformed version specs; we do
            # not use a bare except here in order to not swallow
            # things like KeyboardInterrupt.
            pass

        self["additionalFields"] = args.get("_ADDITEM", [])

    def _fillSet(self, args):
        """interprets the output of a ColumnSet widget.

        columnSet is None unless _SET is given, in which case it is the
        set of the values passed.
        """
        self["columnSet"] = None
        if "_SET" in args:
            self["columnSet"] = set(args["_SET"])

    def _fillDbOptions(self, args):
        """interprets _DBOPTIONS_ORDER, _DBOPTIONS_DIR, MAXREC and _TIMEOUT.

        Missing or non-numeric MAXREC and _TIMEOUT values are replaced by
        self.defaultLimit and [web]sqlTimeout, respectively; timeouts are
        at least 0.001.  A _DBOPTIONS_DIR that is neither ASC nor DESC
        raises a KeyError.
        """
        self["dbSortKeys"] = [s.strip()
            for s in args.get("_DBOPTIONS_ORDER", []) if s.strip()]

        # the dictionary acts as a whitelist: direction ends up literally
        # in SQL in asSQL.
        self["direction"] = {"ASC": "ASC", "DESC": "DESC"}[
            args.get("_DBOPTIONS_DIR", "ASC")]

        try:
            self["dbLimit"] = int(args["MAXREC"])
        except (ValueError, KeyError):
            self["dbLimit"] = self.defaultLimit

        try:
            self["timeout"] = max(float(args["_TIMEOUT"]), 0.001)
        except (ValueError, KeyError):
            self["timeout"] = base.getConfig("web", "sqlTimeout")

    def overrideDbOptions(self, sortKeys=None, limit=None, sortFallback=None,
            direction=None):
        """changes the database options set from the arguments.

        :param sortKeys: if not None, a list replacing dbSortKeys.
        :param limit: if not None, something int() can turn into the new
          dbLimit.
        :param sortFallback: a comma-separated string of sort keys that
          is used if there are no sort keys otherwise.
        :param direction: if not None, the new sort direction.  This is
          not validated here, but it ends up literally in the SQL asSQL
          produces, so only pass in ASC or DESC.
        """
        if sortKeys is not None:
            self["dbSortKeys"] = sortKeys
        if not self["dbSortKeys"] and sortFallback is not None:
            self["dbSortKeys"] = sortFallback.split(",")
        if limit is not None:
            self["dbLimit"] = int(limit)
        if direction is not None:
            self["direction"] = direction

    def asSQL(self):
        """returns the dbLimit and dbSortKey values as an SQL fragment.

        What is returned actually is a pair of the fragment and a
        dictionary of parameters to fill it.  The limit passed to the
        database is one larger than dbLimit.

        Sort keys that have nothing left after removing everything that
        cannot be part of an SQL identifier are dropped; if no sort key
        remains, no ORDER BY clause is generated.
        """
        frag, pars = [], {}
        sortKeys = self["dbSortKeys"]
        dbLimit = self["dbLimit"]

        # TODO: Sorting needs a thorough redesign, presumably giving column
        # instance as column keys.  These could carry "default up" or
        # "default down" in properties.  Meanwhile, there should be a UI to
        # let users decide on sort direction.
        # Meanwhile, let's do an emergency hack.
        if sortKeys:
            # Ok, we need to do some emergency securing here.  There should
            # be pre-validation that we're actually seeing a column key, but
            # just in case let's make sure we're seeing an SQL identifier.
            # (We can't rely on dbapi's escaping since we're not talking
            # values here)
            cleanedKeys = [re.sub(r'[^A-Za-z0-9"_]+', "", key)
                for key in sortKeys]
            # keys consisting of junk only would yield broken SQL
            # like "ORDER BY  ASC" or "ORDER BY a,,b ASC".
            cleanedKeys = [key for key in cleanedKeys if key]
            if cleanedKeys:
                frag.append("ORDER BY %s %s"%(
                    ",".join(cleanedKeys), self["direction"]))

        if dbLimit:
            frag.append("LIMIT %(_matchLimit)s")
            pars["_matchLimit"] = int(dbLimit)+1

        return " ".join(frag), pars
# A QueryMeta built without any arguments, i.e., one having only the
# defaults.  This instance is shared module-wide, so do not mutate it.
emptyQueryMeta = QueryMeta()
def getTemplatePath(key):
    """returns the file system path of the system template key.

    A file web/templates/<key> below the configured rootDir is preferred;
    if that does not exist, the template is looked for in the gavo
    package's resources/templates.  If it is in neither place, a
    base.NotFoundError is raised.

    Also see loadSystemTemplate.
    """
    userPath = os.path.join(base.getConfig("rootDir"), "web/templates", key)
    if os.path.exists(userPath):
        return userPath

    # no user override, fall back to what is shipped with the package
    resPath = "resources/templates/"+key
    if not pkg_resources.resource_exists('gavo', resPath):
        raise base.NotFoundError(key, "template", "system templates")
    return pkg_resources.resource_filename('gavo', resPath)
def loadSystemTemplate(key):
    """returns a nevow template for system pages from key.

    key is resolved through getTemplatePath, i.e., it is interpreted as
    relative to gavo_root/web/templates (first) and package internal
    (last).

    None is returned if getTemplatePath returns None or if an IOError
    occurs while locating or loading the template (this harmonizes with
    the fallback in CustomTemplateMixin).

    NOTE(review): getTemplatePath raises base.NotFoundError for unknown
    keys; that is only turned into a None here if NotFoundError is
    an IOError -- confirm.

    Note that nevowc.XMLFile at this point ignores the doctype of the
    template.  For non-XHTML doctypes, set a gavo_useDoctype attribute on
    the renderer.
    """
    try:
        templatePath = getTemplatePath(key)
        if templatePath is None:
            return None
        return nevowc.XMLFile(templatePath)
    except IOError:
        return None