# Source code for gavo.user.serve

"""
A wrapper script suitable for starting the server.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import datetime
import errno
import grp
import os
import pwd
import resource
import signal
import subprocess
import sys
import time
import urllib.parse
import warnings

from twisted.internet import reactor
from twisted.internet import task
from twisted.internet import threads
from twisted.internet.error import CannotListenError
from twisted.python import log
from twisted.python import logfile
from twisted.web import resource as twresource

from gavo import base
from gavo import rscdesc #noflake: for cache registration
from gavo import registry
from gavo import utils
from gavo.base import config
from gavo.base import cron
from gavo.user import plainui
from gavo.utils import exposedFunction, makeCLIParser, Arg
from gavo.web import root


def setupServer(rootPage, runIntervalJobs=True):
    """sets the process up for running a server.

    This involves managing server resource limits, doing a notification,
    and, in particular, starting up the scheduler.

    runIntervalJobs can be set to False to suppress running jobs that are
    probably maintenance jobs on a running instance and which would be
    unwelcome on debug instances.

    (NOTE(review): rootPage is not referenced in this body; callers pass
    the root module -- confirm whether the parameter is vestigial.)
    """
    # raise FD/core limits while we may still hold the privileges to do so
    manageResourceLimits()
    # record startup time so it can be served as metadata
    base.setGlobalMeta("upSince", utils.formatISODT(datetime.datetime.utcnow()))
    base.ui.notifyWebServerUp()
    if not runIntervalJobs:
        # we don't want most periodic stuff to happen when in debug mode, since
        # it usually will involve fetching or importing things, and it's at
        # best going to be confusing.  However, stuff that is supposed to
        # run immediately at startup should run (e.g., cleaning up the TAP
        # table).  Define a cron.isCensored function to make that happen --
        # note that this is too late to prevent the backup job from
        # //services -- but that is a no-op unless configured.
        def isCensored(job):
            # censor timed jobs and interval jobs with intervals of a
            # minute or more (i.e., the presumable maintenance jobs)
            if isinstance(job, cron.TimedJob) or job.interval>=60:
                return True
            return False
        cron.isCensored = isCensored
    # from here on, cron jobs are dispatched through the twisted reactor
    cron.registerScheduleFunction(_Scheduler.scheduleJob)
class _PIDManager(object):
    """A manager for the PID of the server.

    There's a single instance of this below (PIDManager); the PID file
    lives in [general]stateDir/web.pid.
    """
    def __init__(self):
        self.path = os.path.join(base.getConfig("stateDir"), "web.pid")

    def getPID(self):
        """returns the PID of the currently running server, or None.
        """
        try:
            with open(self.path) as f:
                pidString = f.readline()
        except IOError:
            # PID file does not exist (or we're beyond repair)
            return None
        try:
            return int(pidString)
        except ValueError:
            # junk in PID file -- no sense in keeping it
            base.ui.notifyWarning("%s contained garbage, attempting to unlink"%
                self.path)
            self.clearPID()

    def setPID(self):
        """writes the current process' PID to the PID file.

        Any existing content will be clobbered; thus, you could have
        races here (and since both daemons would bind to the same socket,
        only one would survive, possibly the wrong one).  Let's just
        stipulate people won't start two competing daemons.
        """
        try:
            with open(self.path, "w") as f:
                f.write(str(os.getpid()))
        except IOError:
            # Cannot write PID.  This would suggest that much else
            # is broken as well, so we bail out
            base.ui.notifyError("Cannot write PID file %s. Assuming all is"
                " broken, bailing out."%self.path)
            sys.exit(1)

    def clearPID(self):
        """removes the PID file.
        """
        try:
            os.unlink(self.path)
        except os.error as ex:
            if ex.errno==errno.ENOENT:
                # file already gone -- we don't have to do anything
                pass
            else:
                # FIX: this used to reference self.file, which does not
                # exist and would have raised AttributeError right here.
                base.ui.notifyError("Cannot remove PID file %s (%s). This"
                    " probably means some other server owns it now."%(
                    self.path, str(ex)))


# module-level singleton through which all PID handling goes
PIDManager = _PIDManager()


def _reloadConfig():
    """should clear as many caches as we can get hold of.

    This is the handler for SIGHUP (installed in _startServer).
    """
    base.caches.clearCaches()
    root.loadUserVanity(root.ArchiveService)
    config.makeFallbackMeta(reload=True)
    config.loadConfig()
    base.ui.notifyInfo("Cleared caches on SIGHUP")
def manageResourceLimits():
    """raises some resource limits to their hard limits.

    This is, in particular, the number of open FDs, as DaCHS may need
    a lot of those.
    """
    try:
        # raise the FD limit to the configured value; setting the hard
        # limit only works while we are still privileged
        configuredLimit = base.getConfig("web", "serverFDLimit")
        resource.setrlimit(resource.RLIMIT_NOFILE,
            (configuredLimit, configuredLimit))
        # also allow core dumps up to the current hard limit
        _, hard = resource.getrlimit(resource.RLIMIT_CORE)
        resource.setrlimit(resource.RLIMIT_CORE, (hard, hard))
        # for an actual coredump in production systems (which setuid)
        # you will usually also need
        # echo 2 >/proc/sys/fs/suid_dumpable
        # and this (needs python3-prctl installed):
        # import prctl; prctl.set_dumpable(1)
    except (ValueError, resource.error):
        # it's perfectly legal to call this as an unprivileged user, so
        # we won't complain here.
        pass
    # independently of the above, raising the soft FD limit to the hard
    # limit is always allowed, even without privileges
    hardLimit = resource.getrlimit(resource.RLIMIT_NOFILE)[-1]
    resource.setrlimit(resource.RLIMIT_NOFILE, (hardLimit, hardLimit))
    base.ui.notifyInfo("Current limits on file descriptors: %s"%
        repr(resource.getrlimit(resource.RLIMIT_NOFILE)))
def _dropPrivileges():
    """changes to the [web]user account when running as root.

    Failure to look up or switch to that user is fatal; failure to
    switch the group only produces a warning.  Afterwards, the process
    is marked dumpable again if python3-prctl is available (setuid
    normally clears the dumpable flag).
    """
    uid = None
    user = base.getConfig("web", "user")
    if user and os.getuid()==0:
        try:
            uid = pwd.getpwnam(user)[2]
        except KeyError:
            base.ui.notifyError("Cannot change to user %s (not found)\n"%user)
            sys.exit(1)
        try:
            try:
                # drop the group first; once we have setuid-ed away from
                # root, setgid would no longer be permitted
                os.setgid(grp.getgrnam(base.getConfig("group"))[2])
            except Exception as ex:
                # don't fail because of setgid failure (should I rather?)
                warnings.warn("Could not sgid to gavo group (%s)."%(str(ex)))
            os.setuid(uid)
        except os.error as ex:
            base.ui.notifyError("Cannot change to user %s (%s)\n"%(
                user, str(ex)))
    try:
        # see develNotes.rstx on the following thing
        import prctl
        prctl.set_dumpable(True)
    except ImportError:
        # no prctl, no dumps; that ought to be fine
        pass
def daemonize(logFile, callable):
    """detaches from the terminal via a double fork and runs callable
    in the daemonised grandchild.

    stdin, stdout and stderr of the grandchild are all redirected into
    logFile; the original process and the intermediate child _exit(0)
    immediately.
    """
    # We translate TERMs to INTs to ensure finally: code is executed
    signal.signal(signal.SIGTERM,
        lambda a,b: os.kill(os.getpid(), signal.SIGINT))
    pid = os.fork()
    if pid == 0:
        # child: become session leader, then fork again so the daemon
        # can never reacquire a controlling terminal
        os.setsid()
        pid = os.fork()
        if pid==0:
            # grandchild: point fds 0, 1, 2 at the log file, then run
            os.close(0)
            os.close(1)
            os.close(2)
            os.dup(logFile.fileno())
            os.dup(logFile.fileno())
            os.dup(logFile.fileno())
            callable()
        else:
            os._exit(0)
    else:
        os._exit(0)
def _configureTwistedLog(theLog=None):
    """sets up the destination of the twisted web.log.

    theLog can be a file to write to rather than the default web.log.
    No rotation will take place on that.

    Note that dcErrors and dcInfos are set up in user.logui.

    (This function has no return statement; callers assigning its
    result merely record None.)
    """
    if theLog is None:
        # create the file with the right permissions before passing it on
        logFile = getLogFile("web.log")
        logPath = logFile.name
        logFile.close()
        theLog = logfile.LogFile(
            os.path.basename(logPath), os.path.dirname(logPath))
        log.startLogging(theLog, setStdout=False)
        def rotator():
            # shouldRotate decides itself whether rotation is due;
            # re-schedule ourselves once a day
            theLog.shouldRotate()
            reactor.callLater(86400, rotator)
        rotator()
    else:
        log.startLogging(theLog, setStdout=False)
def getLogFile(baseName):
    """returns a log file group-writable by gavo.
    """
    logPath = os.path.join(base.getConfig("logDir"), baseName)
    logHandle = open(logPath, "a")
    try:
        # try to make the file group-writable and owned by the gavo
        # group; if the group is unknown or we may not chmod/chown,
        # just go on with whatever permissions we got.
        os.chmod(logPath, 0o664)
        os.chown(logPath, -1, grp.getgrnam(base.getConfig("gavoGroup"))[2])
    except (KeyError, os.error):
        pass
    return logHandle
def _preloadPublishedRDs():
    """preloads all RDs with published services in them.

    This is mainly a good idea in case of buggy code within the RDs
    which in this way is executed halfway predictably.

    Note that this function may take a significant amount of time and
    should not be run within the event loop.
    """
    for rdId in registry.findPublishedRDs():
        base.ui.notifyInfo("Preloading %s"%rdId)
        try:
            rscdesc.getRD(rdId)
        # FIX: this was a bare except, which would also swallow
        # SystemExit/KeyboardInterrupt; a broken RD must not kill the
        # server, but we only want to catch real errors here.
        except Exception:
            base.ui.notifyError("Broken RD preloaded: %s."%rdId)
    base.ui.notifyInfo("Preloading published RDs finished.")


def _preloadRDs():
    """accesses the RDs mentioned in [web]preloadRDs, and loads all
    others it finds if [web]preloadAllRDs is True.

    Errors while loading those are logged but are not fatal to the
    server.

    This must be run from the reactor if preloadAllRDs is set, as it
    uses deferreds.
    """
    for rdId in base.getConfig("web", "preloadRDs"):
        base.ui.notifyInfo("Preloading %s"%rdId)
        try:
            base.caches.getRD(rdId)
        # FIX: narrowed from a bare except (see _preloadPublishedRDs)
        except Exception:
            base.ui.notifyError("Broken RD preloaded: %s."%rdId)


class _Scheduler(object):
    """An internal singleton (use as a class) housing a twisted base
    scheduling function for base.cron.
    """
    # the DelayedCall for the next pending cron wakeup, if any
    lastDelayedCall = None

    @classmethod
    def scheduleJob(cls, wakeTime, job):
        """puts job on the reactor's queue for execution in wakeTime
        seconds.

        Only one wakeup is pending at any time: an earlier, still
        active schedule is cancelled first.
        """
        if cls.lastDelayedCall is not None and cls.lastDelayedCall.active():
            base.ui.notifyWarning("Cancelling schedule at %s"%
                cls.lastDelayedCall.getTime())
            cls.lastDelayedCall.cancel()
        cls.lastDelayedCall = reactor.callLater(wakeTime, job)


def _loadCertificate(srcFile):
    """A temporary stand-in for twisted.ssl.PrivateCertificate.load.

    This is necessary because PrivateCertificate currently ignores
    intermediate chains in there, and we need these for letsencrypt.

    This is inspired by code by glyph.
    """
    from twisted.internet.ssl import Certificate, KeyPair, CertificateOptions
    from OpenSSL.SSL import FILETYPE_PEM

    # get certificates and keys from source files; every -----BEGIN line
    # starts a new accumulator, all following lines belong to it
    # (NOTE(review): this assumes srcFile starts with a BEGIN line;
    # leading junk would make the first accum[-1] access fail)
    certs, keys = [], []
    with open(srcFile) as f:
        for line in f:
            if line.startswith("-----BEGIN"):
                if 'CERTIFICATE' in line:
                    accum = certs
                else:
                    accum = keys
                accum.append([])
            accum[-1].append(line)
    keys = [KeyPair.load("".join(k), FILETYPE_PEM) for k in keys]
    certs = [Certificate.loadPEM("".join(c)) for c in certs]

    # now build the options from one key, a cert for it, and any number
    # of other intermediate stuff
    if len(keys)!=1:
        raise base.ReportableError("Only one secret key expected in %s"%srcFile)
    privateFingerprint = keys[0].keyHash()

    # find cert for privateKey by fingerprint, stuff everything else
    # into the chain.
    chain, serverCert = [], None
    for cert in certs:
        if cert.getPublicKey().keyHash()==privateFingerprint:
            serverCert = cert
        else:
            chain.append(cert)
    if serverCert is None:
        raise base.ReportableError("No certificate for secret key found in "
            +srcFile)

    return CertificateOptions(
        certificate=serverCert.original,
        privateKey=keys[0].original,
        extraCertChain=[c.original for c in chain])


def _perhapsEnableSSL(factory, sslPort=443):
    """lets the reactor listen to TLS requests, too, if there's a
    certificate in the right place.

    Actually, what we expect is a PEM that is a concatenation of a
    certificate and a private key; that thing needs to be in
    $GAVO_ROOT/hazmat/bundle.pem

    If anything goes wrong here, we're just emitting a diagnostic and
    don't fail.
    """
    certPath = os.path.join(
        base.getConfig("rootDir"), "hazmat", "bundle.pem")
    if not os.path.isfile(certPath):
        return

    try:
        reactor.listenSSL(
            sslPort,
            factory,
            _loadCertificate(certPath),
            interface=base.getConfig("web", "bindAddress"))
        base.LISTENING_TO_HTTPS = True
    except Exception as msg:
        # deliberately broad: a broken certificate must not keep the
        # plain-HTTP server from coming up
        import traceback
        traceback.print_exc()
        log.err("Not turning on TLS because: "+str(msg))


def _startServer(managePID=True):
    """runs a detached server, dropping privileges and all.
    """
    try:
        reactor.listenTCP(
            int(base.getConfig("web", "serverPort")),
            root.site,
            interface=base.getConfig("web", "bindAddress"))
        _perhapsEnableSSL(root.site)
    except CannotListenError:
        raise base.ReportableError("Someone already listens on the"
            " configured port %s."%base.getConfig("web", "serverPort"),
            hint="This could mean that a DaCHS server is already running."
            " You would have to manually kill it then since its PID file"
            " got lost somehow. It's more likely that some"
            " other server is already taking up this port; you may want to change"
            " the [web] serverPort setting in that case.")
    os.chdir(base.getConfig("statedir"))
    # we still bound the (possibly privileged) port; now become [web]user
    _dropPrivileges()
    root.site.webLog = _configureTwistedLog()

    if managePID:
        PIDManager.setPID()
    try:
        setupServer(root)
        signal.signal(signal.SIGHUP, lambda sig, stack:
            reactor.callLater(0, _reloadConfig))
        # give the reactor a second to come up before preloading RDs
        reactor.callLater(1, _preloadRDs)
        if base.getConfig("web", "preloadpublishedrds"):
            task.deferLater(reactor, 1, threads.deferToThread,
                _preloadPublishedRDs)
        reactor.run()
    finally:
        if managePID:
            PIDManager.clearPID()
@exposedFunction([
    Arg("-f", "--foreground", help="Do not detach. This is a bit like"
        " serve debug, but still changes user, activates logging, and does"
        " not put DaCHS into debug mode. Use this in systemd units.",
        action="store_true", dest="foreground"),],
    help="start the server (and put it in the"
        " background by default).",
)
def start(args):
    """CLI action: start the server, daemonising unless -f was given.
    """
    runInForeground = getattr(args, "foreground", False)

    if not runInForeground:
        oldPID = PIDManager.getPID()
        if oldPID is not None:
            # a PID file exists: either a server really runs under that
            # PID...
            if os.path.exists(f"/proc/{oldPID}"):
                sys.exit("It seems there's already a server (pid %s) running."
                    " Try 'dachs serve stop'."%(PIDManager.getPID()))
            # ...or we are looking at the leftovers of an unclean shutdown
            warnings.warn(
                "Unclean server shutdown suspected, trying to clean up...")
            _stopServer()

    # this is called here because we might still be root here and
    # can raise our hard limits.
    manageResourceLimits()

    if runInForeground:
        _startServer(managePID=False)
    else:
        daemonize(
            getLogFile("server.stderr"),
            _startServer)
def _waitForServerExit(timeout=5):
    """waits for server process to terminate.

    It does so by polling the server pid file; when the PID file is
    gone, the server is considered dead.  After timeout seconds we give
    up and exit with an error message.
    """
    for i in range(int(timeout*10)):
        lastPID = PIDManager.getPID()
        if lastPID is None:
            break
        time.sleep(0.1)
    else:
        # loop ran to completion without break: PID file still present
        sys.exit("The server with pid %d refuses to die, probably because\n"
            "pieces of it hang in the python kernel.\n\n"
            "Try 'kill -KILL %s' to forcefully terminate it (this will break\n"
            "connections).\n"%(lastPID, lastPID))


def _stopServer():
    """SIGTERMs the server named in the PID file and waits for it to
    exit.

    A stale PID file is cleaned up; without any PID file, this only
    emits a warning and returns.
    """
    pid = PIDManager.getPID()
    if pid is None:
        # No server running, nothing to do
        base.ui.notifyWarning("No running DaCHS server found."
            " If you are trying to restart a DaCHS server on a systemd machine:"
            " use `sudo systemctl restart dachs` instead.")
        return

    try:
        os.kill(pid, signal.SIGTERM)
    except os.error as ex:
        # FIX: was the magic number 3; ESRCH means "no such process",
        # i.e., the PID file was stale
        if ex.errno==errno.ESRCH:
            PIDManager.clearPID()
            base.ui.notifyWarning("Removed stale PID file.")
            return
        else:
            raise

    _waitForServerExit()
@exposedFunction(help="stop a running server.")
def stop(args):
    # CLI action: args carries no options here; all work happens in
    # _stopServer.
    _stopServer()
def waitForUnloadedServer(stopTimeout):
    """delays while the service [web]serverURL appears to serve clients
    or stopTimeout seconds are over.

    This will also return fine if no server is running.

    If the service keeps having clients after stopTimeout, a
    ReportableError is raised.
    """
    deadline = time.time()+stopTimeout
    countURL = base.getConfig("web", "serverURL")+"/clientcount"

    while time.time()<deadline:
        try:
            clientCount = int(utils.urlopenRemote(countURL).read())
            if clientCount==0:
                return
        except IOError:
            # let's assume this means "server down" and try to reload it
            return
        time.sleep(1)

    raise base.ReportableError("Timeout while waiting for all clients"
        " to go away. Please restart manually.")
@exposedFunction([Arg("--if-unloaded-within", type=int, dest="stopTimeout",
    default=None, metavar="SECS",
    help="Wait SECS seconds until the server seems unloaded before"
    " stopping it; give up if it doesn't seem to get unloaded.")],
    help="restart the server")
def restart(args):
    # CLI action: optionally wait for the server to become idle, then
    # stop it and start it again with the same argument namespace.
    if args.stopTimeout:
        waitForUnloadedServer(args.stopTimeout)
    _stopServer()
    start(args)
@exposedFunction(help="reload server configuration (incomplete)")
def reload(args):
    # CLI action: send SIGHUP to the running server; the server's SIGHUP
    # handler (installed in _startServer) then runs _reloadConfig.
    pid = PIDManager.getPID()
    if pid is None:
        raise base.ReportableError("No DaCHS server appears to be running."
            " Thus, not reloading.")
    os.kill(pid, signal.SIGHUP)
class ExitPage(twresource.Resource):
    """A web resource that stops the reactor (and hence the server)
    when rendered.

    This is only mounted as child_exit (i.e., /exit) by the debug CLI
    action below.
    """
    def render(self, request):
        request.setHeader("content-type", "text/plain")
        # stopping the reactor ends reactor.run(); the returned body is
        # the last thing this server serves
        reactor.stop()
        return b"exiting."
@exposedFunction(help="run a server and remain in the foreground, dumping"
    " all kinds of stuff to the terminal")
def debug(args):
    # CLI action: run the server in the foreground without daemonising,
    # dropping privileges, or PID management; logs go to stderr.
    root.site.webLog = _configureTwistedLog(sys.stderr)
    base.DEBUG = True
    # mount an /exit page so the debug server can be shut down via HTTP
    root.root.child_exit = ExitPage()
    reactor.listenTCP(
        int(base.getConfig("web", "serverPort")),
        root.site,
        interface=base.getConfig("web", "bindAddress"))
    _perhapsEnableSSL(root.site, 40443)
    # since we don't support non-443 https, really, fudge this to enable
    # debugging
    base.getHTTPSBase = lambda:"https://localhost:40443"
    # suppress maintenance-style interval jobs on debug instances
    setupServer(root, runIntervalJobs=False)
    reactor.run()
@exposedFunction([
    Arg("rdIds", help="one or more rdIds", nargs="+"),
    ],
    help="reload RDs listed in the server named by [web]serverURL.")
def expireRDs(args):
    # CLI action: ask a *running* server (via its cacheclear service,
    # authenticated as gavoadmin) to drop the named RDs from its caches.
    pw = base.getConfig("web", "adminpasswd")
    if pw=='':
        raise base.ReportableError("expireRDs needs [web]adminpasswd config item.")

    for rdId in args.rdIds:
        try:
            f = utils.urlopenRemote(base.makeAbsoluteURL(
                "/__system__/services/cacheclear?"
                +urllib.parse.urlencode(
                    {"rdId":rdId, "__nevow_form__": "genForm", "submit": "Go"})),
                creds=("gavoadmin", pw))
            ignored = f.read() #noflake: don't care enough to check at this point.
        except IOError:
            raise base.ReportableError(
                f"Failed to reload {rdId} -- have you set the admin password?")
@exposedFunction([], help="Create or renew a letsencrypt certificate"
    " for hazmat/server.key using hazmat/account.key (this is for https"
    " operation); you need to run this as a user that can read and write"
    " hazmat (typically root).")
def updateCertificate(args):
    # CLI action: build a CSR with openssl, have acme-tiny obtain a
    # letsencrypt certificate for it, assemble hazmat/bundle.pem from
    # certificate, intermediate, and key, and restart the server.
    from gavo.web import ifpages
    acmeChallengeDir = ifpages.ACMEChallenge.acmeChallengeDir
    os.chdir(os.path.join(base.getConfig("rootDir"), "hazmat"))

    # to make a CSR for the host names we're known as, first write an
    # openssl cfg...
    names = [urllib.parse.urlparse(config.get("web", "serverURL")).hostname
        ]+base.getConfig("web", "alternateHostnames")
    with open("dachs.conf", "wb") as f:
        f.write(b"[req]\ndefault_bits=4096\n"
            b"distinguished_name=req_distinguished_name\n")
        f.write(b"[req_distinguished_name]\ncommonName=%s\n"%
            names[0].encode("utf-8"))
        f.write(b"[SAN]\nsubjectAltName=%s\n"%(b",".join(
            b'DNS:'+s.encode("utf-8") for s in names)))

    # ...and then let openssl write the CSR...
    subprocess.check_call(["openssl", "req", "-new", "-sha256",
        "-key", "server.key", "-subj", "/", "-reqexts", "SAN",
        "-config", "dachs.conf", "-out", "dachs.csr"])

    # I need to prepare a directory for the challenges
    # first, and I'm creating a directory in a user-writable directory
    # as (presumably) root.  TODO: think hard if that's as bad an idea
    # as it sounds.  Anyway: we'll have to agree on that directory
    # between the server and this updater, so it's a bit tricky.
    try:
        os.mkdir(acmeChallengeDir)
        os.chmod(acmeChallengeDir, 0o755)
    except os.error:
        pass  # let's hope the challenge dir is already in place

    # The letsencrypt interaction is done by acme-tiny
    cert = subprocess.check_output(["acme-tiny",
        "--account-key", "./account.key",
        "--csr", "dachs.csr",
        "--acme-dir", acmeChallengeDir])

    # finally, glue together the signature with intermediate certificates
    # and the private key to get the PEM for twisted
    f = utils.urlopenRemote(
        # This link needs to be updated now and then.  If anyone knows a
        # stable URI for "*the* intermediate certs for LE": tell me.
        "https://letsencrypt.org/certs/lets-encrypt-r3.pem")
    intermediate = f.read()

    with open("server.key", "rb") as f:
        secretKey = f.read()

    with open("bundle.pem", "wb") as f:
        f.write(cert+b"\n")
        f.write(intermediate+b"\n")
        f.write(secretKey+b"\n")

    subprocess.check_call(["dachs", "serve", "restart",
        "--if-unloaded-within=300"])
def main():
    # entry point for "dachs serve": install a UI on base.ui (the name
    # suggests reduced verbosity -- see user.plainui), mark this process
    # as the server, then dispatch to the subcommand chosen on the
    # command line (all @exposedFunction callables in this module).
    plainui.SemiStingyPlainUI(base.ui)
    base.IS_DACHS_SERVER = True
    args = makeCLIParser(globals()).parse_args()
    args.subAction(args)