1 """
2 A cron-like facility to regularly run some functions.
3
4 Most of the apparatus in here is not really for user consumption.
5 There's a singleton of the queue created below, and the methods of that
6 singleton are exposed as module-level functions.
7
8 To make the jobs actually execute, the running program has to call
9 registerSchedulerFunction(schedulerFunction). Only the first
10 registration is relevant. The schedulerFunction has the signature
11 sf(delay, callable) and has to arrange for callable to be called delay
12 seconds in the future; twisted's reactor.callLater works like this.
13
14 However, you should arrange for previous callLaters to be canceled when
15 a new one comes in. There is no management to make sure only one
16 queue reaper runs at any time (it doesn't hurt much if more than one
17 run, but it's a waste of resources).
18 """
19
20
21
22
23
24
25
26 import calendar
27 import datetime
28 import heapq
29 import sys
30 import time
31 import threading
32 import traceback
33
34 from gavo import utils
35 from gavo.base import config
36 from gavo.base import osinter
37
38
40 """tries to send a mail to the configured administrator.
41
42 This relies on a functional mail infrastructure on the local host.
43 """
44 if not config.get("maintainerAddress"):
45 utils.sendUIEvent("Error", "Wanted to send mail with subject '%s', but no"
46 " maintainerAddress is given"%subject)
47 return
48
49 osinter.sendMail(
50 "\n".join(["To: "+config.get("maintainerAddress"),
51 "Subject: "+subject,
52 "From: DaCHS server <%s>"%config.get("maintainerAddress"),
53 "Content-Type: text/plain",
54 "",
55 utils.safe_str(message)]))
56
57
58
60 """A job run in a queue.
61
62 These have a name and a run() method; use their reportCronFailure(message)
63 method to deliver error messages (of course, you can also just log;
64 reportCronFailure will in typically send a mail). Concrete jobs
65 have to implement a getNextWakeupTime(gmtime) -> gmtime method;
66 they probably have to redefine __init__; the must up-call.
67 """
68
69 lastStarted = None
70
72 self.name = name
73 self.callable = callable
74
78
85
87 """runs callable under somewhat reliable circumstances.
88 """
89 try:
90 self.callable()
91 except Exception:
92 utils.sendUIEvent("Error",
93 "Failure in timed job %s. Trying to send maintainer a mail."%
94 utils.safe_str(self))
95 self.reportCronFailure("".join(
96 traceback.format_exception(*sys.exc_info())))
97
99 """returns the UTC unix epoch seconds when this job is next
100 supposed to run, starting from curTime.
101 """
102 raise NotImplementedError(
103 "You must override AbstractJob.getNextWakeupTime()")
104
105
107 """A job that's executed roughly every interval seconds.
108
109 interval can be negative, in which case the job is scheduled for (almost)
110 immediate execution.
111 """
112 - def __init__(self, interval, name, callable):
115
117
118
119 if self.interval<0:
120 self.interval = -self.interval
121 return curTime
122
123 if self.lastStarted is None:
124 return curTime+self.interval/10
125 else:
126 return curTime+self.interval
127
128
130 """A job that's run roughly daily at some wallclock (UTC) times.
131
132 times is a list of (day-of-month, day-of-week, hour, minute) tuples.
133 day-of-month and/or day-of-week are 1-based and may be None (if both
134 are non-none, day-of-week wins).
135 """
136 - def __init__(self, times, name, callable):
139
141
142
143
144 nextWakeups = []
145 baseDT = datetime.datetime.utcfromtimestamp(curTime)
146
147 for dom, dow, hour, minute in self.times:
148 if dow is not None:
149 dayDiff = dow-baseDT.isoweekday()
150 if dayDiff<0:
151 dayDiff += 7
152 curDT = baseDT+datetime.timedelta(days=dayDiff)
153 curDT = curDT.replace(hour=hour, minute=minute)
154 if curDT<=baseDT:
155 curDT = curDT+datetime.timedelta(days=7)
156
157 elif dom is not None:
158 curDT = baseDT.replace(day=dom, hour=hour, minute=minute)
159 if curDT<=baseDT:
160 if curDT.month<12:
161 curDT = curDT.replace(month=curDT.month+1)
162 else:
163 curDT = curDT.replace(year=curDT.year+1, month=1)
164
165 else:
166 curDT = baseDT.replace(hour=hour, minute=minute)
167 if curDT<=baseDT:
168 curDT = curDT+datetime.timedelta(hours=24)
169
170 nextWakeups.append(calendar.timegm(curDT.utctimetuple()))
171
172 return min(nextWakeups)
173
174
176 """A cron-job queue.
177
178 This is really a heap sorted by the time the job is next supposed to run.
179 """
181 self.jobs = []
182 self.lock = threading.Lock()
183 self.scheduleFunction = None
184
186 """adds job to the queue and reschedules the wakeup if necessary.
187
188 Since this method does not check for the presence of like-named jobs,
189 it must be used for rescheduling exclusively. To schedule new jobs,
190 use _scheduleJob.
191 """
192 with self.lock:
193 heapq.heappush(self.jobs, (job.getNextWakeupTime(time.time()), job))
194 self._scheduleWakeup()
195
197 """adds job to the job list.
198
199 This is basically like _rescheduleJob, except that this method makes
200 sure that any other job with the same name is removed.
201 """
202 lastStarted = self._unscheduleForName(job.name)
203 job.lastStarted = lastStarted
204 self._rescheduleJob(job)
205
207 """removes all jobs named name from the job queue.
208 """
209 toRemove = []
210 with self.lock:
211 for index, (_, job) in enumerate(self.jobs):
212 if job.name==name:
213 toRemove.append(index)
214 if not toRemove:
215 return None
216
217 toRemove.reverse()
218 retval = self.jobs[toRemove[0]][1].lastStarted
219 for index in toRemove:
220 self.jobs.pop(index)
221 heapq.heapify(self.jobs)
222 return retval
223
225 """takes the next job off of the job queue and runs it.
226
227 If the wakeup time of the next job is too far in the future,
228 this does essentially nothing.
229 """
230 with self.lock:
231 if not self.jobs:
232 return
233 if self.jobs[0][0]>time.time()+0.1:
234
235 return self._scheduleWakeup()
236
237 jobTime, job = heapq.heappop(self.jobs)
238
239 job.lastStarted = time.time()
240 self._rescheduleJob(job)
241 job.run()
242
244 """makes the toplevel scheduler wake queue processing up when the
245 next job is due.
246 """
247 if not self.jobs:
248
249 return
250 nextWakeup = self.jobs[0][0]
251 if self.scheduleFunction is not None:
252 self.scheduleFunction(max(1, nextWakeup-time.time()),
253 self._runNextJob)
254
255 - def runEvery(self, seconds, name, callable):
256 """schedules callable to be run every seconds.
257
258 name must be a unique identifier for the "job". jobs with identical
259 names overwrite each other.
260
261 callable will be run in the main thread, so it must finish quickly
262 or it will block the server.
263 """
264 job = IntervalJob(seconds, name, callable)
265 self._scheduleJob(job)
266 return job
267
268 - def repeatAt(self, times, name, callable):
269 """schedules callable to be run every day at times.
270
271 times is a list of (day-of-month, day-of-week, hour, minute) tuples.
272 day-of-month and/or day-of-week are 1-based and may be None (if both
273 are non-none, day-of-week wins).
274
275 name must be a unique identifier for the "job". jobs with identical
276 names overwrite each other.
277
278 callable will be run in the main thread, so it must finish quickly
279 or it will block the server.
280 """
281 job = TimedJob(times, name, callable)
282 self._scheduleJob(job)
283 return job
284
286 if self.scheduleFunction is None:
287 self.scheduleFunction = scheduleFunction
288 self._scheduleWakeup()
289
291 self.scheduleFunction = None
292
294 """returns a sequence of (startDateTime local, job name) pairs.
295
296 This is for inspection/debug purposes.
297 """
298 with self.lock:
299 schedule = self.jobs[:]
300 return [(datetime.datetime.fromtimestamp(jobTime), job.name)
301 for jobTime, job in schedule]
302
303
304 _queue = Queue()
305 runEvery = _queue.runEvery
306 repeatAt = _queue.repeatAt
307 registerScheduleFunction = _queue.registerScheduleFunction
308 clearScheduleFunction = _queue.clearScheduleFunction
309