/usr/share/pyshared/weboob/core/scheduler.py is in python-weboob-core 0.g-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon, Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from threading import Timer, Event, RLock, _Timer
from weboob.tools.log import getLogger
from weboob.tools.misc import get_backtrace
__all__ = ['Scheduler']
class IScheduler(object):
    """Abstract scheduler interface.

    Every method raises NotImplementedError; concrete schedulers are
    expected to override all of them.
    """

    def schedule(self, interval, function, *args):
        """Arrange for ``function(*args)`` to be called after ``interval``."""
        raise NotImplementedError()

    def repeat(self, interval, function, *args):
        """Arrange for ``function(*args)`` to be called repeatedly."""
        raise NotImplementedError()

    def cancel(self, ev):
        """Cancel the scheduled call identified by ``ev``."""
        raise NotImplementedError()

    def run(self):
        """Run the scheduler."""
        raise NotImplementedError()

    def want_stop(self):
        """Ask the scheduler to stop."""
        raise NotImplementedError()
class RepeatedTimer(_Timer):
    """Timer thread that keeps calling its function until it is cancelled.

    The function is invoked as soon as the thread runs, and then again after
    each ``interval``-second pause, for as long as the ``finished`` event is
    not set.
    """

    def run(self):
        """Call the function in a loop, pausing ``interval`` between calls."""
        stopped = self.finished
        while True:
            if stopped.is_set():
                break
            try:
                self.function(*self.args, **self.kwargs)
            except Exception:
                # A failing call must not end the timer: report it and go on.
                print(get_backtrace())
            # Returns early if the timer gets cancelled during the pause.
            stopped.wait(self.interval)
        stopped.set()
class Scheduler(IScheduler):
    """Scheduler running callbacks once or repeatedly on timer threads.

    Each scheduled call is backed by a timer object stored in ``self.queue``
    under an integer id. That id is what schedule() and repeat() return and
    what cancel() expects.
    """

    def __init__(self):
        self.logger = getLogger('scheduler')
        # Re-entrant lock protecting ``count`` and ``queue``.
        self.mutex = RLock()
        # Set once a stop has been requested; nothing is scheduled afterwards.
        self.stop_event = Event()
        # Last id handed out by _schedule().
        self.count = 0
        # Maps id -> timer that is still registered.
        self.queue = {}

    @staticmethod
    def _function_name(function):
        """Return a printable name for ``function``, even without __name__."""
        return getattr(function, '__name__', repr(function))

    def schedule(self, interval, function, *args):
        """Call ``function(*args)`` once, ``interval`` seconds from now.

        Returns the id of the scheduled call, or None if a stop has already
        been requested.
        """
        return self._schedule(Timer, interval, self._schedule_callback, function, *args)

    def repeat(self, interval, function, *args):
        """Call ``function(*args)`` repeatedly, pausing ``interval`` seconds.

        Returns the id of the scheduled call, or None if a stop has already
        been requested.
        """
        return self._schedule(RepeatedTimer, interval, self._repeat_callback, function, *args)

    def _schedule(self, klass, interval, meta_func, function, *args):
        """Create, register and start a ``klass`` timer wrapping ``function``.

        ``meta_func`` is the internal callback the timer really invokes; it
        receives ``(id, interval, function, args)``.
        """
        with self.mutex:
            # Checked under the lock so that a timer cannot be registered
            # after want_stop() has emptied the queue.
            if self.stop_event.is_set():
                return None
            self.count += 1
            ident = self.count
            self.logger.debug('function "%s" will be called in %s seconds'
                              % (self._function_name(function), interval))
            timer = klass(interval, meta_func, (ident, interval, function, args))
            self.queue[ident] = timer
            timer.start()
            return ident

    def _schedule_callback(self, count, interval, function, args):
        """Timer callback for one-shot calls: unregister, then run."""
        with self.mutex:
            if self.queue.pop(count, None) is None:
                # The entry was cancelled (or the scheduler stopped) just as
                # the timer fired: do not run the function.
                return None
        return function(*args)

    def _repeat_callback(self, count, interval, function, args):
        """Timer callback for repeated calls: run, then log the next call."""
        function(*args)
        with self.mutex:
            timer = self.queue.get(count)
            if timer is None:
                # Cancelled meanwhile; there is no next call to announce.
                return
            self.logger.debug('function "%s" will be called in %s seconds'
                              % (self._function_name(function), timer.interval))

    def cancel(self, ev):
        """Cancel the scheduled call whose id is ``ev``.

        Returns True if a registered timer was cancelled, False if ``ev`` is
        not (or no longer) registered.
        """
        with self.mutex:
            timer = self.queue.pop(ev, None)
            if timer is None:
                return False
            timer.cancel()
            # timer.function is the internal callback; the user's function is
            # the third item of the argument tuple built in _schedule().
            self.logger.debug('scheduled function "%s" is canceled'
                              % self._function_name(timer.args[2]))
            return True

    def _wait_to_stop(self):
        """Stop the scheduler and wait for its timer threads to finish."""
        with self.mutex:
            # Snapshot first: want_stop() empties the queue.
            timers = list(self.queue.values())
            self.want_stop()
        # Join outside the lock, since a timer callback may need the mutex
        # to complete.
        for timer in timers:
            timer.join()

    def run(self):
        """Block until a stop is requested, then shut the timers down.

        Returns True after a normal stop. On KeyboardInterrupt the timers are
        shut down as well and the exception is re-raised.
        """
        try:
            # Poll with a short timeout rather than blocking indefinitely.
            while not self.stop_event.is_set():
                self.stop_event.wait(0.1)
        except KeyboardInterrupt:
            self._wait_to_stop()
            raise
        else:
            self._wait_to_stop()
        return True

    def want_stop(self):
        """Request a stop and cancel every registered timer, without blocking."""
        self.stop_event.set()
        with self.mutex:
            for timer in self.queue.values():
                # Contrary to _wait_to_stop(), don't join the timer here,
                # because want_stop() has to be non-blocking.
                timer.cancel()
            self.queue = {}
|