Package gavo :: Package base :: Module cron
[frames] | [no frames]

Source Code for Module gavo.base.cron

  1  """ 
  2  A cron-like facility to regularly run some functions. 
  3   
  4  Most of the apparatus in here is not really for user consumption. 
  5  There's a singleton of the queue created below, and the methods of that 
  6  singleton are exposed as module-level functions. 
  7   
  8  To make the jobs actually execute, the running program has to call  
  9  registerScheduleFunction(scheduleFunction).  Only the first
 10  registration is relevant.  The scheduleFunction has the signature
 11  sf(delay, callable) and has to arrange for callable to be called delay  
 12  seconds in the future; twisted's reactor.callLater works like this. 
 13   
 14  However, the scheduleFunction should cancel any previously pending call when
 15  a new one comes in.  There is no management to make sure only one
 16  queue reaper runs at any time (it doesn't hurt much if more than one
 17  runs, but it's a waste of resources).
 18  """ 
 19   
 20  #c Copyright 2008-2019, the GAVO project 
 21  #c 
 22  #c This program is free software, covered by the GNU GPL.  See the 
 23  #c COPYING file in the source distribution. 
 24   
 25   
 26  import calendar 
 27  import datetime 
 28  import heapq 
 29  import sys 
 30  import time 
 31  import threading 
 32  import traceback 
 33   
 34  from gavo import utils 
 35  from gavo.base import config 
 36  from gavo.base import osinter 
 37   
 38   
def sendMailToAdmin(subject, message):
    """mails subject and message to the configured maintainerAddress.

    The mail goes out through osinter.sendMail and hence needs working mail
    infrastructure on the local host.  When no maintainerAddress is
    configured, nothing is sent; an "Error" UI event is emitted instead.
    """
    if config.get("maintainerAddress"):
        mailLines = [
            "To: " + config.get("maintainerAddress"),
            "Subject: " + subject,
            "From: DaCHS server <%s>" % config.get("maintainerAddress"),
            "Content-Type: text/plain",
        ]
        # an empty line separates the headers from the body
        mailLines.extend(["", utils.safe_str(message)])
        osinter.sendMail("\n".join(mailLines))
    else:
        utils.sendUIEvent("Error",
            "Wanted to send mail with subject '%s', but no"
            " maintainerAddress is given" % subject)
class AbstractJob(object):
    """A job run in a queue.

    Jobs have a name and a run() method; use their reportCronFailure(message)
    method to deliver error messages (of course, you can also just log;
    reportCronFailure will typically send a mail).  Concrete jobs
    have to implement a getNextWakeupTime(gmtime) -> gmtime method;
    they probably have to redefine __init__, in which case they must
    up-call.

    Jobs are ordered by name (falling back to object identity) so that
    (wakeupTime, job) tuples kept in a heap remain comparable when two
    jobs share a wakeup time; Python 3 would otherwise raise a TypeError
    when comparing such tuples.
    """
    # The queue records here the time.time() at which it last started
    # this job (None if it has not run yet).
    lastStarted = None

    def __init__(self, name, callable):
        # name identifies the job; callable is what run() executes.
        self.name = name
        self.callable = callable

    def __str__(self):
        return "<%s %s, last run at %s>"%(
            self.__class__.__name__, self.name, self.lastStarted)

    def __lt__(self, other):
        """orders jobs by name, using object identity to break ties.

        This only exists to make tuples containing jobs sortable (e.g., in
        heapq); it returns NotImplemented for non-job operands.
        """
        if not isinstance(other, AbstractJob):
            return NotImplemented
        if self.name!=other.name:
            return self.name<other.name
        return id(self)<id(other)

    def reportCronFailure(self, message):
        """sends message to the maintainer as a failure report for this job.
        """
        sendMailToAdmin("DaCHS %s job failed"%self.name,
            "\n".join([
                "DaCHS job %s failed"%utils.safe_str(self),
                "\nDetails:\n",
                message]))

    def run(self):
        """runs callable under somewhat reliable circumstances.

        Any Exception raised by callable is caught; it is announced as an
        "Error" UI event, and its formatted traceback is passed on to
        reportCronFailure.
        """
        try:
            self.callable()
        except Exception:
            utils.sendUIEvent("Error",
                "Failure in timed job %s. Trying to send maintainer a mail."%
                utils.safe_str(self))
            self.reportCronFailure("".join(
                traceback.format_exception(*sys.exc_info())))

    def getNextWakeupTime(self, curTime):
        """returns the UTC unix epoch seconds when this job is next
        supposed to run, starting from curTime.

        Concrete job classes must override this.
        """
        raise NotImplementedError(
            "You must override AbstractJob.getNextWakeupTime()")
class IntervalJob(AbstractJob):
    """A job executed roughly every interval seconds.

    A negative interval requests (almost) immediate execution: the first
    wakeup is at the current time, and from then on the job repeats every
    abs(interval) seconds.
    """
    def __init__(self, interval, name, callable):
        AbstractJob.__init__(self, name, callable)
        self.interval = interval

    def getNextWakeupTime(self, curTime):
        if self.interval<0:
            # First call with a negative interval: normalise the interval
            # and ask to be run right away.
            self.interval = -self.interval
            return curTime

        # A job that has never been started waits only a tenth of its
        # interval before its first run.
        delay = (self.interval if self.lastStarted is not None
            else self.interval/10)
        return curTime+delay
class TimedJob(AbstractJob):
    """A job that's run at some wallclock (UTC) times.

    times is a list of (day-of-month, day-of-week, hour, minute) tuples.
    day-of-month and/or day-of-week are 1-based and may be None (if both
    are non-None, day-of-week wins).  Day-of-week is compared against
    datetime.isoweekday(), i.e., 1 is Monday and 7 is Sunday.  If both are
    None, the job runs daily at hour:minute.

    Months lacking the requested day-of-month (e.g., 31 in April) are
    skipped, as in cron.
    """
    def __init__(self, times, name, callable):
        self.times = times
        AbstractJob.__init__(self, name, callable)

    def _getNextWeekly(self, baseDT, dow, hour, minute):
        """returns the first datetime after baseDT on ISO weekday dow
        at hour:minute.
        """
        dayDiff = dow-baseDT.isoweekday()
        if dayDiff<0:
            dayDiff += 7
        candidate = (baseDT+datetime.timedelta(days=dayDiff)).replace(
            hour=hour, minute=minute)
        if candidate<=baseDT:
            candidate = candidate+datetime.timedelta(days=7)
        return candidate

    def _getNextMonthly(self, baseDT, dom, hour, minute):
        """returns the first datetime after baseDT on day dom of some month
        at hour:minute.

        Months that have no day dom are skipped.  This raises a ValueError
        if no month within the search window yields a valid date (e.g.,
        dom=32 or an invalid hour).
        """
        year, month = baseDT.year, baseDT.month
        # Every day 1..31 occurs at least once in any three consecutive
        # months, so 13 months is a generous search window.
        for _ in range(13):
            try:
                candidate = baseDT.replace(
                    year=year, month=month, day=dom, hour=hour, minute=minute)
            except ValueError:
                # this month has no day dom (or the time given is invalid)
                candidate = None
            if candidate is not None and candidate>baseDT:
                return candidate

            if month<12:
                month += 1
            else:
                year, month = year+1, 1

        raise ValueError("TimedJob %s: no date matches day-of-month %r"
            " at %r:%r"%(self.name, dom, hour, minute))

    def _getNextDaily(self, baseDT, hour, minute):
        """returns the first datetime after baseDT at hour:minute.
        """
        candidate = baseDT.replace(hour=hour, minute=minute)
        if candidate<=baseDT:
            candidate = candidate+datetime.timedelta(hours=24)
        return candidate

    def getNextWakeupTime(self, curTime):
        """returns the UTC unix epoch seconds of the earliest of the
        configured times after curTime.

        Seconds and microseconds of curTime are kept in the candidate
        datetimes (only hour and minute are replaced), so a job (re)scheduled
        within its own minute is pushed to its next recurrence.
        """
        baseDT = datetime.datetime.utcfromtimestamp(curTime)
        nextWakeups = []

        for dom, dow, hour, minute in self.times:
            if dow is not None:
                nextDT = self._getNextWeekly(baseDT, dow, hour, minute)
            elif dom is not None:
                nextDT = self._getNextMonthly(baseDT, dom, hour, minute)
            else:
                nextDT = self._getNextDaily(baseDT, hour, minute)
            nextWakeups.append(calendar.timegm(nextDT.utctimetuple()))

        return min(nextWakeups)
class Queue(object):
    """A cron-job queue.

    jobs is a heap (in the sense of the heapq module) of (wakeupTime, job)
    pairs, wakeupTime being unix epoch seconds; the job due next is thus
    always at jobs[0].  lock guards modifications of jobs.  scheduleFunction
    is the toplevel scheduler's sf(delay, callable), or None while no
    scheduler is registered.
    """
    def __init__(self):
        self.scheduleFunction = None
        self.lock = threading.Lock()
        self.jobs = []

    def _rescheduleJob(self, job):
        """pushes job onto the heap at its next wakeup time and re-arms the
        scheduler.

        No check for like-named jobs happens here, so use this only to
        re-queue jobs; new jobs must go through _scheduleJob.
        """
        with self.lock:
            wakeupTime = job.getNextWakeupTime(time.time())
            # NOTE(review): when two entries share a wakeupTime, heapq
            # compares the job objects, which Python 3 only allows for
            # orderable jobs.
            heapq.heappush(self.jobs, (wakeupTime, job))
        self._scheduleWakeup()

    def _scheduleJob(self, job):
        """queues job, replacing any queued job with the same name.

        job inherits lastStarted from the job it replaces (None if there
        was none).
        """
        job.lastStarted = self._unscheduleForName(job.name)
        self._rescheduleJob(job)

    def _unscheduleForName(self, name):
        """removes all jobs named name from the job queue.

        This returns the lastStarted of the removed job with the highest
        heap index, or None if no job of that name was queued.
        """
        with self.lock:
            matchIndices = [index
                for index, (_, queued) in enumerate(self.jobs)
                if queued.name==name]
            if not matchIndices:
                return None

            previousStart = self.jobs[matchIndices[-1]][1].lastStarted
            # delete back to front so pending indices stay valid
            for index in reversed(matchIndices):
                del self.jobs[index]
            heapq.heapify(self.jobs)
        return previousStart

    def _runNextJob(self):
        """takes the next job off of the job queue and runs it.

        If the next job is due more than 0.1 seconds in the future, this
        only re-arms the scheduler.  Otherwise, the job is re-queued before
        it runs.
        """
        with self.lock:
            if not self.jobs:
                return
            if self.jobs[0][0]>time.time()+0.1:
                # don't do anything on spurious wakeups
                self._scheduleWakeup()
                return
            dueJob = heapq.heappop(self.jobs)[1]

        dueJob.lastStarted = time.time()
        self._rescheduleJob(dueJob)
        dueJob.run()

    def _scheduleWakeup(self):
        """asks the toplevel scheduler to call _runNextJob when the next
        job is due, but at least one second from now.

        Nothing happens while the queue is empty or no scheduler is
        registered.
        """
        if self.jobs and self.scheduleFunction is not None:
            secondsToWait = max(1, self.jobs[0][0]-time.time())
            self.scheduleFunction(secondsToWait, self._runNextJob)

    def runEvery(self, seconds, name, callable):
        """schedules callable to be run every seconds.

        name must be a unique identifier for the "job"; scheduling a job
        with a name already queued replaces the old job.

        callable runs in whatever thread the scheduleFunction calls back
        in (the main thread for twisted's reactor), so it must finish
        quickly or it will block the server.

        This returns the new IntervalJob.
        """
        newJob = IntervalJob(seconds, name, callable)
        self._scheduleJob(newJob)
        return newJob

    def repeatAt(self, times, name, callable):
        """schedules callable to be run at the wallclock times given.

        times is a list of (day-of-month, day-of-week, hour, minute) tuples.
        day-of-month and/or day-of-week are 1-based and may be None (if both
        are non-none, day-of-week wins).

        name must be a unique identifier for the "job"; scheduling a job
        with a name already queued replaces the old job.

        callable runs in whatever thread the scheduleFunction calls back
        in (the main thread for twisted's reactor), so it must finish
        quickly or it will block the server.

        This returns the new TimedJob.
        """
        newJob = TimedJob(times, name, callable)
        self._scheduleJob(newJob)
        return newJob

    def registerScheduleFunction(self, scheduleFunction):
        """sets the toplevel scheduler sf(delay, callable).

        Only the first registration counts; later calls are ignored until
        clearScheduleFunction is called.
        """
        if self.scheduleFunction is not None:
            return
        self.scheduleFunction = scheduleFunction
        self._scheduleWakeup()

    def clearScheduleFunction(self):
        """forgets the toplevel scheduler; queued jobs stay in the heap.
        """
        self.scheduleFunction = None

    def getQueueRepr(self):
        """returns a list of (startDateTime local, job name) pairs.

        The pairs come in heap order (not necessarily sorted); this is for
        inspection/debug purposes.
        """
        with self.lock:
            snapshot = list(self.jobs)
        return [(datetime.datetime.fromtimestamp(wakeupTime), queued.name)
            for wakeupTime, queued in snapshot]


# The module-level singleton; its public methods are exposed as functions.
_queue = Queue()
runEvery = _queue.runEvery
repeatAt = _queue.repeatAt
registerScheduleFunction = _queue.registerScheduleFunction
clearScheduleFunction = _queue.clearScheduleFunction