Source code for gavo.base.osinter

"""
Basic OS interface/utility functions that depend on our configuration.

(everything that doesn't need getConfig is somewhere in gavo.utils)
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import functools
import grp
import inspect
import re
import os
import subprocess
import time
import urllib.parse
from email import charset
from email import utils as emailutils
from email.header import Header
from email.parser import Parser
from email.mime.nonmultipart import MIMENonMultipart

import pkg_resources

from twisted import copyright as twcopyright

from gavo.base import config
from gavo import utils


[docs]def getGroupId(): gavoGroup = config.get("group") try: return grp.getgrnam(gavoGroup)[2] except KeyError as ex: raise utils.ReportableError("Group %s does not exist"%str(ex), hint="You should have created this (unix) group when you" " created the server user (usually, 'gavo'). Just do it" " now and re-run this program.")
[docs]def makeSharedDir(path, writable=True): """creates a directory with group ownership [general]group. There's much that can to wrong; we try to raise useful error messages. """ if not os.path.isdir(path): try: os.makedirs(path) except os.error as err: raise utils.ReportableError( "Could not create directory %s"%path, hint="The operating system reported: %s"%err) except Exception as msg: raise utils.ReportableError( "Could not create directory %s (%s)"%(path, msg)) gavoGroup = getGroupId() stats = os.stat(path) if stats.st_mode&0o060!=0o60 or stats.st_gid!=gavoGroup: try: os.chown(path, -1, gavoGroup) if writable: os.chmod(path, stats.st_mode | 0o060) except Exception as msg: raise utils.ReportableError( "Cannot set %s to group ownership %s, group writable"%( path, gavoGroup), hint="Certain directories must be writable by multiple user ids." " They must therefore belong to the group %s and be group" " writeable. The attempt to make sure that's so just failed" " with the error message %s." " Either grant the directory in question to yourself, or" " fix permissions manually. If you own the directory and" " sill see permission errors, try 'newgrp %s'"%( config.get("group"), msg, config.get("group")))
[docs]@functools.lru_cache(1) def getHTTPBase(): """returns the server's base URL for the http protocol. This is just serverURL from the configuration, unless serverURL is https; in that case, we replace https with http. serverPort is ignored here under the assumption that there's a reverse proxy. If that bites you, we could introduce an alternativeServerURL config item. """ serverURL = config.get("web", "serverurl") if serverURL.startswith("https:"): return "http:"+serverURL[6:] else: return serverURL
[docs]@functools.lru_cache(1) def getHTTPSBase(): """return the server's base URL for the https protocol. If serverURL already is https, that's what's returned. If not, the URL is parsed, any port specification is removed (i.e., we only support https on port 443), the protocol is changed to https, and the result is returned. """ serverURL = config.get("web", "serverurl") if serverURL.startswith("https:"): return serverURL else: parts = urllib.parse.urlparse(serverURL) return urllib.parse.urlunparse(("https", parts.hostname, parts.path, parts.params, parts.query, parts.fragment))
[docs]def switchProtocol(url): """tries to make an https URL from an http one and vice versa. This function will raise a ValueError if url doesn't start with either HTTPBase or HTTPSBase. Otherwise, it will replace one by the other. """ httpBase = getHTTPBase() httpsBase = getHTTPSBase() if url.startswith(httpBase): return httpsBase+url[len(httpBase):] elif url.startswith(httpsBase): return httpBase+url[len(httpsBase):] else: raise ValueError("Cannot switch protocol on a URL not configured" " in [web]serverURL")
[docs]def getCurrentServerURL(): """returns the server URL pertinent for the current request. This looks upstack for a renderer object having an isSecure attribute. If it finds one, it will assume it's a twisted request, call isSecure and return HTTPBase() or HTTPSBase() as appropriate. If not, it will return [web]serverurl """ if config.get("web", "adaptProtocol"): frame = inspect.currentframe().f_back.f_back while frame: if "request" in frame.f_locals: if hasattr(frame.f_locals["request"], "isSecure"): if frame.f_locals["request"].isSecure(): return getHTTPSBase() else: return getHTTPBase() break frame = frame.f_back return config.get("web", "serverurl")
[docs]@utils.document def makeSitePath(path): """returns a rooted local part for a server-internal URL. uri itself needs to be server-absolute; a leading slash is recommended for clarity but not mandatory. """ return str(config.get("web", "nevowRoot")+path.lstrip("/"))
[docs]@utils.document def makeAbsoluteURL(path, canonical=False): """returns a fully qualified URL for a rooted local part. This will reflect the http/https access mode unless you pass canonical=True, in which case [web]serverURL will be used unconditionally. """ path = utils.debytify(path) if canonical: serverURL = config.get("web", "serverurl") else: serverURL = getCurrentServerURL() return str(serverURL+makeSitePath(path))
[docs]def getBinaryName(baseName): """returns the name of a binary it thinks is appropriate for the platform. To do this, it asks config for the platform name, sees if there's a binary <bin>-<platname> if platform is nonempty. If it exists, it returns that name, in all other cases, it returns baseName unchanged. """ platform = config.get("platform") if platform: platName = baseName+"-"+platform if os.path.exists(platName): return platName return baseName
[docs]def getPathForDistFile(name): """returns a path for a "dist resource", i.e., a file distributed with DaCHS. name is the file relative to resources. This is essentially pkg_resources.resource_filename with a dash of built-in configuration. """ return pkg_resources.resource_filename('gavo', "resources/"+name)
[docs]def openDistFile(name, mode="r", encoding=None): """returns an open file for a "dist resource", i.e., a file distributed with DaCHS. see getPathForDistFile """ return open(getPathForDistFile(name), mode, encoding=encoding)
[docs]@functools.lru_cache(1) def getVersion(): """returns (as a string) the DaCHS version running. The information is obtained from setuptools. """ return pkg_resources.require("gavodachs")[0].version
# Our software id as per https://ivoa.net/documents/Notes/softid/ SERVER_SOFTWARE = "DaCHS/%s twistedWeb/%s"%( getVersion(), twcopyright.version)
[docs]def formatMail(mailText): """returns a mail with headers and content properly formatted as a bytestring and MIME. """ rawHeaders, rawBody = mailText.split("\n\n", 1) cs = charset.Charset("utf-8") cs.body_encoding = charset.QP cs.header_encoding = charset.QP # they've botched MIMEText so bad it can't really generate # quoted-printable UTF-8 any more. So, let's forget MIMEText: msg = MIMENonMultipart("text", "plain", charset="utf-8") msg.set_payload(rawBody, charset=cs) for key, value in list(Parser().parsestr(rawHeaders).items()): if key.lower()=="date": continue if re.match("[ -~]*$", value): # it's plain ASCII, don't needlessly uglify output msg[key] = value else: msg[key] = Header(value, cs) if "From" not in msg: msg["From"] = '"DaCHS server %s" <%s>'%( config.get("web", "siteName").replace('"', "'"), config.get("maintainerAddress")) msg["Date"] = emailutils.formatdate(time.time(), localtime=False, usegmt=True) msg["X-Mailer"] = "DaCHS VO Server" return msg.as_string()
# only send each sort of mail once per hour MAIL_LIMITER = utils.RateLimiter(3600)
[docs]def sendMail(mailText, rateLimitKey=None): """sends mailText (which has to have all the headers) via sendmail. (which is configured in [general]sendmail). This will return True when sendmail has accepted the mail, False otherwise. """ if rateLimitKey: if MAIL_LIMITER.inDeadtime(rateLimitKey): return if not config.get("sendmail"): utils.setUIEvent("Warning", "Wanted to send maintainer mail but" " could not since [general]sendmail is not configured.") mailText = formatMail(mailText) pipe = subprocess.Popen(config.get("sendmail"), shell=True, stdin=subprocess.PIPE) pipe.stdin.write(mailText.encode("ascii", "ignore")) pipe.stdin.close() if pipe.wait(): utils.sendUIEvent("Error", "Wanted to send mail starting with" " '%s', but sendmail returned an error message" " (check the [general]sendmail setting)."% utils.makeEllipsis(mailText, 300)) return False return True
[docs]def tryRemoteReload(rdId): """tries to reload the rdId on a running service This only works if there's [web]adminpasswd and[web]serverURL set, and both match what the actual server uses. """ pw = config.get("web", "adminpasswd") # don't bother if admin passwd has not been set or when running unit tests. if pw=="" or pw=="this_is_the_unittest_suite": return try: f = utils.urlopenRemote(makeAbsoluteURL("/seffe/%s"%rdId), data={"__nevow_form__": "adminOps", "submit": "Reload RD"}, creds=("gavoadmin", pw)) f.read() except IOError: # this is probably a refused connection; if the server doesn't run # don't bother to report that. pass except Exception as ex: utils.sendUIEvent("Info", "Could not reload %s RD (%s). This means" " that the server may still use stale metadata. You may want" " to reload %s manually (or restart the server)."%(rdId, ex, rdId))