Package gavo :: Package user :: Module serve
[frames] | [no frames]

Source Code for Module gavo.user.serve

  1  """ 
  2  A wrapper script suitable for starting the server. 
  3  """ 
  4   
  5  #c Copyright 2008-2019, the GAVO project 
  6  #c 
  7  #c This program is free software, covered by the GNU GPL.  See the 
  8  #c COPYING file in the source distribution. 
  9   
 10   
 11  import datetime 
 12  import grp 
 13  import os 
 14  import pwd 
 15  import resource 
 16  import signal 
 17  import subprocess 
 18  import sys 
 19  import time 
 20  import urllib 
 21  import warnings 
 22  from urlparse import urlparse 
 23   
 24  from nevow import inevow 
 25  from nevow import rend 
 26  from twisted.internet import reactor 
 27  from twisted.internet import task 
 28  from twisted.internet import threads 
 29  from twisted.internet.error import CannotListenError 
 30  from twisted.python import log 
 31  from twisted.python import logfile 
 32   
 33  from gavo import base 
 34  from gavo import rscdesc #noflake: for cache registration 
 35  from gavo import registry 
 36  from gavo import utils 
 37  from gavo.base import config 
 38  from gavo.base import cron 
 39  from gavo.user import plainui 
 40  from gavo.utils import exposedFunction, makeCLIParser, Arg 
 41  from gavo.web import root 
def setupServer(rootPage):
    """prepares the current process for serving.

    This raises the file descriptor limits, records the start time in the
    upSince meta item, tells the UI the web server is up, and registers a
    job scheduler with base.cron.

    rootPage is currently not used.
    """
    manageResourceLimits()
    startedAt = utils.formatISODT(datetime.datetime.utcnow())
    config.setMeta("upSince", startedAt)
    base.ui.notifyWebServerUp()

    # In debug mode, periodic jobs (which usually fetch or import things)
    # would at best be confusing.  Jobs meant to run right at startup
    # (e.g., cleaning up the TAP table; "every" jobs with a negative repeat)
    # should still run, so debug mode gets a scheduler that only runs jobs
    # due (almost) immediately.
    scheduler = _DebugScheduler if base.DEBUG else _Scheduler
    cron.registerScheduleFunction(scheduler.scheduleJob)
58
class _PIDManager(object):
    """A manager for the PID file of the server.

    The PID file is web.pid in the directory configured as stateDir.
    There's a single instance of this below (PIDManager).
    """
    def __init__(self):
        self.path = os.path.join(base.getConfig("stateDir"), "web.pid")

    def getPID(self):
        """returns the PID of the currently running server, or None.

        None is returned if the PID file cannot be read.  If it does not
        contain a positive integer, it is removed (with a warning) and None
        is returned, too; callers hand the PID to os.kill, where values
        <= 0 would address whole process groups.
        """
        try:
            with open(self.path) as f:
                pidString = f.readline()
        except IOError: # PID file does not exist (or we're beyond repair)
            return None

        try:
            pid = int(pidString)
        except ValueError:
            pid = None

        if pid is None or pid<=0:
            # junk in PID file -- no sense in keeping it
            base.ui.notifyWarning("%s contained garbage, attempting to unlink"%
                self.path)
            self.clearPID()
            return None
        return pid

    def setPID(self):
        """writes the current process' PID to the PID file.

        Any existing content will be clobbered; thus, you could have
        races here (and since both daemons would bind to the same socket,
        only one would survive, possibly the wrong one).  Let's just stipulate
        people won't start two competing daemons.

        If the file cannot be written, this reports an error and exits.
        """
        try:
            with open(self.path, "w") as f:
                f.write(str(os.getpid()))
        except IOError: # Cannot write PID.  This would suggest that much else
                # is broken as well, so we bail out
            base.ui.notifyError("Cannot write PID file %s. Assuming all is"
                " broken, bailing out."%self.path)
            sys.exit(1)

    def clearPID(self):
        """removes the PID file.

        A missing PID file is fine.  Other failures are reported through
        base.ui but not raised.
        """
        import errno
        try:
            os.unlink(self.path)
        except os.error as ex:
            if ex.errno!=errno.ENOENT:
                # self.path (not the nonexisting self.file) names the PID file
                base.ui.notifyError("Cannot remove PID file %s (%s). This"
                    " probably means some other server owns it now."%(
                    self.path, str(ex)))
# The module-wide PID manager used by the server and the CLI commands of
# this module.  Its constructor reads the stateDir configuration item, so
# this runs when the module is imported.
PIDManager = _PIDManager()
def _reloadConfig():
    """drops cached state and reloads the configuration.

    This is what the server does on SIGHUP.  It clears base's caches,
    calls root.loadUserVanity for root.ArchiveService, rebuilds the
    fallback meta, re-runs config.loadConfig, and then announces all that
    through base.ui.
    """
    # forget whatever the caches in base.caches hold
    base.caches.clearCaches()

    archiveService = root.ArchiveService
    root.loadUserVanity(archiveService)

    # NOTE(review): fallback meta is rebuilt before loadConfig runs; keep
    # this order unless it is confirmed not to matter.
    config.makeFallbackMeta(reload=True)
    config.loadConfig()

    base.ui.notifyInfo("Cleared caches on SIGHUP")
127
def manageResourceLimits():
    """raises the limit on open file descriptors as far as possible.

    DaCHS may need a lot of FDs.  First, both soft and hard limit are set
    to [web]serverFDLimit, which may fail when running unprivileged; then
    the soft limit is raised to whatever the hard limit is.  Neither step
    is fatal if it fails.  The resulting limits are reported through
    base.ui.
    """
    try:
        configuredLimit = base.getConfig("web", "serverFDLimit")
        resource.setrlimit(resource.RLIMIT_NOFILE,
            (configuredLimit, configuredLimit))
    except (ValueError, resource.error):
        # it's perfectly legal to call this as an unprivileged user, so
        # we won't complain here.
        pass

    hardLimit = resource.getrlimit(resource.RLIMIT_NOFILE)[-1]
    try:
        resource.setrlimit(resource.RLIMIT_NOFILE, (hardLimit, hardLimit))
    except (ValueError, resource.error):
        # Some systems report an unlimited (RLIM_INFINITY) hard limit but
        # refuse it as a soft limit; we then keep the current soft limit
        # rather than crash the server start.
        pass

    base.ui.notifyInfo("Current limits on current file descriptors: %s"%
        repr(resource.getrlimit(resource.RLIMIT_NOFILE)))
148
def _dropPrivileges():
    """switches a root process to the user configured in [web]user.

    Nothing happens if no user is configured or if we are not root.

    Otherwise, we try to change the gid to the group configured as "group".
    If that fails, we only issue a warning.  We then replace the
    supplementary groups with those of the target user and change the uid.

    If the user does not exist, or the uid (or groups) cannot be changed,
    this reports an error and exits; we must not keep serving as root.
    """
    user = base.getConfig("web", "user")
    if not user or os.getuid()!=0:
        return

    try:
        uid = pwd.getpwnam(user)[2]
    except KeyError:
        base.ui.notifyError("Cannot change to user %s (not found)\n"%user)
        sys.exit(1)

    try:
        try:
            # NOTE(review): getLogFile uses the "gavoGroup" config item for
            # what looks like the same group -- confirm "group" is intended.
            os.setgid(grp.getgrnam(base.getConfig("group"))[2])
        except Exception as ex:
            # don't fail because of setgid failure (should I rather?)
            warnings.warn("Could not sgid to gavo group (%s)."%(str(ex)))
        # Without this, root's supplementary groups (e.g., root) would
        # survive the setuid below.
        os.initgroups(user, os.getgid())
        os.setuid(uid)
    except os.error as ex:
        base.ui.notifyError("Cannot change to user %s (%s)\n"%(
            user, str(ex)))
        sys.exit(1)
169
def daemonize(logFile, callable):
    """runs callable in a detached background process.

    This is a double fork.  The calling process and the intermediate child
    leave through os._exit(0), and the intermediate child has started a new
    session (os.setsid).  In the grandchild, stdin, stdout, and stderr are
    all pointed to logFile, which must be an open file object (its fileno()
    is used), and then callable is run.

    Before forking, SIGTERM is mapped to SIGINT so that finally: clauses run
    when the server is terminated.
    """
    # We translate TERMs to INTs to ensure finally: code is executed
    signal.signal(signal.SIGTERM,
        lambda a,b: os.kill(os.getpid(), signal.SIGINT))

    if os.fork()!=0:
        os._exit(0)
    os.setsid()
    if os.fork()!=0:
        os._exit(0)

    # dup2 (unlike close+dup) also works when some standard descriptor was
    # already closed at startup or when logFile itself got one of 0..2.
    logFD = logFile.fileno()
    for stdFD in (0, 1, 2):
        os.dup2(logFD, stdFD)
    callable()
191
def _configureTwistedLog():
    """sends twisted's log to web.log in the directory configured as logDir.

    A nested rotator checks whether the log file is due for rotation (by
    twisted's LogFile.shouldRotate) right away and then once a day, and
    rotates it if so.

    This returns None.  NOTE(review): _startServer assigns the result to
    root.site.webLog; confirm whether theLog was meant to be returned.
    """
    theLog = logfile.LogFile("web.log", base.getConfig("logDir"))
    log.startLogging(theLog, setStdout=False)

    def rotator():
        # shouldRotate only tells whether rotation is due; it does not
        # rotate by itself, so we have to call rotate explicitly.
        if theLog.shouldRotate():
            theLog.rotate()
        reactor.callLater(86400, rotator)

    rotator()
def getLogFile(baseName):
    """returns a log file group-writable by gavo.

    The file baseName in the directory configured as logDir is opened for
    appending.  We then try to give it mode 0664 and the group configured
    as gavoGroup.  If the group does not exist or the file cannot be
    changed, the file is returned as it is.
    """
    fName = os.path.join(base.getConfig("logDir"), baseName)
    f = open(fName, "a")
    try:
        # 0o664 works in python 2.6+ and 3; a bare 0664 is a syntax error in 3
        os.chmod(fName, 0o664)
        os.chown(fName, -1, grp.getgrnam(base.getConfig("gavoGroup"))[2])
    except (KeyError, os.error): # let someone else worry about it
        pass
    return f
213
def _preloadPublishedRDs():
    """preloads all RDs with published services in them.

    This is mainly a good idea in case of buggy code within the RDs, which
    is executed halfway predictably in this way.  RDs that fail to load
    are reported through base.ui; loading continues with the next RD.

    Note that this function may take a significant amount of time and
    should not be run within the event loop (_startServer runs it through
    threads.deferToThread).
    """
    for rdId in registry.findPublishedRDs():
        base.ui.notifyInfo("Preloading %s"%rdId)
        try:
            rscdesc.getRD(rdId)
        except Exception:
            # Don't swallow SystemExit/KeyboardInterrupt, but don't let a
            # broken RD stop the preloading of the others either.
            base.ui.notifyError("Broken RD preloaded: %s."%rdId)
    base.ui.notifyInfo("Preloading published RDs finished.")
231
def _preloadRDs():
    """loads the RDs mentioned in [web]preloadRDs through base.caches.getRD.

    Errors while loading those are logged but are not fatal to the server.

    _startServer schedules this on the reactor using reactor.callLater.
    """
    for rdId in base.getConfig("web", "preloadRDs"):
        base.ui.notifyInfo("Preloading %s"%rdId)
        try:
            base.caches.getRD(rdId)
        except Exception:
            # a broken RD must not keep the server from starting
            base.ui.notifyError("Broken RD preloaded: %s."%rdId)
248
class _Scheduler(object):
    """An internal singleton (use as a class) providing the twisted-based
    scheduling function for base.cron.

    At most one job is pending through this scheduler: scheduling a job
    cancels a previously scheduled one that has not run yet.
    """
    # the object returned by reactor.callLater for the last scheduled job
    lastDelayedCall = None

    @classmethod
    def scheduleJob(cls, wakeTime, job):
        """has the reactor call job in wakeTime seconds.

        A still-pending previous job is cancelled, with a warning.
        """
        pending = cls.lastDelayedCall
        if pending is not None and pending.active():
            base.ui.notifyWarning("Cancelling schedule at %s"%pending.getTime())
            pending.cancel()

        cls.lastDelayedCall = reactor.callLater(wakeTime, job)
265
class _DebugScheduler(object):
    """An internal singleton (use as a class) for running "immediate" jobs
    in debug mode.

    Jobs due in less than 30 seconds are run half a second later from the
    reactor; all other jobs are dropped.
    """
    @classmethod
    def scheduleJob(cls, wakeTime, job):
        """runs job soon if wakeTime is, essentially, now, and forgets it
        otherwise.
        """
        if not wakeTime<30:
            return

        # take job out of the execution context, as this may be during
        # an import, and we don't want to create threads during imports.
        reactor.callLater(0.5, job)
279
def _loadCertificate(srcFile):
    """returns twisted CertificateOptions built from the PEM bundle srcFile.

    A temporary stand-in for twisted.ssl.PrivateCertificate.load.  This is
    necessary because PrivateCertificate currently ignores intermediate
    chains in there, and we need these for letsencrypt.

    srcFile must contain exactly one private key, a certificate for it, and
    any number of further (intermediate) certificates.  Text outside of
    -----BEGIN/-----END blocks is ignored.

    This raises a ReportableError if there is not exactly one key or if no
    certificate matches that key.

    This is inspired by code by glyph.
    """
    from twisted.internet.ssl import Certificate, KeyPair, CertificateOptions
    from OpenSSL.SSL import FILETYPE_PEM

    # collect the PEM blocks, sorted into certificates and keys; lines
    # outside of a block (e.g., leading comments) are skipped.
    certs, keys = [], []
    current = None
    with open(srcFile) as f:
        for line in f:
            if line.startswith("-----BEGIN"):
                current = []
                if 'CERTIFICATE' in line:
                    certs.append(current)
                else:
                    keys.append(current)
            if current is not None:
                current.append(line)
                if line.startswith("-----END"):
                    current = None

    keys = [KeyPair.load("".join(k), FILETYPE_PEM) for k in keys]
    certs = [Certificate.loadPEM("".join(c)) for c in certs]

    # now build the options from one key, a cert for it, and any number
    # of other intermediate stuff
    if len(keys)!=1:
        raise base.ReportableError("Only one secret key expected in %s"%srcFile)
    privateFingerprint = keys[0].keyHash()

    # find cert for privateKey by fingerprint, stuff everything else
    # into the chain.
    chain, serverCert = [], None
    for cert in certs:
        if cert.getPublicKey().keyHash()==privateFingerprint:
            serverCert = cert
        else:
            chain.append(cert)

    if serverCert is None:
        raise base.ReportableError("No certificate for secret key found in "
            +srcFile)
    return CertificateOptions(
        certificate=serverCert.original,
        privateKey=keys[0].original,
        extraCertChain=[c.original for c in chain])
329
def _perhapsEnableSSL(factory, sslPort=443):
    """lets the reactor listen to TLS requests, too, if there's a certificate
    in the right place.

    Actually, what we expect is a PEM that is a concatenation of a certificate
    and a private key; that thing needs to be in $GAVO_ROOT/hazmat/bundle.pem.
    Without that file, this does nothing.

    If anything goes wrong here, we're just emitting a diagnostic to
    twisted's log and don't fail.
    """
    certPath = os.path.join(
        base.getConfig("rootDir"), "hazmat", "bundle.pem")
    if not os.path.isfile(certPath):
        return

    try:
        reactor.listenSSL(
            sslPort,
            factory,
            _loadCertificate(certPath),
            interface=base.getConfig("web", "bindAddress"))
    except Exception as msg:
        # With None as its first argument, log.err logs the exception
        # currently being handled (traceback included), with our text as
        # the reason; passing a plain string would just log its repr.
        log.err(None, "Not turning on TLS because: "+str(msg))
356
def _startServer():
    """runs a detached server, dropping privileges and all.

    The HTTP port (and, via _perhapsEnableSSL, possibly an HTTPS port) is
    bound first, while we may still be root.  Then privileges are dropped,
    twisted logging is set up, the PID file is written, and the reactor
    runs.  The PID file is removed when the reactor returns or fails.
    """
    serverPort = base.getConfig("web", "serverPort")
    try:
        reactor.listenTCP(
            int(serverPort),
            root.site,
            interface=base.getConfig("web", "bindAddress"))
        _perhapsEnableSSL(root.site)
    except CannotListenError:
        raise base.ReportableError("Someone already listens on the"
            " configured port %s."%serverPort,
            hint="This could mean that a DaCHS server is already running."
            " You would have to manually kill it then since its PID file"
            " got lost somehow. It's more likely that some"
            " other server is already taking up this port; you may want to change"
            " the [web] serverPort setting in that case.")

    _dropPrivileges()
    root.site.webLog = _configureTwistedLog()
    PIDManager.setPID()

    try:
        setupServer(root)
        # reload on SIGHUP, but from within the reactor, not the handler
        signal.signal(signal.SIGHUP,
            lambda signum, frame: reactor.callLater(0, _reloadConfig))
        reactor.callLater(1, _preloadRDs)
        # published RDs are loaded in a thread so the reactor stays
        # responsive while that happens
        task.deferLater(reactor, 1,
            threads.deferToThread, _preloadPublishedRDs)
        reactor.run()
    finally:
        PIDManager.clearPID()
389
@exposedFunction(help="start the server and put it in the background.")
def start(args):
    """starts a detached server unless one seems to be running already.

    If the PID file names a live process, we exit with a message.  If it
    names a process that no longer exists, we assume an unclean shutdown,
    clean up, and start anyway.
    """
    oldPID = PIDManager.getPID()
    if oldPID is not None:  # Server could already be running,.. .
        if _isProcessAlive(oldPID):
            # ...if the PID is active, give up right away
            sys.exit("It seems there's already a server (pid %s) running."
                " Try 'gavo serve stop'."%oldPID)
        else:
            warnings.warn("Unclean server shutdown suspected, trying to clean up...")
            _stopServer()

    # this is called here because we might still be root here and
    # can raise our hard limits.
    manageResourceLimits()
    daemonize(
        getLogFile("server.stderr"),
        _startServer)


def _isProcessAlive(pid):
    """returns True if a process with pid exists.

    This sends signal 0, which only performs error checking.  EPERM means
    the process exists but we may not signal it (e.g., a root-owned
    server checked by a normal user).  Unlike looking at /proc, this also
    works on non-Linux systems and with /proc mounted hidepid.
    """
    import errno
    try:
        os.kill(pid, 0)
    except OSError as ex:
        return ex.errno==errno.EPERM
    return True
409
def _waitForServerExit(timeout=5):
    """waits up to about timeout seconds for the server process to terminate.

    It does so by polling the server PID file every 0.1 seconds (the
    server removes it when it exits).  If the PID file is still there
    after the timeout, we exit with a message suggesting kill -KILL.
    """
    # Poll at least once, so lastPID is always bound for the message below,
    # even for tiny timeouts.
    for _ in range(max(1, int(timeout*10))):
        lastPID = PIDManager.getPID()
        if lastPID is None:
            return
        time.sleep(0.1)

    sys.exit("The server with pid %d refuses to die, probably because\n"
        "pieces of it hang in the python kernel.\n\n"
        "Try 'kill -KILL %s' to forcefully terminate it (this will break\n"
        "connections).\n"%(lastPID, lastPID))
426
def _stopServer():
    """stops the server named in the PID file, if there is one.

    The server is sent SIGTERM (which daemonize maps to SIGINT within the
    server), and then we wait for it to remove its PID file.  A PID file
    naming a process that does not exist, or containing a non-positive
    number, is removed instead.
    """
    import errno

    pid = PIDManager.getPID()
    if pid is None: # No server running, nothing to do
        base.ui.notifyWarning("No running DaCHS server found.")
        return

    if pid<=0:
        # os.kill would signal whole process groups (or everything we may
        # signal) for such values; this can only be junk in the PID file.
        PIDManager.clearPID()
        base.ui.notifyWarning("Removed invalid PID file.")
        return

    try:
        os.kill(pid, signal.SIGTERM)
    except os.error as ex:
        if ex.errno!=errno.ESRCH:
            raise
        # no such process: the server is gone, its PID file is not
        PIDManager.clearPID()
        base.ui.notifyWarning("Removed stale PID file.")
        return

    _waitForServerExit()
444
@exposedFunction(help="stop a running server.")
def stop(args):
    # args carries nothing this command uses; all the work (including the
    # "no server running" case) is in _stopServer.
    _stopServer()
449
def waitForUnloadedServer(stopTimeout):
    """delays while the service [web]serverURL appears to serve clients,
    but at most for stopTimeout seconds.

    The server's clientcount resource is polled once a second.  This
    returns as soon as it reports 0 clients or cannot be reached (so it
    also returns fine if no server is running).  A response that is not an
    integer is taken as "cannot tell yet", and we keep polling.  If the
    service keeps having clients after stopTimeout, a ReportableError is
    raised.
    """
    waitUntil = time.time()+stopTimeout
    loadURL = base.getConfig("web", "serverURL")+"/clientcount"
    while time.time()<waitUntil:
        try:
            content = utils.urlopenRemote(loadURL).read()
        except IOError:
            # let's assume this means "server down" and try to reload it
            return

        try:
            if int(content)==0:
                return
        except ValueError:
            # Garbage from the server; don't crash, keep waiting (and
            # eventually time out with a proper error).
            pass
        time.sleep(1)

    raise base.ReportableError("Timeout while waiting for all clients"
        " to go away. Please restart manually.")
@exposedFunction([Arg("--if-unloaded-within", type=int,
        dest="stopTimeout", default=None, metavar="SECS",
        help="Wait SECS seconds until the server seems unloaded before"
            " stopping it; give up if it doesn't seem to get unloaded.")],
    help="restart the server")
def restart(args):
    """stops the server and starts a new one.

    With a (non-zero) --if-unloaded-within, we first wait for the running
    server to have no clients; waitForUnloadedServer raises a
    ReportableError if that does not happen in time, and nothing is
    restarted then.
    """
    unloadTimeout = args.stopTimeout
    if unloadTimeout:
        waitForUnloadedServer(unloadTimeout)

    _stopServer()
    start(args)
484
@exposedFunction(help="reload server configuration (incomplete)")
def reload(args):
    """sends SIGHUP to the running server.

    The server handles SIGHUP by clearing its caches and reloading its
    configuration (see _reloadConfig).  A ReportableError is raised if no
    server seems to be running, including when the PID file is stale.
    """
    import errno

    pid = PIDManager.getPID()
    # non-positive "PIDs" would make os.kill signal whole process groups
    if pid is None or pid<=0:
        raise base.ReportableError("No DaCHS server appears to be running."
            " Thus, not reloading.")

    try:
        os.kill(pid, signal.SIGHUP)
    except os.error as ex:
        if ex.errno!=errno.ESRCH:
            raise
        raise base.ReportableError("No DaCHS server appears to be running"
            " (stale PID file %s). Thus, not reloading."%PIDManager.path,
            hint="Try 'gavo serve stop' to remove the stale PID file.")
493
class ExitPage(rend.Page):
    """A page that stops the reactor when rendered.

    The debug command attaches an instance of this as the exit child of
    the root page.
    """
    def renderHTTP(self, ctx):
        request = inevow.IRequest(ctx)
        request.setHeader("content-type", "text/plain")
        reactor.stop()
        return "exiting."
@exposedFunction(help="run a server and remain in the foreground, dumping"
    " all kinds of stuff to the terminal")
def debug(args):
    """runs the server in the foreground for debugging.

    twisted logs to stderr, base.DEBUG is set, an ExitPage is attached as
    the exit child of the root page, and TLS (if a bundle is present) is
    served on port 40443 rather than 443.  Privileges are not dropped and
    no PID file is written.
    """
    log.startLogging(sys.stderr)
    base.DEBUG = True
    root.root.child_exit = ExitPage()

    httpPort = int(base.getConfig("web", "serverPort"))
    reactor.listenTCP(httpPort, root.site,
        interface=base.getConfig("web", "bindAddress"))

    _perhapsEnableSSL(root.site, 40443)
    # since we don't support non-443 https, really, fudge this to enable
    # debugging
    base.getHTTPSBase._cache[()] = "https://localhost:40443"

    setupServer(root)
    reactor.run()
@exposedFunction([
    Arg("rdIds", help="one or more rdIds", nargs="+"),
    ], help="reload RDs listed in the server named by [web]serverURL.")
def expireRDs(args):
    """asks the running server to reload the RDs in args.rdIds.

    For each RD, this posts the "Reload RD" adminOps form to /seffe/<rdId>,
    authenticating as gavoadmin with [web]adminpasswd.  RD ids starting
    with // are system RDs (//foo becomes __system__/foo).

    A ReportableError is raised if no admin password is configured or if
    a request fails.
    """
    pw = base.getConfig("web", "adminpasswd")
    # "not pw" also catches an unset (None) password, not just ''
    if not pw:
        raise base.ReportableError("expireRDs needs [web]adminpasswd config item.")

    for rdId in args.rdIds:
        if rdId.startswith("//"):
            rdId = "__system__"+rdId[1:]

        try:
            f = utils.urlopenRemote(base.makeAbsoluteURL(
                "/seffe/"+urllib.quote(rdId)),
                urllib.urlencode({"__nevow_form__": "adminOps", "submit": "Reload RD"}),
                creds=("gavoadmin", pw))
            f.read() # don't care enough to check the response at this point.
        except IOError as msg:
            raise base.ReportableError("Failed to reload %s: %s"%(rdId, msg))
@exposedFunction([], help="Create or renew a letsencrypt certificate"
    " for hazmat/server.key using hazmat/account.key (this is for https"
    " operation); you need to run this as a user that can read and write"
    " hazmat (typically root).")
def updateCertificate(args):
    """obtains a letsencrypt certificate and writes hazmat/bundle.pem.

    This chdirs to $GAVO_ROOT/hazmat.  There, it writes an openssl config
    and a CSR for the host name in [web]serverURL plus
    [web]alternateHostnames, and has acme-tiny obtain the certificate.  It
    then concatenates certificate(s) and server.key into bundle.pem (which
    is what _perhapsEnableSSL loads) and restarts the dachs service.
    """
    from gavo.web import ifpages
    acmeChallengeDir = ifpages.WellKnown.acmeChallengeDir

    os.chdir(os.path.join(base.getConfig("rootDir"), "hazmat"))

    # to make a CSR for the host names we're known as, first write an
    # openssl cfg...
    names = [urlparse(base.getConfig("web", "serverURL")).hostname
        ]+base.getConfig("web", "alternateHostnames")
    with open("dachs.conf", "w") as f:
        f.write("[req]\ndefault_bits=4096\n"
            "distinguished_name=req_distinguished_name\n")
        f.write("[req_distinguished_name]\ncommonName=%s\n"%
            names[0])
        f.write("[SAN]\nsubjectAltName=%s\n"%(",".join(
            'DNS:'+s for s in names)))

    # ...and then let openssl write the CSR...
    subprocess.check_call(["openssl", "req", "-new", "-sha256",
        "-key", "server.key", "-subj", "/", "-reqexts", "SAN",
        "-config", "dachs.conf", "-out", "dachs.csr"])

    # I need to prepare a directory for the challenges
    # first, and I'm creating a directory in a user-writable directory
    # as (presumably) root.  TODO: think hard if that's as bad an idea
    # as it sounds.  Anyway: we'll have to agree on that directory
    # between the server and this updater, so it's a bit tricky.
    try:
        os.mkdir(acmeChallengeDir)
        os.chmod(acmeChallengeDir, 0o755)
    except os.error:
        pass # let's hope the challenge dir is already in place

    # The letsencrypt interaction is done by acme-tiny
    cert = subprocess.check_output(["acme-tiny",
        "--account-key", "./account.key", "--csr", "dachs.csr",
        "--acme-dir", acmeChallengeDir])
    pemParts = [cert]

    # ACME v2 delivers the certificate together with its chain, and
    # acme-tiny versions using it presumably print that whole chain
    # (TODO(review): confirm for the installed acme-tiny).  Only fall back
    # to the hard-coded, possibly outdated intermediate if we got a single
    # certificate.
    if cert.count(b"-----BEGIN CERTIFICATE")<2:
        f = utils.urlopenRemote(
            "https://letsencrypt.org/certs/lets-encrypt-x3-cross-signed.pem")
        pemParts.append(f.read())

    with open("server.key", "rb") as f:
        pemParts.append(f.read())

    # finally, glue together the certificate(s) and the private key to
    # get the PEM for twisted
    with open("bundle.pem", "wb") as f:
        for part in pemParts:
            f.write(part+b"\n")

    subprocess.check_call(["service", "dachs", "restart",
        "--if-unloaded-within=300"])
602
def main():
    """command line entry point: dispatches to the exposed functions of
    this module.
    """
    plainui.SemiStingyPlainUI(base.ui)
    base.IS_DACHS_SERVER = True
    parser = makeCLIParser(globals())
    args = parser.parse_args()
    args.subAction(args)


if __name__=="__main__":
    main()