1 """
2 The execute element and related stuff.
3 """
4
5
6
7
8
9
10
11 import os
12 import re
13 import subprocess
14 import sys
15 import threading
16 import traceback
17
18 from gavo import base
19 from gavo.base import cron
20 from gavo.rscdef import common
21 from gavo.rscdef import procdef
22
23
25 """a class for making functions safe for cron-like executions.
26
27 The main method is makeGuarded. It introduces a lock protecting against
28 double execution (if that were to happen, the execution is suppressed with a
29 warning; of course, if you fork something into the background, that mechanism
30 no longer works). The stuff is run in a thread, and exceptions caught. If
31 anything goes wrong during execution, a mail is sent to the administrator.
32
33 Note that, in contrast to cron, I/O is not captured (that would
34 be difficult for threads; we don't want processes because of
35 the potential trouble with database connections).
36
37 There's a module-private instance of this that's used by Execute.
38 """
40 self.threadsCurrentlyActive = []
41 self.activeListLock = threading.Lock()
42
def _reapOldThreads(self):
    """joins finished cron threads and drops them from the active list.

    Before cleaning up, a warning is sent through base.ui if more than
    ten threads are on record; that many timed jobs at once suggest
    that something hangs or piles up.

    Threads that are still running are kept in
    self.threadsCurrentlyActive; finished ones are joined (with a tiny
    timeout) and forgotten.  The list is rebuilt while holding
    self.activeListLock.
    """
    activeCount = len(self.threadsCurrentlyActive)
    if activeCount>10:
        base.ui.notifyWarning("There's a suspicious number of cron"
            " threads active (%d). You should check what's going on."%
            activeCount)

    stillRunning = []
    with self.activeListLock:
        for t in self.threadsCurrentlyActive:
            # is_alive exists since Python 2.6; the camelCase isAlive
            # alias used previously is gone in current Python 3.
            if t.is_alive():
                stillRunning.append(t)
            else:
                t.join(timeout=0.001)
        self.threadsCurrentlyActive = stillRunning
57
59 """returns callable ready for safe cron-like execution.
60
61 execDef is an Execute instance.
62 """
63 serializingLock = threading.Lock()
64
65 def innerFunction():
66 try:
67 try:
68 execDef.outputAccum = []
69 callable(execDef.rd, execDef)
70 except Exception:
71 base.ui.notifyError("Uncaught exception in timed job %s."
72 " Trying to send traceback to the maintainer."%execDef.jobName)
73 cron.sendMailToAdmin("DaCHS Job %s failed"%execDef.jobName,
74 "".join(traceback.format_exception(*sys.exc_info())))
75 finally:
76 serializingLock.release()
77 if execDef.debug and execDef.outputAccum:
78 cron.sendMailToAdmin("Debug output of DaCHS Job %s"%execDef.jobName,
79 "\n".join(execDef.outputAccum))
80 del execDef.outputAccum
81
82 def cronFunction():
83 self._reapOldThreads()
84 if not serializingLock.acquire(False):
85 base.ui.notifyWarning("Timed job %s has not finished"
86 " before next instance came around"%execDef.jobName)
87 return
88 t = threading.Thread(name=execDef.title, target=innerFunction)
89 base.ui.notifyInfo("Spawning thread for cron job %s"%execDef.title)
90 t.daemon = True
91 t.start()
92
93 with self.activeListLock:
94 self.threadsCurrentlyActive.append(t)
95
96 return t
97
98 return cronFunction
99
100 _guardedFunctionFactory = GuardedFunctionFactory()
101
102
104 """Python code for use within execute.
105
106 The resource descriptor this runs at is available as rd, the execute
107 definition (having such attributes as title, job, plus any
108 properties given in the RD) as execDef.
109
110 Note that no I/O capturing takes place (that's impossible since in
111 general the jobs run within the server). To have actual cron jobs,
use ``execDef.spawn(["cmd", "arg1"...])``. This will send a mail on failed
execution and return the return code of the process; it does not raise.
114
115 In the frequent use case of a resdir-relative python program, you
116 can use the ``execDef.spawnPython(modulePath)`` function.
117
118 If you must stay within the server process, you can do something like::
119
120 mod, _ = utils.loadPythonModule(rd.getAbsPath("bin/coverageplot.py"))
121 mod.makePlot()
122
123 -- in that way, your code can sit safely within the resource directory
124 and you still don't have to manipulate the module path.
125 """
126 name_ = "job"
127 formalArgs = "rd, execDef"
128
129
130 -class Execute(base.Structure, base.ExpansionDelegator):
131 """a container for calling code.
132
133 This is a cron-like functionality. The jobs are run in separate
134 threads, so they need to be thread-safe with respect to the
135 rest of DaCHS. DaCHS serializes calls, though, so that your
136 code should never run twice at the same time.
137
138 At least on CPython, you must make sure your code does not
139 block with the GIL held; this is still in the server process.
140 If you do daring things, fork off (note that you must not use
141 any database connections you may have after forking, which means
142 you can't safely use the RD passed in). See the docs on `Element job`_.
143
When testing/debugging such code, use ``gavo admin execute rd#id``
145 to immediately run the jobs.
146 """
147 name_ = "execute"
148
149 _title = base.UnicodeAttribute("title",
150 default = base.Undefined,
151 description="Some descriptive title for the job; this is used"
152 " in diagnostics.",
153 copyable=False,)
154
155 _at = base.StringListAttribute("at",
156 description="One or more hour:minute pairs at which to run"
157 " the code each day. This conflicts with every. Optionally,"
158 " you can prefix each time by one of m<dom> or w<dow> for"
159 " jobs only to be exectued at some day of the month or week, both"
160 " counted from 1. So, 'm22 7:30, w3 15:02' would execute on"
161 " the 22nd of each month at 7:30 UTC and on every wednesday at 15:02.",
162 default=base.NotGiven,
163 copyable=True,)
164
165 _every = base.IntAttribute("every",
166 default=base.NotGiven,
167 description="Run the job roughly every this many seconds."
168 " This conflicts with at. Note that the first execution of"
169 " such a job is after every/10 seconds, and that the timers"
170 " start anew at every server restart. So, if you restart"
171 " often, these jobs may run much more frequently or not at all"
172 " if the interval is large. If every is smaller than zero, the"
173 " job will be executed immediately when the RD is being loaded and is"
174 " then run every abs(every) seconds",
175 copyable=True,)
176
177 _job = base.StructAttribute("job",
178 childFactory=CronJob,
179 default=base.Undefined,
180 description="The code to run.",
181 copyable=True,)
182
183 _debug = base.BooleanAttribute("debug",
184 description="If true, on execution of external processes (span or"
185 " spawnPython), the output will be accumulated and mailed to"
186 " the administrator. Note that output of the actual cron job"
187 " itself is not caught (it might turn up in serverStderr)."
188 " You could use execDef.outputAccum.append(<stuff>) to have"
189 " information from within the code included.", default=False)
190
191 _properties = base.PropertyAttribute()
192
193 _rd = common.RDAttribute()
194
def spawn(self, cliList):
    """spawns an external command, capturing the output and mailing it
    to the admin if it failed.

    cliList is the argument list (program first) handed on to
    subprocess.Popen.  The child's stderr is merged into its stdout.
    Output is buffered and mailed, so it shouldn't be too large.

    This does not raise an exception if the command failed (in normal
    usage, this would cause two mails to be sent).  Instead, it returns
    the returncode of the spawned process; if that's 0, you're ok.  But
    in general, you wouldn't want to check it.

    If the command succeeded, self.debug is true and there was output,
    a header line and the output are appended to self.outputAccum.
    """
    child = subprocess.Popen(cliList,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT, close_fds=True)
    # stderr goes to stdout, so the second item returned is always None.
    childOutput, _ = child.communicate()
    returncode = child.returncode

    if returncode:
        cron.sendMailToAdmin("A process spawned by %s failed with %s"%(
            self.title, returncode),
            "Output of %s:\n\n%s"%(cliList, childOutput))

    elif self.debug:
        if childOutput:
            self.outputAccum.append("\n\n%s -> %s\n"%(cliList, returncode))
            self.outputAccum.append(childOutput)

    return returncode
221
def spawnPython(self, pythonFile):
    """spawns a new python interpreter executing pythonFile.

    pythonFile may be resdir-relative; it is joined to self.rd.resdir,
    so an absolute path is used unchanged.

    The work is delegated to self.spawn, and whatever that returns is
    passed on to the caller.
    """
    scriptPath = os.path.join(self.rd.resdir, pythonFile)
    return self.spawn(["python", scriptPath])
228
def _parseAt(self, atSpec, ctx):
    """returns a tuple ready for cron.repeatAt from atSpec.

    See the at StringListAttribute for what this looks like; this
    parses one element of that string list, i.e., hour:minute optionally
    prefixed by w<weekday> and/or m<day of month>.

    ctx is only used to obtain the position (ctx.pos) reported in
    error messages.

    The return value is (dom, dow, hour, minute), where dom and dow are
    None if the corresponding prefix was not given.

    A base.LiteralParseError is raised if atSpec does not match the
    syntax or one of the numbers is out of range.
    """
    mat = re.match(r"(?P<dow>w\d\s+)?"
        r"(?P<dom>m\d\d?\s+)?"
        r"(?P<hr>\d+):(?P<min>\d+)", atSpec)
    if not mat:
        raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
            "This is hour:minute optionally prefixed by either w<weekday> or"
            " m<day of month>, each counted from 1.")

    hour, minute = int(mat.group("hr")), int(mat.group("min"))
    if not (0<=hour<=23 and 0<=minute<=59):
        # both conditions must hold, hence "and" in the hint
        raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
            "This must be hour:minute with 0<=hour<=23 and 0<=minute<=59")

    dom = None
    if mat.group("dom"):
        # strip the leading "m"; int() tolerates the trailing whitespace
        dom = int(mat.group("dom")[1:])
        if not 1<=dom<=28:
            raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
                "day-of-month in at must be between 1 and 28.")

    dow = None
    if mat.group("dow"):
        # strip the leading "w"; int() tolerates the trailing whitespace
        dow = int(mat.group("dow")[1:])
        if not 1<=dow<=7:
            raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
                "day-of-week in at must be between 1 and 7.")

    return (dom, dow, hour, minute)
263
265 self._completeElementNext(Execute, ctx)
266 if len([s for s in [self.at, self.every] if s is base.NotGiven])!=1:
267 raise base.StructureError("Exactly one of at and every required"
268 " for Execute", pos=ctx.pos)
269
270 if self.at is not base.NotGiven:
271 self.parsedAt = []
272 for literal in self.at:
273 self.parsedAt.append(self._parseAt(literal, ctx))
274
287