/usr/lib/python2.7/dist-packages/ooni/managers.py is in ooniprobe 1.3.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import itertools

from ooni.utils import log
from ooni.settings import config


def makeIterable(item):
    """
    Takes a single item or an iterable. If the argument is not already an
    iterable object, it is wrapped in a list and an iterator over that list
    is returned.
    """
    try:
        iterable = iter(item)
    except TypeError:
        iterable = iter([item])
    return iterable
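# For example (doctest-style, illustrative only; note that a string is
# already iterable, so it is iterated character by character rather than
# wrapped):
#
#   >>> list(makeIterable(42))
#   [42]
#   >>> list(makeIterable([1, 2]))
#   [1, 2]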
class TaskManager(object):
    retries = 2
    concurrency = 10

    def __init__(self):
        self._tasks = iter(())
        self._active_tasks = []
        self.failures = 0

    def _failed(self, failure, task):
        """
        The task has failed to complete; we push it back onto the front of
        the task chain so that it is re-run as soon as a slot becomes
        available.
        """
        log.debug("Task %s has failed %s times" % (task, task.failures))
        if config.advanced.debug:
            log.exception(failure)

        self._active_tasks.remove(task)
        self.failures = self.failures + 1

        if task.failures <= self.retries:
            log.debug("Rescheduling...")
            self._tasks = itertools.chain(makeIterable(task), self._tasks)
        else:
            # This fires the errback when the task is done but has failed.
            log.debug('Permanent failure for %s' % task)
            task.done.errback(failure)

        self._fillSlots()
        self.failed(failure, task)

    def _fillSlots(self):
        """
        Called on task completion; schedules as many pending tasks as there
        are available slots.
        """
        for _ in range(self.availableSlots):
            try:
                task = self._tasks.next()
                self._run(task)
            except StopIteration:
                break
            except ValueError:
                # XXX this is a workaround for the race condition that leads
                # the _tasks generator to throw the exception
                # ValueError: generator already called.
                continue

    def _run(self, task):
        """
        Adds a task to the list of currently active tasks and starts it.
        """
        self._active_tasks.append(task)

        d = task.start()
        d.addCallback(self._succeeded, task)
        d.addErrback(self._failed, task)

    def _succeeded(self, result, task):
        """
        We have successfully completed a measurement.
        """
        self._active_tasks.remove(task)

        # Fires the done deferred when the task has completed.
        task.done.callback(result)
        self._fillSlots()

        self.succeeded(result, task)

    @property
    def failedMeasurements(self):
        return self.failures

    @property
    def availableSlots(self):
        """
        Returns the number of available slots for running tests.
        """
        return self.concurrency - len(self._active_tasks)

    def schedule(self, task_or_task_iterator):
        """
        Takes a single task or a task iterable as argument and appends it to
        the task generator queue.
        """
        log.debug("Scheduling task(s) %s" % repr(task_or_task_iterator))

        iterable = makeIterable(task_or_task_iterator)
        self._tasks = itertools.chain(self._tasks, iterable)
        self._fillSlots()

    def start(self):
        """
        This is called to start the task manager.
        """
        self.failures = 0
        self._fillSlots()

    def failed(self, failure, task):
        """
        This hook is called every time a task has failed.

        The default failure handling logic is to reschedule the task until
        we reach the maximum number of retries.
        """
        raise NotImplementedError

    def succeeded(self, result, task):
        """
        This hook is called every time a task has been successfully executed.
        """
        raise NotImplementedError
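# A concrete manager only needs to override the succeeded() and failed()
# hooks and feed schedule() objects that expose a start() method returning a
# Deferred, a `failures` counter and a `done` Deferred. A minimal sketch
# (the EchoTask/EchoManager names are hypothetical, not part of ooniprobe;
# a real task increments its own `failures` counter when a run fails):
#
#   from twisted.internet import defer
#
#   class EchoTask(object):
#       def __init__(self):
#           self.failures = 0
#           self.done = defer.Deferred()
#       def start(self):
#           return defer.succeed("echo")
#
#   class EchoManager(TaskManager):
#       def succeeded(self, result, task):
#           log.debug("completed %s" % task)
#       def failed(self, failure, task):
#           log.debug("gave up on %s" % task)
#
#   manager = EchoManager()
#   manager.start()
#   manager.schedule(EchoTask())  # a single task or an iterable of tasks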
class LinkedTaskManager(TaskManager):

    def __init__(self):
        super(LinkedTaskManager, self).__init__()
        self.child = None
        self.parent = None

    @property
    def availableSlots(self):
        mySlots = self.concurrency - len(self._active_tasks)
        if self.child:
            s = self.child.availableSlots
            return min(s, mySlots)
        return mySlots

    def _succeeded(self, result, task):
        super(LinkedTaskManager, self)._succeeded(result, task)
        if self.parent:
            self.parent._fillSlots()

    def _failed(self, failure, task):
        super(LinkedTaskManager, self)._failed(failure, task)
        if self.parent:
            self.parent._fillSlots()
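# Linking two managers caps the effective concurrency at the slowest link:
# the upstream manager never claims more slots than its child can absorb,
# and each completion downstream re-fills the parent's slots. For instance
# (variable names illustrative only):
#
#   upstream = LinkedTaskManager()
#   downstream = LinkedTaskManager()
#   upstream.concurrency = 10
#   downstream.concurrency = 2
#   upstream.child = downstream
#   downstream.parent = upstream
#
#   upstream.availableSlots  # -> min(2, 10) == 2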
class MeasurementManager(LinkedTaskManager):
    """
    This is the Measurement Tracker. In here we keep track of active
    measurements and issue new measurements once the active ones have been
    completed.

    MeasurementManager does not keep track of what kind of measurements it
    is running. It just considers a measurement something that has an input
    and a method to be called.

    NetTest, on the contrary, is aware of what kind of measurements it is
    dispatching, as they are logically grouped by test file.
    """
    def __init__(self):
        if config.advanced.measurement_retries:
            self.retries = config.advanced.measurement_retries
        if config.advanced.measurement_concurrency:
            self.concurrency = config.advanced.measurement_concurrency
        super(MeasurementManager, self).__init__()

    def succeeded(self, result, measurement):
        log.debug("Successfully performed measurement %s" % measurement)
        log.debug("%s" % result)

    def failed(self, failure, measurement):
        pass


class ReportEntryManager(LinkedTaskManager):

    def __init__(self):
        if config.advanced.reporting_retries:
            self.retries = config.advanced.reporting_retries
        if config.advanced.reporting_concurrency:
            self.concurrency = config.advanced.reporting_concurrency
        super(ReportEntryManager, self).__init__()

    def succeeded(self, result, task):
        log.debug("Successfully performed report %s" % task)
        log.debug(str(result))

    def failed(self, failure, task):
        pass
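# In ooniprobe these two managers are typically chained so that measurement
# throughput is throttled by reporting capacity; a sketch of that wiring
# (the exact set-up lives elsewhere in ooniprobe and may differ):
#
#   measurement_manager = MeasurementManager()
#   report_entry_manager = ReportEntryManager()
#   measurement_manager.child = report_entry_manager
#   report_entry_manager.parent = measurement_manager
#
#   measurement_manager.start()
#   report_entry_manager.start()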