This file is indexed.

/usr/lib/python2.7/dist-packages/ooni/managers.py is in ooniprobe 1.3.2-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import itertools

from ooni.utils import log
from ooni.settings import config


def makeIterable(item):
    """
    Takes as argument or an iterable and if it's not an iterable object then it
    will return a listiterator.
    """
    try:
        iterable = iter(item)
    except TypeError:
        iterable = iter([item])
    return iterable


class TaskManager(object):
    retries = 2
    concurrency = 10

    def __init__(self):
        self._tasks = iter(())
        self._active_tasks = []
        self.failures = 0

    def _failed(self, failure, task):
        """
        The has failed to complete, we append it to the end of the task chain
        to be re-run once all the currently scheduled tasks have run.
        """
        log.debug("Task %s has failed %s times" % (task, task.failures))
        if config.advanced.debug:
            log.exception(failure)

        self._active_tasks.remove(task)
        self.failures = self.failures + 1

        if task.failures <= self.retries:
            log.debug("Rescheduling...")
            self._tasks = itertools.chain(makeIterable(task), self._tasks)

        else:
            # This fires the errback when the task is done but has failed.
            log.debug('Permanent failure for %s' % task)
            task.done.errback(failure)

        self._fillSlots()

        self.failed(failure, task)

    def _fillSlots(self):
        """
        Called on test completion and schedules measurements to be run for the
        available slots.
        """
        for _ in range(self.availableSlots):
            try:
                task = self._tasks.next()
                self._run(task)
            except StopIteration:
                break
            except ValueError:
                # XXX this is a workaround the race condition that leads the
                # _tasks generator to throw the exception
                # ValueError: generator already called.
                continue

    def _run(self, task):
        """
        This gets called to add a task to the list of currently active and
        running tasks.
        """
        self._active_tasks.append(task)

        d = task.start()
        d.addCallback(self._succeeded, task)
        d.addErrback(self._failed, task)

    def _succeeded(self, result, task):
        """
        We have successfully completed a measurement.
        """
        self._active_tasks.remove(task)

        # Fires the done deferred when the task has completed
        task.done.callback(result)

        self._fillSlots()

        self.succeeded(result, task)

    @property
    def failedMeasurements(self):
        return self.failures

    @property
    def availableSlots(self):
        """
        Returns the number of available slots for running tests.
        """
        return self.concurrency - len(self._active_tasks)

    def schedule(self, task_or_task_iterator):
        """
        Takes as argument a single task or a task iterable and appends it to
        the task generator queue.
        """
        log.debug("Starting this task %s" % repr(task_or_task_iterator))

        iterable = makeIterable(task_or_task_iterator)

        self._tasks = itertools.chain(self._tasks, iterable)
        self._fillSlots()

    def start(self):
        """
        This is called to start the task manager.
        """
        self.failures = 0

        self._fillSlots()

    def failed(self, failure, task):
        """
        This hoook is called every time a task has failed.

        The default failure handling logic is to reschedule the task up until
        we reach the maximum number of retries.
        """
        raise NotImplemented

    def succeeded(self, result, task):
        """
        This hook is called every time a task has been successfully executed.
        """
        raise NotImplemented


class LinkedTaskManager(TaskManager):

    def __init__(self):
        super(LinkedTaskManager, self).__init__()
        self.child = None
        self.parent = None

    @property
    def availableSlots(self):
        mySlots = self.concurrency - len(self._active_tasks)
        if self.child:
            s = self.child.availableSlots
            return min(s, mySlots)
        return mySlots

    def _succeeded(self, result, task):
        super(LinkedTaskManager, self)._succeeded(result, task)
        if self.parent:
            self.parent._fillSlots()

    def _failed(self, result, task):
        super(LinkedTaskManager, self)._failed(result, task)
        if self.parent:
            self.parent._fillSlots()


class MeasurementManager(LinkedTaskManager):

    """
    This is the Measurement Tracker. In here we keep track of active
    measurements and issue new measurements once the active ones have been
    completed.

    MeasurementTracker does not keep track of the typology of measurements that
    it is running. It just considers a measurement something that has an input
    and a method to be called.

    NetTest on the contrary is aware of the typology of measurements that it is
    dispatching as they are logically grouped by test file.
    """

    def __init__(self):
        if config.advanced.measurement_retries:
            self.retries = config.advanced.measurement_retries
        if config.advanced.measurement_concurrency:
            self.concurrency = config.advanced.measurement_concurrency
        super(MeasurementManager, self).__init__()

    def succeeded(self, result, measurement):
        log.debug("Successfully performed measurement %s" % measurement)
        log.debug("%s" % result)

    def failed(self, failure, measurement):
        pass


class ReportEntryManager(LinkedTaskManager):

    def __init__(self):
        if config.advanced.reporting_retries:
            self.retries = config.advanced.reporting_retries
        if config.advanced.reporting_concurrency:
            self.concurrency = config.advanced.reporting_concurrency
        super(ReportEntryManager, self).__init__()

    def succeeded(self, result, task):
        log.debug("Successfully performed report %s" % task)
        log.debug(str(result))

    def failed(self, failure, task):
        pass