/usr/share/pyshared/tftp/util.py is in python-txtftp 0.1~bzr38-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
'''
@author: shylent
'''
from functools import wraps
from twisted.internet import reactor
from twisted.internet.defer import maybeDeferred
# Names exported by ``from <module> import *``; the no_op helper is not listed.
__all__ = ['SequentialCall', 'Spent', 'Cancelled', 'deferred']
class Spent(Exception):
    """Raised when a L{SequentialCall} is used after it has run out of
    timeout values (i.e. it has already timed out).
    """
class Cancelled(Exception):
    """Raised when a L{SequentialCall} is used after it has been cancelled."""
def no_op(*args, **kwargs):
    """Accept any arguments, do nothing and return C{None}.

    Serves as a placeholder wherever a callable is required but no action
    is wanted.
    """
    return None
class SequentialCall(object):
"""Calls a given callable at intervals, specified by the L{timeout} iterable.
Optionally calls a timeout handler, if provided, when there are no more timeout values.
@param timeout: an iterable, that yields valid _seconds arguments to
L{callLater<twisted.internet.interfaces.IReactorTime.callLater>} (floats)
@type timeout: any iterable
@param run_now: whether or not the callable should be called immediately
upon initialization. Relinquishes control to the reactor
(calls callLater(0,...)). Default: C{False}.
@type run_now: C{bool}
@param callable: the callable, that will be called at the specified intervals
@param callable_args: the arguments to call it with
@param callable_kwargs: the keyword arguments to call it with
@param on_timeout: the callable, that will be called when there are no more
timeout values
@param on_timeout_args: the arguments to call it with
@param on_timeout_kwargs: the keyword arguments to call it with
"""
@classmethod
def run(cls, timeout, callable, callable_args=None, callable_kwargs=None,
on_timeout=None, on_timeout_args=None, on_timeout_kwargs=None,
run_now=False, _clock=None):
"""Create a L{SequentialCall} object and start its scheduler cycle
@see: L{SequentialCall}
"""
inst = cls(timeout, callable, callable_args, callable_kwargs,
on_timeout, on_timeout_args, on_timeout_kwargs,
run_now, _clock)
inst.reschedule()
return inst
def __init__(self, timeout,
callable, callable_args=None, callable_kwargs=None,
on_timeout=None, on_timeout_args=None, on_timeout_kwargs=None,
run_now=False, _clock=None):
self._timeout = iter(timeout)
self.callable = callable
self.callable_args = callable_args or []
self.callable_kwargs = callable_kwargs or {}
self.on_timeout = on_timeout or no_op
self.on_timeout_args = on_timeout_args or []
self.on_timeout_kwargs = on_timeout_kwargs or {}
self._wd = None
self._spent = self._cancelled = False
self._ran_first = not run_now
if _clock is None:
self._clock = reactor
else:
self._clock = _clock
def _call_and_schedule(self):
self.callable(*self.callable_args, **self.callable_kwargs)
self._ran_first = True
if not self._spent:
self.reschedule()
def reschedule(self):
"""Schedule the next L{callable} call
@raise Spent: if the timeout iterator has been exhausted and on_timeout
handler has been already called
@raise Cancelled: if this L{SequentialCall} has already been cancelled
"""
if not self._ran_first:
self._wd = self._clock.callLater(0, self._call_and_schedule)
return
if self._cancelled:
raise Cancelled("This SequentialCall has already been cancelled")
if self._spent:
raise Spent("This SequentialCall has already timed out")
try:
next_timeout = self._timeout.next()
self._wd = self._clock.callLater(next_timeout, self._call_and_schedule)
except StopIteration:
self.on_timeout(*self.on_timeout_args, **self.on_timeout_kwargs)
self._spent = True
def cancel(self):
"""Cancel the next scheduled call
@raise Cancelled: if this SequentialCall has already been cancelled
@raise Spent: if this SequentialCall has expired
"""
if self._cancelled:
raise Cancelled("This SequentialCall has already been cancelled")
if self._spent:
raise Spent("This SequentialCall has already timed out")
if self._wd is not None and self._wd.active():
self._wd.cancel()
self._spent = self._cancelled = True
def active(self):
"""Whether or not this L{SequentialCall} object is considered active"""
return not (self._spent or self._cancelled)
def deferred(func):
    """Wrap C{func}, so that every call to it goes through L{maybeDeferred}.

    Besides that, the decorator doubles as documentation: a function carrying
    it is immediately recognizable as asynchronous.

    @param func: the function to wrap
    @return: a wrapper, that carries the metadata of C{func}
    """
    def always_deferred(*args, **kwargs):
        return maybeDeferred(func, *args, **kwargs)

    return wraps(func)(always_deferred)
|