/usr/lib/python2.7/dist-packages/ooni/tasks.py is in ooniprobe 1.3.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | import time
from twisted.internet import defer, reactor
from ooni import errors as e
from ooni.settings import config
from ooni import otime
class BaseTask(object):
_timer = None
_running = None
def __init__(self):
"""
If you want to schedule a task multiple times, remember to create fresh
instances of it.
"""
self.failures = 0
self.startTime = time.time()
self.runtime = 0
# This is a deferred that gets called when a test has reached it's
# final status, this means: all retries have been attempted or the test
# has successfully executed.
# Such deferred will be called on completion by the TaskManager.
self.done = defer.Deferred()
def _failed(self, failure):
self.failures += 1
self.failed(failure)
return failure
def _succeeded(self, result):
self.runtime = time.time() - self.startTime
self.succeeded(result)
return result
def start(self):
self._running = defer.maybeDeferred(self.run)
self._running.addErrback(self._failed)
self._running.addCallback(self._succeeded)
return self._running
def succeeded(self, result):
"""
Place here the logic to handle a successful execution of the task.
"""
pass
def failed(self, failure):
"""
Place in here logic to handle failure.
"""
pass
def run(self):
"""
Override this with the logic of your task.
Must return a deferred.
"""
pass
class TaskWithTimeout(BaseTask):
timeout = 30
# So that we can test the callLater calls
clock = reactor
def _timedOut(self):
"""Internal method for handling timeout failure"""
if self._running:
self._failed(e.TaskTimedOut)
self._running.cancel()
def _cancelTimer(self):
if self._timer.active():
self._timer.cancel()
def _succeeded(self, result):
self._cancelTimer()
return BaseTask._succeeded(self, result)
def _failed(self, failure):
self._cancelTimer()
return BaseTask._failed(self, failure)
def start(self):
self._timer = self.clock.callLater(self.timeout, self._timedOut)
return BaseTask.start(self)
class Measurement(TaskWithTimeout):
def __init__(self, test_instance, test_method, test_input):
"""
test_class:
is the class, subclass of NetTestCase, of the test to be run
test_method:
is a string representing the test method to be called to perform
this measurement
test_input:
is the input to the test
net_test:
a reference to the net_test object such measurement belongs to.
"""
self.testInstance = test_instance
self.testInstance.input = test_input
self.testInstance._setUp()
if not hasattr(self.testInstance, '_start_time'):
self.testInstance._start_time = time.time()
if 'input' not in self.testInstance.report.keys():
self.testInstance.report['input'] = test_input
if 'test_start_time' not in self.testInstance.report.keys():
start_time = otime.epochToUTC(self.testInstance._start_time)
self.testInstance.report['test_start_time'] = start_time
self.testInstance.setUp()
self.netTestMethod = getattr(self.testInstance, test_method)
if 'timeout' in dir(test_instance):
if isinstance(test_instance.timeout, int) or isinstance(test_instance.timeout, float):
# If the test has a timeout option set we set the measurement
# timeout to that value + 8 seconds to give it enough time to
# trigger it's internal timeout before we start trigger the
# measurement timeout.
self.timeout = test_instance.timeout + 8
elif config.advanced.measurement_timeout:
self.timeout = config.advanced.measurement_timeout
TaskWithTimeout.__init__(self)
def succeeded(self, result):
pass
def failed(self, failure):
pass
def run(self):
return self.netTestMethod()
class ReportEntry(TaskWithTimeout):
def __init__(self, reporter, entry):
self.reporter = reporter
self.entry = entry
if config.advanced.reporting_timeout:
self.timeout = config.advanced.reporting_timeout
TaskWithTimeout.__init__(self)
def run(self):
return self.reporter.writeReportEntry(self.entry)
|