This file is indexed.

/usr/share/pyshared/juju/lib/twistutils.py is in juju-0.7 0.7-0ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import inspect
import os

from twisted.internet import reactor
from twisted.internet.defer import (
    Deferred, maybeDeferred, succeed, DeferredList)
from twisted.python.util import mergeFunctionMetadata


def concurrent_execution_guard(attribute):
    """Sets attribute to True/False during execution of the decorated method.

    Used to ensure non concurrent execution of the decorated function via
    an instance attribute. *The underlying function must return a defered*.
    """

    def guard(f):

        def guard_execute(self, *args, **kw):
            value = getattr(self, attribute, None)
            if value:
                return succeed(False)
            else:
                setattr(self, attribute, True)

            d = maybeDeferred(f, self, *args, **kw)

            def post_execute(result):
                setattr(self, attribute, False)
                return result
            d.addBoth(post_execute)
            return d

        return mergeFunctionMetadata(f, guard_execute)

    return guard


def gather_results(deferreds, consume_errors=True):
    d = DeferredList(deferreds, fireOnOneErrback=1,
                     consumeErrors=consume_errors)
    d.addCallback(lambda r: [x[1] for x in r])
    d.addErrback(lambda f: f.value.subFailure)
    return d


def get_module_directory(module):
    """Determine the directory of a module.

    Trial rearranges the working directory such that the module
    paths are relative to a modified current working directory,
    which results in failing tests when run under coverage, we
    manually remove the trial locations to ensure correct
    directories are utilized.
    """
    return os.path.abspath(os.path.dirname(inspect.getabsfile(module)).replace(
        "/_trial_temp", ""))


def sleep(delay):
    """Non-blocking sleep.

    :param int delay: time in seconds to sleep.
    :return: a Deferred that fires after the desired delay.
    :rtype: :class:`twisted.internet.defer.Deferred`
    """
    deferred = Deferred()
    reactor.callLater(delay, deferred.callback, None)
    return deferred