/usr/lib/python2.7/dist-packages/traits/tests/test_listeners.py is in python-traits 4.6.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Test the 'add_trait_listener', 'remove_trait_listener' interface to
# the HasTraits class.
#
# Written by: David C. Morrill
#
# Date: 09/07/2005
#
# (c) Copyright 2005 by Enthought, Inc.
#
# Copyright (c) 2007, Enthought, Inc.
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# License included in /LICENSE.txt and may be redistributed only under the
# conditions described in the aforementioned license. The license is also
# available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!
from __future__ import absolute_import
import contextlib
import cStringIO
import sys
import threading
import time
from traits.testing.unittest_tools import unittest
from ..api import HasTraits, Str, Int, Float, Any, Event
from ..api import push_exception_handler, pop_exception_handler
@contextlib.contextmanager
def captured_stderr():
    """
    Return a context manager that directs all stderr output to a string.

    On entry ``sys.stderr`` is replaced by an in-memory buffer, which is
    also the value bound by the ``with`` statement; read what was written
    with its ``getvalue()`` method.  On exit the previous ``sys.stderr``
    is put back, even if the body of the ``with`` block raised.
    """
    # Resolve the buffer class locally so that this helper does not depend
    # on the Python-2-only 'cStringIO' module being importable.
    try:
        from cStringIO import StringIO
    except ImportError:
        from io import StringIO

    new_stderr = StringIO()
    original_stderr = sys.stderr
    sys.stderr = new_stderr
    try:
        yield new_stderr
    finally:
        sys.stderr = original_stderr
class GenerateEvents(HasTraits):
    """ Event source used by the tests: an object with three traits. """

    # Declared with the 'Str' trait type.
    name = Str

    # Declared with the 'Int' trait type.
    age = Int

    # Declared with the 'Float' trait type.
    weight = Float
# Module-level dict of events: each 'ListenEvents' handler stores the
# (name, old, new) tuple of its latest notification here, keyed by the
# handler's own method name.
events = {}
class ListenEvents(HasTraits):
    """ Listener whose handlers log every notification they receive.

    Each handler stores a ``(name, old, new)`` tuple in the module-level
    dict 'events', keyed by the handler's own method name.
    """

    # 'GenerateEvents' event interface:
    # the events are stored in the dict 'events'

    def _name_changed(self, object, name, old, new):
        record = (name, old, new)
        events["_name_changed"] = record

    def _age_changed(self, object, name, old, new):
        record = (name, old, new)
        events["_age_changed"] = record

    def _weight_changed(self, object, name, old, new):
        record = (name, old, new)
        events["_weight_changed"] = record

    # Handlers whose method names start with 'alt' instead of '_':

    def alt_name_changed(self, object, name, old, new):
        record = (name, old, new)
        events["alt_name_changed"] = record

    def alt_weight_changed(self, object, name, old, new):
        record = (name, old, new)
        events["alt_weight_changed"] = record
class GenerateFailingEvents(HasTraits):
    """ Event source whose 'name' change handler always raises. """

    name = Str

    def _name_changed(self):
        # Fail unconditionally.
        raise RuntimeError()
class Test_Listeners(unittest.TestCase):
    """ Tests for 'add_trait_listener' / 'remove_trait_listener' and for
    the exception handler that runs when a change handler fails.
    """

    def test(self):
        """ Add and remove default and 'alt'-prefixed listeners, checking
        after each step which handlers recorded a notification.
        """
        # The module-level 'events' dict is rebound to a fresh dict before
        # each step, so every comparison only sees that step's records.
        global events

        # FIXME: comparing floats
        ge = GenerateEvents()
        le = ListenEvents()

        # Starting test: No Listeners
        ge.trait_set(name='Joe', age=22, weight=152.0)

        # Adding default listener
        ge.add_trait_listener(le)
        events = {}
        ge.trait_set(name='Mike', age=34, weight=178.0)
        self.assertEqual(events, {
            '_age_changed': ('age', 22, 34),
            '_weight_changed': ('weight', 152.0, 178.0),
            '_name_changed': ('name', 'Joe', 'Mike'),
        })

        # Adding alternate listener
        ge.add_trait_listener(le, 'alt')
        events = {}
        ge.trait_set(name='Gertrude', age=39, weight=108.0)
        self.assertEqual(events, {
            '_age_changed': ('age', 34, 39),
            '_name_changed': ('name', 'Mike', 'Gertrude'),
            '_weight_changed': ('weight', 178.0, 108.0),
            'alt_name_changed': ('name', 'Mike', 'Gertrude'),
            'alt_weight_changed': ('weight', 178.0, 108.0),
        })

        # Removing default listener
        ge.remove_trait_listener(le)
        events = {}
        ge.trait_set(name='Sally', age=46, weight=118.0)
        self.assertEqual(events, {
            'alt_name_changed': ('name', 'Gertrude', 'Sally'),
            'alt_weight_changed': ('weight', 108.0, 118.0),
        })

        # Removing alternate listener
        ge.remove_trait_listener(le, 'alt')
        events = {}
        ge.trait_set(name='Ralph', age=29, weight=198.0)
        self.assertEqual(events, {})

    def test_trait_exception_handler_can_access_exception(self):
        """ Tests if trait exception handlers can access the traceback of the
        exception.
        """
        from traits import trait_notifiers

        # Invocations are recorded in a list so that the nested handler can
        # report them back to the enclosing test.
        handler_calls = []

        def _handle_exception(obj, name, old, new):
            handler_calls.append(name)
            self.assertIsNotNone(sys.exc_info()[0])

        ge = GenerateFailingEvents()

        # Push before entering the 'try' so that the 'finally' clause only
        # ever pops a handler that was actually pushed.
        trait_notifiers.push_exception_handler(
            _handle_exception,
            reraise_exceptions=False,
            main=True
        )
        try:
            ge.trait_set(name='John Cleese')
        finally:
            trait_notifiers.pop_exception_handler()

        # Without this check the test would pass vacuously if the handler
        # were never invoked.
        self.assertTrue(handler_calls)
class A(HasTraits):
    """ Object with an 'exception' attribute and a 'foo' event. """

    # 'foo_writer' stores here any exception raised while assigning 'foo'.
    exception = Any

    # The event that 'foo_writer' assigns to.
    foo = Event

    def foo_changed_handler(self):
        """ Do nothing. """
def foo_writer(a, stop_event):
    """ Assign True to ``a.foo`` over and over until 'stop_event' is set.

    Any exception raised by an assignment is stored in ``a.exception``
    and the loop carries on.
    """
    while True:
        if stop_event.is_set():
            break
        try:
            a.foo = True
        except Exception as error:
            a.exception = error
class TestRaceCondition(unittest.TestCase):
    """ Regression tests for races between trait notifications fired on
    one thread and listeners being added or removed on another.
    """

    def setUp(self):
        # Install a do-nothing handler with 'reraise_exceptions=True' for
        # the duration of each test; 'tearDown' pops it again.
        push_exception_handler(
            handler=lambda *args: None,
            reraise_exceptions=True,
            main=True,
        )

    def tearDown(self):
        pop_exception_handler()

    def test_listener_thread_safety(self):
        # Regression test for GitHub issue #56
        a = A()
        stop_event = threading.Event()

        t = threading.Thread(target=foo_writer, args=(a, stop_event))
        t.start()

        try:
            for _ in range(100):
                a.on_trait_change(a.foo_changed_handler, 'foo')
                time.sleep(0.0001)  # encourage thread-switch
                a.on_trait_change(a.foo_changed_handler, 'foo', remove=True)
        finally:
            # Always stop the writer: it loops until the event is set, so
            # an error above would otherwise leave it running forever.
            stop_event.set()
            t.join()

        self.assertIsNone(a.exception)

    def test_listener_deleted_race(self):
        # Regression test for exception that occurred when the listener_deleted
        # method is called after the dispose method on a
        # TraitsChangeNotifyWrapper.
        class SlowListener(HasTraits):
            def handle_age_change(self):
                time.sleep(1.0)

        def worker_thread(event_source, start_event):
            # Wait until the listener is set up on the main thread, then fire
            # the event.
            start_event.wait()
            event_source.age = 11

        def main_thread(event_source, start_event):
            listener = SlowListener()
            event_source.on_trait_change(listener.handle_age_change, 'age')
            start_event.set()
            # Allow time to make sure that we're in the middle of handling an
            # event.
            time.sleep(0.5)
            event_source.on_trait_change(
                listener.handle_age_change, 'age', remove=True)

        # Previously, a ValueError would be raised on the worker thread
        # during (normal refcount-based) garbage collection.  That
        # ValueError is ignored by the Python system, so the only
        # visible effect is the output to stderr.
        with captured_stderr() as s:
            start_event = threading.Event()
            event_source = GenerateEvents(age=10)
            t = threading.Thread(
                target=worker_thread,
                args=(event_source, start_event),
            )
            t.start()
            try:
                main_thread(event_source, start_event)
            finally:
                # Release and join the worker even if 'main_thread' failed
                # before setting the event; setting it again is harmless.
                start_event.set()
                t.join()

        self.assertNotIn('Exception', s.getvalue())
# When executed as a script (rather than imported), run the unit tests:
if __name__ == "__main__":
    unittest.main()
|