Source code for gavo.web.grend

"""
Basic Code for Renderers.

Renderers are frontends for services.  They provide the glue to
somehow acquire input (typically, nevow contexts) and then format
the result for the user.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import types
import os
import urllib.request, urllib.parse, urllib.error

from twisted.internet import threads
from twisted.python import log
from twisted.python import urlpath
from twisted.web import template
from twisted.web.template import tags as T

from gavo import base
from gavo import svcs
from gavo import rsc
from gavo import utils
from gavo.formal import nevowc
from gavo.protocols import creds
from gavo.web import common
from gavo.web import htmltable
from gavo.web import weberrors


# Declares the markup used in this module's docstrings (reStructuredText,
# written in English) for documentation tools that evaluate __docformat__.
__docformat__ = "restructuredtext en"


class RDBlocked(Exception):
    """is raised when a ResourceDescriptor is blocked due to maintenance.

    It is supposed to be caught by the root resource.
    """
########## Useful mixins for Renderers
class GavoRenderMixin(common.CommonRenderers):
    """A mixin with renderers useful throughout the data center.

    Rendering of meta information:

    * <tag n:render="meta">METAKEY</tag> or
    * <tag n:render="metahtml">METAKEY</tag>

    Rendering the sidebar --
    <body n:render="withsidebar">.  This will only work if the renderer
    has a service attribute that's enough of a service (i.e., carries meta
    and knows how to generate URLs).

    Conditional rendering:

    * ifmeta
    * ifownmeta
    * ifdata
    * ifnodata
    * ifslot
    * ifnoslot
    * ifadmin

    Obtaining system info

    * rd <rdId> -- makes the referenced RD the current data (this is
      not too useful right now, but it lets you check of the existence
      of RDs already)
    """
    _sidebar = svcs.loadSystemTemplate("sidebar.html").load()
    _footer = svcs.loadSystemTemplate("footer.html").load()

    # macro package to use when expanding macros.  Just set this
    # in the constructor as necessary (ServiceBasedRenderer has the
    # service here)
    macroPackage = None

    def _initGavoRender(self):
        # call this to initialize this mixin.
        # (kept for backward compatibility; don't use this any more)
        pass

    def _doRenderMeta(self, request, tag, raiseOnFail=False, plain=False,
            carrier=None):
        """replaces the content of tag with a rendering of a meta item.

        The meta key is the first non-whitespace string child of tag.
        The meta is taken from carrier, which defaults to self.metaCarrier.
        With plain=True, the rendering comes from base.getMetaText,
        otherwise from carrier.buildRepr with an HTMLMetaBuilder.

        Renderings are cached on the carrier in a _metaRenderCache
        dictionary keyed by (metaKey, plain).

        If the meta key is missing (base.NoMetaKey), an XML comment is
        returned unless raiseOnFail is true, in which case the exception
        is re-raised.  Any other exception is reported through
        base.ui.notifyError and results in an XML comment, too.
        """
        if carrier is None:
            carrier = self.metaCarrier

        if not hasattr(carrier, "_metaRenderCache"):
            carrier._metaRenderCache = {}

        # placeholder used in messages if we fail before finding the key
        metaKey = "(inaccessible)"
        try:
            for child in tag.children:
                if isinstance(child, str) and child.strip():
                    metaKey = child.strip()
                    break

            cacheKey = (metaKey, plain)
            if cacheKey in carrier._metaRenderCache:
                rendered = carrier._metaRenderCache[cacheKey]

            else:
                htmlBuilder = common.HTMLMetaBuilder(self.macroPackage)

                # we always ask the meta machinery to raise; what to
                # do on failure is decided in the except clauses below.
                if plain:
                    rendered = base.getMetaText(carrier, metaKey,
                        raiseOnFail=True, macroPackage=self.macroPackage)

                else:
                    rendered = carrier.buildRepr(metaKey, htmlBuilder,
                        raiseOnFail=True)

                carrier._metaRenderCache[cacheKey] = rendered

        except base.NoMetaKey:
            if raiseOnFail:
                raise
            return template.Comment("Meta item %s not given."%metaKey)
        except Exception as ex:
            msg = "Meta %s bad (%s)"%(metaKey, str(ex))
            base.ui.notifyError(msg)
            return template.Comment(msg)

        tag.clear()
        return tag[rendered]

    def data_meta(self, metaKey):
        """returns a data function yielding the value for the meta key
        metaKey on the current meta carrier.
        """
        def get(request, data):
            return self.metaCarrier.getMeta(metaKey)
        return get

    @template.renderer
    def meta(self, request, tag):
        """replaces a meta key with a plain text rendering of the metadata
        in the service.
        """
        return self._doRenderMeta(request, tag, plain=True)

    @template.renderer
    def metahtml(self, request, tag):
        """replaces a meta key with an html rendering of the metadata in
        the service.
        """
        return self._doRenderMeta(request, tag)

    @template.renderer
    def datameta(self, request, tag):
        """replaces the meta key in the contents with the corresponding
        meta key's HTML rendering.

        The meta is taken from the tag's current data (tag.slotData)
        rather than from the renderer's meta carrier.
        """
        return self._doRenderMeta(request, tag, carrier=tag.slotData)

    @template.renderer
    def ifmeta(self, metaName, propagate=True):
        """renders its children if there is metadata for metaName.

        With propagate=False, getMeta is called with propagate=False.
        This returns the actual render function.
        """
        if propagate:
            hasMeta = self.metaCarrier.getMeta(metaName) is not None
        else:
            hasMeta = self.metaCarrier.getMeta(
                metaName, propagate=False) is not None

        if hasMeta:
            return lambda request, tag: tag
        else:
            return lambda request, tag: ""

    @template.renderer
    def ifownmeta(self, metaName):
        """renders its children if there is metadata for metaName in
        the service itself.
        """
        return self.ifmeta(metaName, propagate=False)

    @template.renderer
    def ifdata(self, request, tag):
        """renders its children if the current data is true in a boolean
        sense.
        """
        if tag.slotData:
            return tag
        else:
            return ""

    @template.renderer
    def ifnodata(self, request, tag):
        """renders its children if the current data is false in a boolean
        sense.
        """
        if not tag.slotData:
            return tag
        else:
            return ""

    @template.renderer
    def ifslot(self, slotName, invert=False):
        """renders the children if slotName is present and true in a boolean
        sense in the current data (which must be a dictionary).

        With invert=True, the condition is negated.  This returns the
        actual render function.
        """
        def render(request, tag):
            # .get never raises KeyError, so no exception handling is
            # needed for missing slots.
            slotIsTrue = bool(tag.slotData.get(slotName))
            if slotIsTrue!=bool(invert):
                return tag
            return ""

        return render

    @template.renderer
    def ifnoslot(self, slotName):
        """renders if slotName is missing or not true in the current data
        (which must be a dictionary).
        """
        return self.ifslot(slotName, invert=True)

    @template.renderer
    def ifadmin(self, request, tag):
        """renders its children if the request's user is gavoadmin.
        """
        # NOTE: use of this renderer is *not* enough to protect critical
        # operations since it does not check if the credentials are actually
        # provided.  Use this only hide links that will give 403s (or
        # somesuch) for non-admins anyway (and the like).
        #
        # The user name is debytified as in authinfo; comparing a bytes
        # user name against a str would never be true.
        if utils.debytify(request.getUser())=="gavoadmin":
            return tag
        else:
            return ""

    @template.renderer
    def explodableMeta(self, request, tag):
        """renders the meta item named in tag as a div of class explodable.

        The heading is taken from tag's title attribute, falling back
        to the capitalized meta key.  If the meta rendering raises a
        base.MetaError, nothing is rendered.
        """
        metaKey = tag.children[0]
        title = tag.attributes.get("title", metaKey.capitalize())
        try:
            return T.div(class_="explodable")[
                T.h4(class_="exploHead")[title],
                T.div(class_="exploBody")[
                    self._doRenderMeta(request, tag, raiseOnFail=True)]]
        except base.MetaError:
            return ""

    @template.renderer
    def intro(self, request, tag):
        """returns something suitable for inclusion above the form.

        The renderer tries, in sequence, to retrieve a meta called _intro,
        the description meta, or nothing.
        """
        # NOTE(review): existence is checked on self.service, whereas the
        # rendering is done on self.metaCarrier; this presumably only
        # works as intended where both are the same object -- confirm.
        for key in ["_intro", "description"]:
            if self.service.getMeta(key, default=None) is not None:
                introKey = key
                break
        else:
            introKey = None

        if introKey is None:
            return tag[""]
        else:
            return tag[self.metaCarrier.buildRepr(introKey,
                common.HTMLMetaBuilder(self.macroPackage),
                raiseOnFail=False)]

    @template.renderer
    def authinfo(self, request, tag):
        """fills tag with a log in or a log out link.

        A log out link is produced if the renderer has a true service
        attribute and the request has a user.  In all other cases,
        a link to /login is produced, with the URI of the current request
        passed in the nextURL parameter.
        """
        svc = getattr(self, "service", None)

        if svc and request.getUser():
            anchorText = "Log out %s"%(utils.debytify(request.getUser()))
            targetURL = "/login?relog=True"
            explanation = " (give an empty user name in the dialog popping up)"
        else:
            targetURL = urlpath.URLPath.fromString("/login")
            targetURL.query = "nextURL=%s"%urllib.parse.quote(request.uri)
            anchorText = "Log in"
            explanation = ""

        return tag[T.a(href=str(targetURL))[
            anchorText], explanation]

    @template.renderer
    def prependsite(self, request, tag):
        """prepends a site id to the body.

        This is intended for titles and similar; it puts the string in
        [web]sitename in front of anything that already is in tag.
        """
        tag.children = [base.getConfig("web", "sitename")]+tag.children
        return tag

    @template.renderer
    def withsidebar(self, request, tag):
        """wraps the content of tag with the sidebar and the footer.

        The previous children of tag end up in a div with id body, which
        is preceded by the sidebar template and closed by the footer
        template.  tag itself gets the class container.
        """
        oldChildren = tag.children
        tag.children = []
        return tag(class_="container")[
            self._sidebar,
            T.div(id="body")[
                oldChildren,
                self._footer,
            ],
        ]

    def data_rd(self, rdId):
        """returns the RD referenced in the body (or None if the RD is not
        there)
        """
        def _(request, tag):
            try:
                return base.caches.getRD(rdId)
            except base.NotFoundError:
                return None
        return _
class HTMLResultRenderMixin(object):
    """is a mixin with render functions for HTML tables and associated
    metadata within other pages.

    This is primarily used for the Form renderer; it expects to see
    the service's result in a result attribute.

    The methods here read self.result, self.queryMeta, and self.service,
    so whatever mixes this in has to provide these.
    """
    result = None

    def _getCurrentData(self, request, tag):
        """helps resulttable and resultline in figuring out the data to
        render.

        A table in the current data wins; otherwise, the result attribute
        is consulted (and its primary table used if it has one).  None
        is returned if no table can be found.
        """
        try:
            candidate = tag.slotData
        except nevowc.NoDataError:
            candidate = None

        # anything that's not a table in the current data makes us
        # fall back to the result attribute
        if not isinstance(candidate, rsc.BaseTable):
            candidate = getattr(self, "result", None)

        if isinstance(candidate, rsc.BaseTable):
            return candidate
        if hasattr(candidate, "getPrimaryTable"):
            return candidate.getPrimaryTable()
        return None

    @template.renderer
    def resulttable(self, request, tag):
        """HTML-renders a table.

        If the current data is a table instance, that's what's used.  Else,
        it falls back to the result attribute of the current object.
        """
        table = self._getCurrentData(request, tag)
        if table is not None:
            return htmltable.HTMLTableFragment(table, self.queryMeta)
        # no table: we're probably handling Form errors
        return ""

    @template.renderer
    def resultline(self, request, tag):
        """HTML-renders a single table line (the first of the result,
        actually).
        """
        table = self._getCurrentData(request, tag)
        if table is not None:
            return htmltable.HTMLKeyValueFragment(table, self.queryMeta)
        # no table: we're probably handling Form errors
        return ""

    @template.renderer
    def parpair(self, request, tag):
        """renders the current data, a (title, value) pair, as
        "title: value".

        Nothing is rendered if there's no data, the value is None,
        or the title contains a double underscore.
        """
        pair = tag.slotData
        if pair is None:
            return ""
        if pair[1] is None:
            return ""
        if "__" in pair[0]:
            return ""
        return tag["%s: %s"%pair]

    @template.renderer
    def ifresult(self, request, tag):
        """renders its children unless queryMeta's Matched item is 0.
        """
        return tag if self.queryMeta.get("Matched", 1)!=0 else ""

    @template.renderer
    def ifnoresult(self, request, tag):
        """renders its children if queryMeta's Matched item is 0.
        """
        return tag if self.queryMeta.get("Matched", 1)==0 else ""

    @template.renderer
    def iflinkable(self, request, tag):
        """renders tag if we have a linkable result, nothing otherwise.

        Linkable means that the result will come out as displayed through
        a link.  Currently, we only see if a file upload was part of
        the result production -- if there was, it's not linkable.

        This currently doesn't even look if a file was indeed passed in:
        Things already are not linkable if the service takes a file upload,
        whether that's used or not.
        """
        takesUpload = any(inputKey.type=='file'
            for inputKey in self.service.getInputKeysFor(self))
        if takesUpload:
            return ""
        return tag

    @template.renderer
    def servicestyle(self, request, tag):
        """enters custom service styles into tag.

        They are taken from the service's customCSS property.
        """
        if not (self.service and self.service.getProperty("customCSS", False)):
            return ""
        return tag[self.service.getProperty("customCSS")]

    def data_result(self, request, tag):
        """returns the result attribute.
        """
        return self.result

    def data_rows(self, request, tag):
        """returns the rows of the primary result table.
        """
        primaryTable = self.result.getPrimaryTable()
        return primaryTable.rows

    def _makeParPair(self, key, value, fieldDict):
        """returns a (title, value) pair for a parameter key.

        For keys known in fieldDict, the title is the input key's label,
        and the value is stringified (or replaced by a note on the upload
        for file-typed keys).  Other keys are passed through unchanged.
        """
        if key not in fieldDict:
            return key, value

        inputKey = fieldDict[key]
        title = inputKey.getLabel()
        if inputKey.type=="file":
            shown = "File upload '%s'"%value[0]
        else:
            shown = str(value)
        return title, shown

    # names of parameters never shown in the query sequence
    __suppressedParNames = {"submit"}

    def data_queryseq(self, request, tag):
        """returns a sorted list of (title, value) pairs for the parameters
        in queryMeta's formal_data.

        Parameters that are None or empty lists, that are suppressed,
        or the names of which start with an underscore are left out.
        Without a result, this is an empty list.
        """
        if not self.result:
            return []

        fieldDict = {}
        if self.service:
            fieldDict = {inputKey.name: inputKey
                for inputKey in self.service.getInputKeysFor(self)}

        pairs = []
        for parName, parValue in self.queryMeta.get("formal_data", {}).items():
            if (parValue is not None
                    and parValue!=[]
                    and parName not in self.__suppressedParNames
                    and not parName.startswith("_")):
                pairs.append(self._makeParPair(parName, parValue, fieldDict))
        return sorted(pairs)

    @template.renderer
    def flotplot(self, request, tag):
        """adds an onClick attribute opening a flot plot.

        This is evaluates the _plotOptions meta.  This should be a javascript
        dictionary literal with certain plot options.  More on this in
        the reference documentation on the _plotOptions meta.
        """
        plotOptions = base.getMetaText(self.service, "_plotOptions")
        extraArgs = "" if plotOptions is None else ", %s"%plotOptions
        return tag(onclick="openFlotPlot($('table.results')%s)"%extraArgs)

    @template.renderer
    def param(self, format):
        """returns the value of the parameter named content formatted as a
        python string.

        Undefined params and NULLs give N/A.  Data needs to be something that
        has a getParam method.
        """
        def renderer(request, tag):
            parName = tag.children[0].strip()
            tag.clear()
            try:
                parValue = tag.slotData.getParam(parName)
                if parValue is not None:
                    return tag[format%parValue]
            except base.NotFoundError:
                pass
            return tag["N/A"]

        return renderer
class CustomTemplateMixin(object):
    """a mixin providing for customized templates.

    The loader attribute is a property here.  What it returns depends
    on the instance's customTemplate attribute:

    * if it is false, the instance's defaultLoader is used,
    * if it is a string, it is taken as a resdir-relative path to a
      nevow XML template; if no file exists there, defaultLoader is used,
    * anything else is considered an already "loaded" template and
      returned as-is.
    """
    customTemplate = None

    def getLoader(self):
        """returns the template loader to use for this instance.
        """
        custom = self.customTemplate
        if not custom:
            return self.defaultLoader

        if not isinstance(custom, str):
            # not a path, so it's a template already
            return custom

        tplPath = self.rd.getAbsPath(custom)
        if os.path.exists(tplPath):
            return nevowc.XMLFile(tplPath)
        return self.defaultLoader

    loader = property(getLoader)
############# nevow Resource derivatives used here.
class GavoPage(nevowc.TemplatedPage, GavoRenderMixin):
    """a base class for all "pages" within DaCHS.

    Pages are things talking to the web, based on RDs, and possibly using
    nevow templates.
    """

    def handleError(self, failure, request):
        """hands failure and request over to weberrors.renderDCErrorPage.
        """
        weberrors.renderDCErrorPage(failure, request)
class ResourceBasedPage(GavoPage):
    """A base for renderers based on RDs.

    It is constructed with the resource descriptor and leaves it
    in the rd attribute.

    The preferredMethod attribute is used for generation of registry records
    and currently should be either GET or POST.  urlUse should be one
    of full, base, post, or dir, in accord with VOResource.

    Renderers with fixed result types should fill out resultType.

    The makeAccessURL class method is called by service.getURL; it
    receives the service's base URL and must return a mogrified string
    that corresponds to an endpoint this renderer will operate on (this
    could be used to make a Form renderer into a ParamHTTP interface by
    attaching ?__nevow_form__=genForm&, and the soap renderer does
    nontrivial things there).

    Renderers capable of producing multiple output formats should
    give a format key (suitable for formats.getKeyFor) in defaultOutputFormat.

    Within DaCHS, this class is mainly used as a base for ServiceBasedRenderer,
    since almost always only services talk to the world.  However,
    we try to fudge render and data functions such that the sidebar works.
    """
    preferredMethod = "GET"
    urlUse = "full"
    resultType = None
    defaultOutputFormat = "votable"
    # parameterStyle is a hint for inputKeys how to transform themselves
    # "clear" keeps types, "form" gives vizier-like expressions
    # "vo" gives parameter-like expressions.
    parameterStyle = "clear"
    name = None
    aliases = frozenset()

    def __init__(self, request, rd):
        """sets up the page for request on rd.

        The RD serves as both meta carrier and macro package.  RDBlocked
        is raised if rd has a currently_blocked attribute.
        """
        nevowc.TemplatedPage.__init__(self)
        self.queryMeta = svcs.QueryMeta.fromRequest(request)
        self.rd = self.metaCarrier = self.macroPackage = rd
        if hasattr(self.rd, "currently_blocked"):
            raise RDBlocked()
        self._initGavoRender()

    @classmethod
    def isBrowseable(cls, service):
        """returns True if this renderer applied to service is usable using a
        plain web browser.
        """
        return False

    @classmethod
    def isCacheable(cls, segments, request):
        """should return true if the content rendered will only change
        when the associated RD changes.

        request is a nevow request object.  web.root.ArchiveService already
        makes sure that you only see GET request without arguments and
        without a user, so you do not need to check this.
        """
        return False

    @classmethod
    def makeAccessURL(cls, baseURL):
        """returns an accessURL for a service with baseURL to this renderer.
        """
        return "%s/%s"%(baseURL, cls.name)

    def data_rdId(self, request, tag):
        """returns the sourceId of the page's RD.
        """
        return self.rd.sourceId

    def data_serviceURL(self, type):
        """returns a data function yielding the site path of the RD's
        /browse page.

        The argument is ignored here; for RDs, that's simply the rdinfo.
        """
        def getBrowsePath(request, tag):
            return base.makeSitePath("/browse/%s"%self.rd.sourceId)
        return getBrowsePath
_IGNORED_KEYS = set(["__nevow_form__", "_charset_", "submit", "nextURL"]) def _formatRequestArgs(args): r"""formats web.Request.strargs for logging. Basically, long objects (ones with len, and len>100) are truncated. >>> _formatRequestArgs({"x": range(2), "y": [u"\u3020"], "submit": ["Ok"]}) "{'x': [0,1,],'y': ['?',],}" >>> _formatRequestArgs({"hokus": ["Pokus"*300]}) "{'hokus': [<data starting with 'PokusPokusPokusPokusPokusPoku>,],}" >>> _formatRequestArgs({"no": []}) '{}' >>> _formatRequestArgs({"plönk": ["göschönkt"]}) "{'plönk': ['g?sch?nkt',],}" """ res = ["{"] for key in sorted(args): valList = args[key] if not valList or key in _IGNORED_KEYS: continue res.append("%s: ["%repr(key)) for value in valList: value = repr(value).encode("ascii", "replace").decode("ascii") try: if len(value)>100: res.append("<data starting with %s>,"%value[:30]) else: res.append(value+",") except TypeError: # no len on value res.append(repr(value)+",") res.append("],") res.append("}") return "".join(res)
class ServiceBasedPage(ResourceBasedPage):
    """the base class for renderers turning service-based info into
    character streams.

    You will need to provide some way to give nevowc.TemplatedPage templates,
    either by supplying a loader or (usually preferably) mixing in
    CustomTemplateMixin -- or just override renderHTTP to make do
    without templates.

    You can set an attribute checkedRenderer=False for renderers that
    are "generic" and do not need to be enumerated in the allowed
    attribute of the underlying service ("meta renderers").

    You can set a class attribute openRenderer=True to make a renderer
    work even on restricted services (which may make sense for stuff like
    metadata inspection).

    This class overrides t.w.template's renderer so renderers defined
    in the service (e.g., via an RD) are found, too.
    """

    checkedRenderer = True
    openRenderer = False

    def __init__(self, request, service):
        """sets up the page for request on service.

        This raises

        * svcs.UnknownURI if the service has a superseded meta,
        * svcs.Authenticate if the service has a limitTo, the renderer
          is not an openRenderer, and creds.hasCredentials rejects
          the request's user and password,
        * svcs.ForbiddenURI if the renderer is a checkedRenderer and
          its name is not in service.allowed.

        After construction, the service is the meta carrier and the
        macro package.
        """
        ResourceBasedPage.__init__(self, request, service.rd)

        if service.hasMeta("superseded"):
            raise svcs.UnknownURI(
                "This service is superseded by something else.",
                htmlMessage=T.xml(
                    service.getMeta("superseded").getContent("html")))

        self.service = service
        if not self.openRenderer and service.limitTo:
            if not creds.hasCredentials(request.getUser(),
                    request.getPassword(), service.limitTo):
                raise svcs.Authenticate()
            else:
                request.setHeader("x-vo-authenticated", request.getUser())

        if self.checkedRenderer and self.name not in self.service.allowed:
            raise svcs.ForbiddenURI(
                "The renderer %s is not allowed on this service."%self.name,
                rd=self.service.rd)

        self.metaCarrier = self.service
        self.macroPackage = self.service

        # Set to true when we notice we need to fix the service's output fields
        self.fieldsChanged = False

        self._logRequestArgs(request)
        self._fillServiceDefaults(request.strargs)

    def _logRequestArgs(self, request):
        """leaves the actual arguments of a request in the log.

        Failures while formatting are reported through base.ui.notifyError
        and otherwise ignored.
        """
        try:
            if request.strargs:
                # even if there are args, don't log them if only boring ones
                # were given
                fmtArgs = _formatRequestArgs(request.strargs)
                if fmtArgs!='{}':
                    log.msg("# Processing starts: %s %s"%(
                        request.path.decode("ascii", "ignore"),
                        fmtArgs))
        except Exception:
            # don't fail because of logging problems (but do let
            # KeyboardInterrupt, SystemExit and friends through)
            base.ui.notifyError("Formatting of request strargs failed.")

    def _fillServiceDefaults(self, args):
        """a hook to enter default parameters based on the service.

        Currently, this sets _DBOPTIONS_ORDER in args from the core's
        defaultSortKey property (a comma-separated list) unless the
        argument is already given.  args is changed in place.
        """
        if self.service.core.hasProperty("defaultSortKey"):
            if "_DBOPTIONS_ORDER" not in args:
                args["_DBOPTIONS_ORDER"] = self.service.core.getProperty(
                    "defaultSortKey").split(",")

    def runSync(self, rawData):
        """calls the actual service.

        This will run in the current thread; you will usually
        want to use runAsync from the main nevow event loop unless you know
        the service is quick or actually works asynchronously.
        """
        return self.service.run(self, rawData, self.queryMeta)

    def runAsync(self, rawData):
        """takes raw data and returns a deferred firing the service result.

        This will always return a deferred.
        """
        return threads.deferToThread(self.runSync, rawData)

    def runAsyncWithFormalData(self, rawData, request):
        """runs the service, taking arguments from material preparsed
        by nevow formal.

        This is the entry point for the form renderer and its friends.
        Like runAsync, it always returns a deferred.

        A base.ValidationError (on _OUTPUT) is raised if the core's
        output table has columns but the current settings select no
        output fields.
        """
        self.queryMeta["formal_data"] = rawData

        if (self.service.core.outputTable.columns and
                not self.service.getCurOutputFields(self.queryMeta)):
            raise base.ValidationError("These output settings yield no"
                " output fields", "_OUTPUT")

        # contextGrammar wants a dict of lists, whereas formal has direct
        # values; accommodate to contextGrammar
        data = {}
        for k, v in rawData.items():
            if v is None or isinstance(v, list):
                data[k] = v
            else:
                data[k] = [v]

        return self.runAsync(svcs.PreparsedInput(data))

    def data_serviceURL(self, renderer):
        """returns a relative URL for this service using the renderer.

        This is usually used like this:

        <a><n:attr name="href" n:data="serviceURL info" n:render="data">x</a>
        """
        def get(request, data):
            # this must be the boolean False; a string "False" is true
            # in a boolean context and would request an absolute URL.
            return self.service.getURL(renderer, absolute=False)
        return get

    def lookupRenderMethod(self, name):
        """overrides parent lookupRenderMethod to also include custom
        render functions from the service definition in the RD.
        """
        if name in self.service.nevowRenderers:
            return types.MethodType(
                self.service.nevowRenderers[name], self)
        return nevowc.TemplatedPage.lookupRenderMethod(self, name)

    def lookupDataMethod(self, name):
        """overrides parent lookupDataMethod to also include data methods
        coming from the service definition in the RD.
        """
        if name in self.service.nevowDataFunctions:
            return types.MethodType(
                self.service.nevowDataFunctions[name], self)
        return nevowc.TemplatedPage.lookupDataMethod(self, name)

    def getChild(self, name, request):
        """redirects to the parent URL for an empty name and raises
        svcs.UnknownURI for anything else.
        """
        # By default, ServiceBasedPages have no directory-like resources.
        # So, if some overzealous entity added a slash, just redirect.
        # Do not upcall to this if you override getChild.
        if name==b"":
            raise svcs.WebRedirect(str(request.URLPath().parent()))
        raise svcs.UnknownURI("%s has no child resources"%repr(self.name))
if __name__=="__main__":  # pragma: no cover
    # run this module's doctests when executed as a script; the module
    # is imported under its own name so doctest inspects that module object.
    import doctest
    import grend
    doctest.testmod(grend)