This file is indexed.

/usr/share/pyshared/weboob/core/scheduler.py is in python-weboob-core 0.g-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# -*- coding: utf-8 -*-

# Copyright(C) 2010-2011 Romain Bignon, Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.




from threading import Timer, Event, RLock, _Timer
from weboob.tools.log import getLogger
from weboob.tools.misc import get_backtrace


__all__ = ['Scheduler']


class IScheduler(object):
    def schedule(self, interval, function, *args):
        raise NotImplementedError()

    def repeat(self, interval, function, *args):
        raise NotImplementedError()

    def cancel(self, ev):
        raise NotImplementedError()

    def run(self):
        raise NotImplementedError()

    def want_stop(self):
        raise NotImplementedError()


class RepeatedTimer(_Timer):
    def run(self):
        while not self.finished.isSet():
            try:
                self.function(*self.args, **self.kwargs)
            except Exception:
                # do not stop timer because of an exception
                print get_backtrace()
            self.finished.wait(self.interval)
        self.finished.set()


class Scheduler(IScheduler):
    def __init__(self):
        self.logger = getLogger('scheduler')
        self.mutex = RLock()
        self.stop_event = Event()
        self.count = 0
        self.queue = {}

    def schedule(self, interval, function, *args):
        return self._schedule(Timer, interval, self._schedule_callback, function, *args)

    def repeat(self, interval, function, *args):
        return self._schedule(RepeatedTimer, interval, self._repeat_callback, function, *args)

    def _schedule(self, klass, interval, meta_func, function, *args):
        if self.stop_event.isSet():
            return

        with self.mutex:
            self.count += 1
            self.logger.debug('function "%s" will be called in %s seconds' % (function.__name__, interval))
            timer = klass(interval, meta_func, (self.count, interval, function, args))
            self.queue[self.count] = timer
            timer.start()
            return self.count

    def _schedule_callback(self, count, interval, function, args):
        with self.mutex:
            self.queue.pop(count)
        return function(*args)

    def _repeat_callback(self, count, interval, function, args):
        function(*args)
        with self.mutex:
            try:
                e = self.queue[count]
            except KeyError:
                return
            else:
                self.logger.debug('function "%s" will be called in %s seconds' % (function.__name__, e.interval))

    def cancel(self, ev):
        with self.mutex:
            try:
                e = self.queue.pop(ev)
            except KeyError:
                return False
            e.cancel()
            self.logger.debug('scheduled function "%s" is canceled' % e.function.__name__)
            return True

    def _wait_to_stop(self):
        self.want_stop()
        with self.mutex:
            for e in self.queue.itervalues():
                e.cancel()
                e.join()
            self.queue = {}

    def run(self):
        try:
            while True:
                self.stop_event.wait(0.1)
        except KeyboardInterrupt:
            self._wait_to_stop()
            raise
        else:
            self._wait_to_stop()
        return True

    def want_stop(self):
        self.stop_event.set()
        with self.mutex:
            for t in self.queue.itervalues():
                t.cancel()
                # Contrary to _wait_to_stop(), don't call t.join
                # because want_stop() have to be non-blocking.
            self.queue = {}