/usr/lib/python2.7/dist-packages/duplicity/asyncscheduler.py is in duplicity 0.7.17-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
# Copyright 2008 Peter Schuller <peter.schuller@infidyne.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Asynchronous job scheduler, for concurrent execution with minimalistic
dependency guarantees.
"""
import duplicity
from duplicity import log
from duplicity.dup_threading import require_threading
from duplicity.dup_threading import interruptably_wait
from duplicity.dup_threading import async_split
from duplicity.dup_threading import with_lock
thread = duplicity.dup_threading.thread_module()
threading = duplicity.dup_threading.threading_module()
class AsyncScheduler:
    """
    Easy-to-use scheduler of function calls to be executed
    concurrently. A very simple dependency mechanism exists in the
    form of barriers (see insert_barrier()).

    Each instance has a concurrency level associated with it. A
    concurrency of 0 implies that all tasks will be executed
    synchronously when scheduled. A concurrency of 1 indicates that a
    task will be executed asynchronously, but never concurrently with
    other tasks. Both 0 and 1 guarantee strict ordering among all
    tasks (i.e., they will be executed in the order scheduled).

    At concurrency levels above 1, the tasks will end up being
    executed in an order undetermined except insofar as is enforced by
    calls to insert_barrier().

    An AsyncScheduler should be created for any independent process;
    the scheduler will assume that if any background job fails (raises
    an exception), it makes further work moot.
    """

    def __init__(self, concurrency):
        """
        Create an asynchronous scheduler that executes jobs with the
        given level of concurrency.

        concurrency must be an integer >= 0; 0 disables threading
        entirely and runs every task inline in schedule_task().
        """
        log.Info("%s: %s" % (self.__class__.__name__,
                             _("instantiating at concurrency %d") %
                             (concurrency)))
        assert concurrency >= 0, "%s concurrency level must be >= 0" % (self.__class__.__name__,)

        self.__failed = False  # has at least one task failed so far?
        self.__failed_waiter = None  # when __failed, the waiter of the first task that failed
        self.__concurrency = concurrency
        self.__worker_count = 0  # number of active workers
        self.__waiter_count = 0  # number of threads waiting to submit work
        self.__barrier = False  # barrier currently in effect?
        # For simplicity, we use a single cv with its lock for
        # everything, even if the resulting notifyAll():s are not
        # technically efficient.
        self.__cv = threading.Condition()

        if concurrency > 0:
            # NOTE(review): presumably fails early when the platform
            # lacks real threading support -- see dup_threading.
            require_threading("concurrency > 0 (%d)" % (concurrency,))

    def insert_barrier(self):
        """
        Proclaim that any tasks scheduled prior to the call to this
        method MUST be executed prior to any tasks scheduled after the
        call to this method.

        The intended use case is that if task B depends on A, a
        barrier must be inserted in between to guarantee that A
        happens before B.
        """
        log.Debug("%s: %s" % (self.__class__.__name__, _("inserting barrier")))
        # With concurrency 0 this is a NOOP: tasks run synchronously in
        # schedule_task(), and the barrier flag is only ever cleared on
        # the asynchronous scheduling path, so setting it here would
        # leave it set forever.
        if self.__concurrency > 0:
            def _insert_barrier():
                self.__barrier = True
            with_lock(self.__cv, _insert_barrier)

    def schedule_task(self, fn, params):
        """
        Schedule the given task (callable, typically function) for
        execution. Pass the given parameters to the function when
        calling it. Returns a callable which can optionally be used
        to wait for the task to complete, either by returning its
        return value or by propagating any exception raised by said
        task.

        This method may block or return immediately, depending on the
        configuration and state of the scheduler.

        This method may also raise an exception in order to trigger
        failures early, if the task (if run synchronously) or a previous
        task has already failed.

        NOTE: Pay particular attention to the scope in which this is
        called. In particular, since it will execute concurrently in
        the background, assuming fn is a closure, any variables used
        must be properly bound in the closure. This is the reason for
        the convenience feature of being able to give parameters to
        the call, to avoid having to wrap the call itself in a
        function in order to "fixate" variables in, for example, an
        enclosing loop.
        """
        assert fn is not None

        # Note: It is on purpose that we keep track of concurrency in
        # the front end and launch threads for each task, rather than
        # keep a pool of workers. The overhead is not relevant in the
        # situation this will be used, and it removes complexity in
        # terms of ensuring the scheduler is garbage collected/shut
        # down properly when no longer referenced/needed by calling
        # code.

        if self.__concurrency == 0:
            # special case this to not require any platform support for
            # threading at all
            log.Info("%s: %s" % (self.__class__.__name__,
                                 _("running task synchronously (asynchronicity disabled)")),
                     log.InfoCode.synchronous_upload_begin)

            return self.__run_synchronously(fn, params)
        else:
            log.Info("%s: %s" % (self.__class__.__name__,
                                 _("scheduling task for asynchronous execution")),
                     log.InfoCode.asynchronous_upload_begin)

            return self.__run_asynchronously(fn, params)

    def wait(self):
        """
        Wait for the scheduler to become entirely empty (i.e., all
        tasks having run to completion).

        IMPORTANT: This is only useful with a single caller scheduling
        tasks, such that no call to schedule_task() is currently in
        progress or may happen subsequently to the call to wait().
        """
        def _wait():
            # Empty means: no worker running and nobody blocked in
            # schedule_task() waiting for a free slot.
            interruptably_wait(self.__cv, lambda: self.__worker_count == 0 and self.__waiter_count == 0)

        with_lock(self.__cv, _wait)

    def __run_synchronously(self, fn, params):
        """
        Run fn(*params) inline and return a waiter that hands back its
        result.
        """
        # When running synchronously, we immediately leak any exception raised
        # for immediate failure reporting to calling code.
        ret = fn(*params)

        def _waiter():
            return ret

        log.Info("%s: %s" % (self.__class__.__name__,
                             _("task completed successfully")),
                 log.InfoCode.synchronous_upload_done)

        return _waiter

    def __run_asynchronously(self, fn, params):
        """
        Block until a worker slot is available (honouring any barrier),
        start a worker thread running fn(*params), and return the
        waiter half of the async split.

        Raises the exception of the first failed task, if any task has
        failed before a slot could be obtained.
        """
        (waiter, caller) = async_split(lambda: fn(*params))

        def check_pending_failure():
            # Must be called with the cv lock held.
            if self.__failed:
                log.Info("%s: %s" % (self.__class__.__name__,
                                     _("a previously scheduled task has failed; "
                                       "propagating the result immediately")),
                         log.InfoCode.asynchronous_upload_done)
                # The waiter of a failed task is expected to re-raise
                # that task's exception here.
                self.__failed_waiter()
                raise AssertionError("%s: waiter should have raised an exception; "
                                     "this is a bug" % (self.__class__.__name__,))

        def wait_for_and_register_launch():
            check_pending_failure()  # raise on fail
            while self.__worker_count >= self.__concurrency or self.__barrier:
                if self.__worker_count == 0:
                    # No workers left, so everything scheduled before
                    # the barrier has completed; lift it.
                    assert self.__barrier, "barrier should be in effect"
                    self.__barrier = False
                    self.__cv.notifyAll()
                else:
                    self.__waiter_count += 1
                    try:
                        self.__cv.wait()
                    finally:
                        # Restore the count even if wait() is left by an
                        # exception, so that wait() cannot be blocked by
                        # a waiter that no longer exists.
                        self.__waiter_count -= 1
                    check_pending_failure()  # raise on fail
            # Reserve the slot while still holding the lock.
            self.__worker_count += 1
            log.Debug("%s: %s" % (self.__class__.__name__,
                                  _("active workers = %d") % (self.__worker_count,)))

        # simply wait for an OK condition to start, then launch our worker. the worker
        # never waits on us, we just wait for them.
        with_lock(self.__cv, wait_for_and_register_launch)

        self.__start_worker(caller)

        return waiter

    def __start_worker(self, caller):
        """
        Start a new worker.

        The caller must already have reserved a worker slot (i.e.
        incremented the worker count); the slot is released when the
        worker finishes, or immediately if the thread cannot be started.
        """
        def complete_worker():
            # Must be called with the cv lock held.
            self.__worker_count -= 1
            log.Debug("%s: %s" % (self.__class__.__name__,
                                  _("active workers = %d") % (self.__worker_count,)))
            self.__cv.notifyAll()

        def trampoline():
            try:
                self.__execute_caller(caller)
            finally:
                with_lock(self.__cv, complete_worker)

        started = False
        try:
            thread.start_new_thread(trampoline, ())
            started = True
        finally:
            if not started:
                # Thread creation failed, so trampoline() will never run
                # and never release the reserved slot. Release it here;
                # otherwise wait() and later schedule_task() calls would
                # block forever. The original exception still propagates.
                with_lock(self.__cv, complete_worker)

    def __execute_caller(self, caller):
        """
        Run the caller half of an async split inside a worker thread and
        record the first failure so later scheduling attempts can
        propagate it.
        """
        # The caller half that we get here will not propagate
        # errors back to us, but rather propagate it back to the
        # "other half" of the async split.
        succeeded, waiter = caller()
        if not succeeded:
            def _signal_failed():
                # Only the first failure is remembered.
                if not self.__failed:
                    self.__failed = True
                    self.__failed_waiter = waiter
                    # Wake threads blocked in schedule_task() so they
                    # can notice the failure.
                    self.__cv.notifyAll()
            with_lock(self.__cv, _signal_failed)

        log.Info("%s: %s" % (self.__class__.__name__,
                             _("task execution done (success: %s)") % succeeded),
                 log.InfoCode.asynchronous_upload_done)
|