This file is indexed.

/usr/lib/python2.7/dist-packages/traits/util/tests/test_async_trait_wait.py is in python-traits 4.6.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import random
import threading
import time
import unittest

from traits.api import Enum, HasStrictTraits

from traits.util.async_trait_wait import wait_for_condition


class TrafficLights(HasStrictTraits):
    colour = Enum('Green', 'Amber', 'Red', 'RedAndAmber')

    _next_colour = {
        'Green': 'Amber',
        'Amber': 'Red',
        'Red': 'RedAndAmber',
        'RedAndAmber': 'Green',
    }

    def make_random_changes(self, change_count):
        for _ in xrange(change_count):
            time.sleep(random.uniform(0.1, 0.3))
            self.colour = self._next_colour[self.colour]


class TestAsyncTraitWait(unittest.TestCase):
    def test_wait_for_condition_success(self):
        lights = TrafficLights(colour='Green')
        t = threading.Thread(target=lights.make_random_changes, args=(2,))
        t.start()

        wait_for_condition(
            condition=lambda l: l.colour == 'Red',
            obj=lights,
            trait='colour',
        )

        self.assertEqual(lights.colour, 'Red')
        t.join()

    def test_wait_for_condition_failure(self):
        lights = TrafficLights(colour='Green')
        t = threading.Thread(target=lights.make_random_changes, args=(2,))
        t.start()

        self.assertRaises(
            RuntimeError,
            wait_for_condition,
            condition=lambda l: l.colour == 'RedAndAmber',
            obj=lights,
            trait='colour',
            timeout=5.0,
            )
        t.join()

    def test_traits_handler_cleaned_up(self):
        # An older version of wait_for_condition failed to clean up
        # the trait handler, leading to possibly evaluation of the
        # condition after the 'wait_for_condition' call had returned.

        self.lights = TrafficLights(colour='Green')
        t = threading.Thread(target=self.lights.make_random_changes, args=(3,))
        t.start()
        wait_for_condition(
            condition=lambda l: self.lights.colour == 'Red',
            obj=self.lights,
            trait='colour',
        )
        del self.lights

        # If the condition gets evaluated again past this point, we'll
        # see an AttributeError from the failed self.lights lookup.

        # assertSucceeds!
        t.join()


if __name__ == '__main__':
    unittest.main()