This file is indexed.

/usr/lib/python2.7/dist-packages/traits/tests/test_listeners.py is in python-traits 4.6.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#  Test the 'add_trait_listener', 'remove_trait_listener' interface to
#  the HasTraits class.
#
#  Written by: David C. Morrill
#
#  Date: 09/07/2005
#
#  (c) Copyright 2005 by Enthought, Inc.
#
#  Copyright (c) 2007, Enthought, Inc.
#  All rights reserved.
#
#  This software is provided without warranty under the terms of the BSD
#  License included in /LICENSE.txt and may be redistributed only under the
#  conditions described in the aforementioned license.  The license is also
#  available online at http://www.enthought.com/licenses/BSD.txt
#
#  Thanks for using Enthought open source!

from __future__ import absolute_import

import contextlib
import cStringIO
import sys
import threading
import time

from traits.testing.unittest_tools import unittest

from ..api import HasTraits, Str, Int, Float, Any, Event
from ..api import push_exception_handler, pop_exception_handler


@contextlib.contextmanager
def captured_stderr():
    """
    Return a context manager that directs all stderr output to a string.

    On entry ``sys.stderr`` is replaced by a fresh in-memory buffer, which is
    also the object bound by ``with ... as``; use its ``getvalue()`` method to
    read what was written.  The previous ``sys.stderr`` is put back on exit,
    whether or not the body of the ``with`` block raised.

    """
    # Prefer cStringIO, which only exists on Python 2; fall back to
    # io.StringIO when that module cannot be imported.
    try:
        from cStringIO import StringIO
    except ImportError:
        from io import StringIO

    new_stderr = StringIO()
    original_stderr = sys.stderr
    sys.stderr = new_stderr
    try:
        yield new_stderr
    finally:
        # Always restore the real stream, even if the managed block failed.
        sys.stderr = original_stderr


class GenerateEvents(HasTraits):
    """ Event source used by the listener tests: an object with three traits
    whose values the tests change.
    """
    # Traits assigned by the tests (via 'trait_set' or direct assignment).
    name = Str
    age = Int
    weight = Float

# Module-level record written by the 'ListenEvents' handlers: maps the name of
# the handler that ran to the (trait name, old value, new value) it received.
events = {}  # dict of events


class ListenEvents(HasTraits):
    """ Listener object for 'GenerateEvents'.

    Every handler stores the (name, old, new) triple it was called with in the
    module-level 'events' dict, under a key equal to the handler's own name.
    """

    #  Handlers named '_<trait>_changed':

    def _name_changed(self, object, name, old, new):
        events.update({"_name_changed": (name, old, new)})

    def _age_changed(self, object, name, old, new):
        events.update({"_age_changed": (name, old, new)})

    def _weight_changed(self, object, name, old, new):
        events.update({"_weight_changed": (name, old, new)})

    #  Handlers named 'alt_<trait>_changed' (there is none for 'age'):

    def alt_name_changed(self, object, name, old, new):
        events.update({"alt_name_changed": (name, old, new)})

    def alt_weight_changed(self, object, name, old, new):
        events.update({"alt_weight_changed": (name, old, new)})


class GenerateFailingEvents(HasTraits):
    """ Event source whose own handler for 'name' always raises. """
    name = Str
    def _name_changed(self):
        # Fail unconditionally; the exception-handler test in this module
        # relies on a RuntimeError being raised when 'name' is set.
        raise RuntimeError

class Test_Listeners(unittest.TestCase):
    """ Tests for the 'add_trait_listener' / 'remove_trait_listener' interface
    and for exception handlers installed with 'push_exception_handler'.
    """

    def test(self):
        """ Add and remove listeners with the default and the 'alt' prefix,
        checking after each step which handlers were recorded in 'events'.
        """
        # 'events' is rebound (not just cleared) before each step, so the
        # module-level name has to be declared global here.
        global events

        # FIXME: comparing floats
        ge = GenerateEvents()
        le = ListenEvents()

        # Starting test: No Listeners
        ge.trait_set(name='Joe', age=22, weight=152.0)

        # Adding default listener
        ge.add_trait_listener(le)
        events = {}
        ge.trait_set(name='Mike', age=34, weight=178.0)
        self.assertEqual(events, {
            '_age_changed': ('age', 22, 34),
            '_weight_changed': ('weight', 152.0, 178.0),
            '_name_changed': ('name', 'Joe', 'Mike'),
            })

        # Adding alternate listener
        ge.add_trait_listener(le, 'alt')
        events = {}
        ge.trait_set(name='Gertrude', age=39, weight=108.0)
        self.assertEqual(events, {
            '_age_changed': ('age', 34, 39),
            '_name_changed': ('name', 'Mike', 'Gertrude'),
            '_weight_changed': ('weight', 178.0, 108.0),
            'alt_name_changed': ('name', 'Mike', 'Gertrude'),
            'alt_weight_changed': ('weight', 178.0, 108.0),
            })

        # Removing default listener
        ge.remove_trait_listener(le)
        events = {}
        ge.trait_set(name='Sally', age=46, weight=118.0)
        self.assertEqual(events, {
            'alt_name_changed': ('name', 'Gertrude', 'Sally'),
            'alt_weight_changed': ('weight', 108.0, 118.0),
            })

        # Removing alternate listener
        ge.remove_trait_listener(le, 'alt')
        events = {}
        ge.trait_set(name='Ralph', age=29, weight=198.0)
        self.assertEqual(events, {})

    def test_trait_exception_handler_can_access_exception(self):
        """ Tests if trait exception handlers can access the traceback of the
        exception.
        """
        from traits import trait_notifiers

        def _handle_exception(obj, name, old, new):
            # When the handler is invoked, the failing notification's
            # exception must still be visible through sys.exc_info().
            self.assertIsNotNone(sys.exc_info()[0])

        ge = GenerateFailingEvents()
        # Install the handler before entering the 'try' block, so that the
        # 'finally' clause only ever pops a handler that was really pushed.
        trait_notifiers.push_exception_handler(
            _handle_exception,
            reraise_exceptions=False,
            main=True
        )
        try:
            ge.trait_set(name='John Cleese')
        finally:
            trait_notifiers.pop_exception_handler()

class A(HasTraits):
    """ Object with a 'foo' event, used by the listener thread-safety test. """
    # Receives the exception caught by 'foo_writer' when assigning 'foo' fails.
    exception = Any

    # Event that 'foo_writer' assigns to repeatedly.
    foo = Event

    def foo_changed_handler(self):
        """ Do-nothing handler that the thread-safety test attaches to and
        detaches from 'foo'.
        """
        pass


def foo_writer(a, stop_event):
    """ Keep assigning True to 'a.foo' until 'stop_event' is set.

    Any Exception raised by an assignment is not propagated; it is stored in
    'a.exception' and the loop carries on.
    """
    while True:
        if stop_event.is_set():
            return
        try:
            setattr(a, 'foo', True)
        except Exception as error:
            a.exception = error


class TestRaceCondition(unittest.TestCase):
    """ Regression tests for races between firing trait notifications on one
    thread and adding or removing listeners on another.
    """

    def setUp(self):
        # Install a do-nothing exception handler with reraise_exceptions=True
        # for the duration of each test; it is removed again in tearDown.
        push_exception_handler(
            handler=lambda *args: None,
            reraise_exceptions=True,
            main=True,
            )

    def tearDown(self):
        pop_exception_handler()

    def test_listener_thread_safety(self):
        """ Attach and detach a listener while another thread fires the event.
        """
        # Regression test for GitHub issue #56
        a = A()
        stop_event = threading.Event()

        t = threading.Thread(target=foo_writer, args=(a, stop_event))
        t.start()

        try:
            for _ in range(100):
                a.on_trait_change(a.foo_changed_handler, 'foo')
                time.sleep(0.0001)  # encourage thread-switch
                a.on_trait_change(a.foo_changed_handler, 'foo', remove=True)
        finally:
            # Always stop and join the writer thread: if the loop above
            # raised, the thread would otherwise spin forever and keep the
            # test process alive.
            stop_event.set()
            t.join()

        self.assertIsNone(a.exception)

    def test_listener_deleted_race(self):
        """ Remove a listener while its handler is still running on another
        thread, and check that nothing mentioning 'Exception' reaches stderr.
        """
        # Regression test for exception that occurred when the listener_deleted
        # method is called after the dispose method on a
        # TraitsChangeNotifyWrapper.
        class SlowListener(HasTraits):
            def handle_age_change(self):
                time.sleep(1.0)

        def worker_thread(event_source, start_event):
            # Wait until the listener is set up on the main thread, then fire
            # the event.
            start_event.wait()
            event_source.age = 11

        def main_thread(event_source, start_event):
            listener = SlowListener()
            event_source.on_trait_change(listener.handle_age_change, 'age')
            start_event.set()
            # Allow time to make sure that we're in the middle of handling an
            # event.
            time.sleep(0.5)
            event_source.on_trait_change(
                listener.handle_age_change, 'age', remove=True)

        # Previously, a ValueError would be raised on the worker thread
        # during (normal refcount-based) garbage collection.  That
        # ValueError is ignored by the Python system, so the only
        # visible effect is the output to stderr.
        with captured_stderr() as s:
            start_event = threading.Event()
            event_source = GenerateEvents(age=10)
            t = threading.Thread(
                target=worker_thread,
                args=(event_source, start_event),
                )
            t.start()
            try:
                main_thread(event_source, start_event)
            finally:
                # If main_thread failed before releasing the worker, release
                # it here so that the join below cannot block forever.
                # Setting an already-set event is harmless.
                start_event.set()
                t.join()

        self.assertNotIn('Exception', s.getvalue())


# Run every test case defined in this module when the file is executed as a
# script (importing the module does not run anything):
if __name__ == '__main__':
    unittest.main()