Package gavo :: Package rscdef :: Module executing
[frames] | [no frames]

Source Code for Module gavo.rscdef.executing

  1  """ 
  2  The execute element and related stuff. 
  3  """ 
  4   
  5  #c Copyright 2008-2019, the GAVO project 
  6  #c 
  7  #c This program is free software, covered by the GNU GPL.  See the 
  8  #c COPYING file in the source distribution. 
  9   
 10   
 11  import os 
 12  import re 
 13  import subprocess 
 14  import sys 
 15  import threading 
 16  import traceback 
 17   
 18  from gavo import base 
 19  from gavo.base import cron 
 20  from gavo.rscdef import common 
 21  from gavo.rscdef import procdef 
 22   
 23   
24 -class GuardedFunctionFactory(object):
25 """a class for making functions safe for cron-like executions. 26 27 The main method is makeGuarded. It introduces a lock protecting against 28 double execution (if that were to happen, the execution is suppressed with a 29 warning; of course, if you fork something into the background, that mechanism 30 no longer works). The stuff is run in a thread, and exceptions caught. If 31 anything goes wrong during execution, a mail is sent to the administrator. 32 33 Note that, in contrast to cron, I/O is not captured (that would 34 be difficult for threads; we don't want processes because of 35 the potential trouble with database connections). 36 37 There's a module-private instance of this that's used by Execute. 38 """
39 - def __init__(self):
40 self.threadsCurrentlyActive = [] 41 self.activeListLock = threading.Lock()
42
43 - def _reapOldThreads(self):
44 if len(self.threadsCurrentlyActive)>10: 45 base.ui.notifyWarning("There's a suspicious number of cron" 46 " threads active (%d). You should check what's going on."% 47 len(self.threadsCurrentlyActive)) 48 49 newThreads = [] 50 with self.activeListLock: 51 for t in self.threadsCurrentlyActive: 52 if t.isAlive(): 53 newThreads.append(t) 54 else: 55 t.join(timeout=0.001) 56 self.threadsCurrentlyActive = newThreads
57
58 - def makeGuardedThreaded(self, callable, execDef):
59 """returns callable ready for safe cron-like execution. 60 61 execDef is an Execute instance. 62 """ 63 serializingLock = threading.Lock() 64 65 def innerFunction(): 66 try: 67 try: 68 execDef.outputAccum = [] 69 callable(execDef.rd, execDef) 70 except Exception: 71 base.ui.notifyError("Uncaught exception in timed job %s." 72 " Trying to send traceback to the maintainer."%execDef.jobName) 73 cron.sendMailToAdmin("DaCHS Job %s failed"%execDef.jobName, 74 "".join(traceback.format_exception(*sys.exc_info()))) 75 finally: 76 serializingLock.release() 77 if execDef.debug and execDef.outputAccum: 78 cron.sendMailToAdmin("Debug output of DaCHS Job %s"%execDef.jobName, 79 "\n".join(execDef.outputAccum)) 80 del execDef.outputAccum
81 82 def cronFunction(): 83 self._reapOldThreads() 84 if not serializingLock.acquire(False): 85 base.ui.notifyWarning("Timed job %s has not finished" 86 " before next instance came around"%execDef.jobName) 87 return 88 t = threading.Thread(name=execDef.title, target=innerFunction) 89 base.ui.notifyInfo("Spawning thread for cron job %s"%execDef.title) 90 t.daemon = True 91 t.start() 92 93 with self.activeListLock: 94 self.threadsCurrentlyActive.append(t) 95 96 return t
97 98 return cronFunction 99 100 _guardedFunctionFactory = GuardedFunctionFactory() 101 102
class CronJob(procdef.ProcApp):
    """Python code for use within execute.

    The resource descriptor this runs at is available as rd, the execute
    definition (having such attributes as title, job, plus any
    properties given in the RD) as execDef.

    Note that no I/O capturing takes place (that's impossible since in
    general the jobs run within the server).  To have actual cron jobs,
    use ``execDef.spawn(["cmd", "arg1"...])``.  This will send a mail on
    failed execution; it does not raise an exception but returns the
    return code of the spawned process (0 means success).

    In the frequent use case of a resdir-relative python program, you
    can use the ``execDef.spawnPython(modulePath)`` function.

    If you must stay within the server process, you can do something like::

        mod, _ = utils.loadPythonModule(rd.getAbsPath("bin/coverageplot.py"))
        mod.makePlot()

    -- in that way, your code can sit safely within the resource directory
    and you still don't have to manipulate the module path.
    """
    name_ = "job"
    # the names under which the compiled code sees its arguments
    formalArgs = "rd, execDef"
class Execute(base.Structure, base.ExpansionDelegator):
    """a container for calling code.

    This is a cron-like functionality.  The jobs are run in separate
    threads, so they need to be thread-safe with respect to the
    rest of DaCHS.  DaCHS serializes calls, though, so that your
    code should never run twice at the same time.

    At least on CPython, you must make sure your code does not
    block with the GIL held; this is still in the server process.
    If you do daring things, fork off (note that you must not use
    any database connections you may have after forking, which means
    you can't safely use the RD passed in).  See the docs on `Element job`_.

    When testing/debugging such code, use ``gavo admin execute rd#id``
    to immediately run the jobs.
    """
    name_ = "execute"

    _title = base.UnicodeAttribute("title",
        default = base.Undefined,
        description="Some descriptive title for the job; this is used"
            " in diagnostics.",
        copyable=False,)

    _at = base.StringListAttribute("at",
        description="One or more hour:minute pairs at which to run"
            " the code each day. This conflicts with every. Optionally,"
            " you can prefix each time by one of m<dom> or w<dow> for"
            " jobs only to be executed at some day of the month or week, both"
            " counted from 1. So, 'm22 7:30, w3 15:02' would execute on"
            " the 22nd of each month at 7:30 UTC and on every Wednesday at 15:02.",
        default=base.NotGiven,
        copyable=True,)

    _every = base.IntAttribute("every",
        default=base.NotGiven,
        description="Run the job roughly every this many seconds."
            " This conflicts with at. Note that the first execution of"
            " such a job is after every/10 seconds, and that the timers"
            " start anew at every server restart. So, if you restart"
            " often, these jobs may run much more frequently or not at all"
            " if the interval is large. If every is smaller than zero, the"
            " job will be executed immediately when the RD is being loaded and is"
            " then run every abs(every) seconds",
        copyable=True,)

    _job = base.StructAttribute("job",
        childFactory=CronJob,
        default=base.Undefined,
        description="The code to run.",
        copyable=True,)

    _debug = base.BooleanAttribute("debug",
        description="If true, on execution of external processes (spawn or"
            " spawnPython), the output will be accumulated and mailed to"
            " the administrator. Note that output of the actual cron job"
            " itself is not caught (it might turn up in serverStderr)."
            " You could use execDef.outputAccum.append(<stuff>) to have"
            " information from within the code included.", default=False)

    _properties = base.PropertyAttribute()

    _rd = common.RDAttribute()

    def spawn(self, cliList):
        """spawns an external command, capturing the output and mailing it
        to the admin if it failed.

        Output is buffered and mailed, so it shouldn't be too large.

        This does not raise an exception if it failed (in normal usage,
        this would cause two mails to be sent).  Instead, it returns the
        returncode of the spawned process; if that's 0, you're ok.  But
        in general, you wouldn't want to check it.

        With debug set, the output of successful runs is appended to
        self.outputAccum.
        """
        p = subprocess.Popen(cliList,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, close_fds=True)
        # stderr is merged into stdout, so the second element is always None
        childOutput, _ = p.communicate()
        if p.returncode:
            cron.sendMailToAdmin("A process spawned by %s failed with %s"%(
                self.title, p.returncode),
                "Output of %s:\n\n%s"%(cliList, childOutput))

        elif self.debug:
            if childOutput:
                self.outputAccum.append("\n\n%s -> %s\n"%(cliList, p.returncode))
                self.outputAccum.append(childOutput)

        return p.returncode

    def spawnPython(self, pythonFile):
        """spawns a new python interpreter executing pythonFile.

        pythonFile may be resdir-relative.  As with spawn, the return code
        of the spawned process is returned (0 means success).
        """
        return self.spawn(["python", os.path.join(self.rd.resdir, pythonFile)])

    def _parseAt(self, atSpec, ctx):
        """returns a tuple ready for cron.repeatAt from atSpec.

        see the at StringListAttribute for how it would look like; this
        parses one element of that string list.

        The tuple is (dom, dow, hour, minute), where dom and dow are None
        when not given.  Malformed or out-of-range specs raise a
        base.LiteralParseError.
        """
        # The trailing \s*$ makes sure no junk after hour:minute is
        # silently ignored.
        mat = re.match(r"(?P<dow>w\d\s+)?"
            r"(?P<dom>m\d\d?\s+)?"
            r"(?P<hr>\d+):(?P<min>\d+)\s*$", atSpec)
        if not mat:
            raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
                "This is hour:minute optionally prefixed by either w<weekday> or"
                " m<day of month>, each counted from 1.")

        hour, minute = int(mat.group("hr")), int(mat.group("min"))
        if not (0<=hour<=23 and 0<=minute<=59):
            raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
                "This must be hour:minute with 0<=hour<=23 and 0<=minute<=59")

        dom = None
        if mat.group("dom"):
            # strip the leading "m"; int() ignores the trailing whitespace
            dom = int(mat.group("dom")[1:])
            if not 1<=dom<=28:
                raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
                    "day-of-month in at must be between 1 and 28.")

        dow = None
        if mat.group("dow"):
            dow = int(mat.group("dow")[1:])
            if not 1<=dow<=7:
                raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
                    "day-of-week in at must be between 1 and 7.")

        return (dom, dow, hour, minute)

    def completeElement(self, ctx):
        """checks that exactly one of at and every is given and parses at.
        """
        self._completeElementNext(Execute, ctx)
        if len([s for s in [self.at, self.every] if s is base.NotGiven])!=1:
            raise base.StructureError("Exactly one of at and every required"
                " for Execute", pos=ctx.pos)

        if self.at is not base.NotGiven:
            self.parsedAt = []
            for literal in self.at:
                self.parsedAt.append(self._parseAt(literal, ctx))

    def onElementComplete(self):
        """compiles the job and registers it with the cron module.
        """
        self._onElementCompleteNext(Execute)

        self.jobName = "%s in %s"%(self.title, self.rd.sourceId)

        self.callable = _guardedFunctionFactory.makeGuardedThreaded(
            self.job.compile(), self)

        if self.at is not base.NotGiven:
            cron.repeatAt(self.parsedAt, self.jobName, self.callable)
        else:
            cron.runEvery(self.every, self.jobName, self.callable)