/usr/share/pyshared/juju/lib/twistutils.py is in juju-0.7 0.7-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | import inspect
import os
from twisted.internet import reactor
from twisted.internet.defer import (
Deferred, maybeDeferred, succeed, DeferredList)
from twisted.python.util import mergeFunctionMetadata
def concurrent_execution_guard(attribute):
"""Sets attribute to True/False during execution of the decorated method.
Used to ensure non concurrent execution of the decorated function via
an instance attribute. *The underlying function must return a defered*.
"""
def guard(f):
def guard_execute(self, *args, **kw):
value = getattr(self, attribute, None)
if value:
return succeed(False)
else:
setattr(self, attribute, True)
d = maybeDeferred(f, self, *args, **kw)
def post_execute(result):
setattr(self, attribute, False)
return result
d.addBoth(post_execute)
return d
return mergeFunctionMetadata(f, guard_execute)
return guard
def gather_results(deferreds, consume_errors=True):
d = DeferredList(deferreds, fireOnOneErrback=1,
consumeErrors=consume_errors)
d.addCallback(lambda r: [x[1] for x in r])
d.addErrback(lambda f: f.value.subFailure)
return d
def get_module_directory(module):
"""Determine the directory of a module.
Trial rearranges the working directory such that the module
paths are relative to a modified current working directory,
which results in failing tests when run under coverage, we
manually remove the trial locations to ensure correct
directories are utilized.
"""
return os.path.abspath(os.path.dirname(inspect.getabsfile(module)).replace(
"/_trial_temp", ""))
def sleep(delay):
"""Non-blocking sleep.
:param int delay: time in seconds to sleep.
:return: a Deferred that fires after the desired delay.
:rtype: :class:`twisted.internet.defer.Deferred`
"""
deferred = Deferred()
reactor.callLater(delay, deferred.callback, None)
return deferred
|