This file is indexed.

/usr/lib/python2.7/dist-packages/traits/testing/unittest_tools.py is in python-traits 4.6.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
#------------------------------------------------------------------------------
# Copyright (c) 2005-2013, Enthought, Inc.
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in /LICENSE.txt and may be redistributed only
# under the conditions described in the aforementioned license.  The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
# Thanks for using Enthought open source!
#------------------------------------------------------------------------------
""" Trait assert mixin class to simplify test implementation for Trait
Classes.

"""

import contextlib
import threading
import sys
import warnings

from traits.api import (Any, Event, HasStrictTraits, Instance, Int, List,
        Property, Str)
from traits.util.async_trait_wait import wait_for_condition

# Compatibility layer for Python 2.6: try loading unittest2
from traits import _py2to3
if sys.version_info[:2] == (2, 6):
    import unittest2 as unittest
else:
    import unittest


class _AssertTraitChangesContext(object):
    """ A context manager used to implement the trait change assert methods.

    Attributes
    ----------
    obj : HasTraits
        The HasTraits class instance who's class trait will change.

    xname : str
        The extended trait name of trait changes to listen to.

    count : int, optional
        The expected number of times the event should be fired. When None
        (default value) there is no check for the number of times the
        change event was fired.

    events : list of tuples
        A list with tuple elements containing the arguments of an
        `on_trait_change` event signature (<object>, <name>, <old>, <new>).

    Raises
    ------
    AssertionError :
          When the desired number of trait changed did not take place or when
          `count = None` and no trait change took place.

    """

    def __init__(self, obj, xname, count, test_case):
        """ Initialize the trait change assertion context manager.

        Parameters
        ----------
        obj : HasTraits
            The HasTraits class instance who's class trait will change.

        xname : str
            The extended trait name of trait changes to listen to.

        count : int, optional
            The expected number of times the event should be fired. When None
            (default value) there is no check for the number of times the
            change event was fired.

        test_case : TestCase
            A unittest TestCase where to raise the failureException if
            necessary.

        Notes
        -----
        - Checking if the provided xname corresponds to valid traits in
          the class is not implemented yet.

        """
        self.obj = obj
        self.xname = xname
        self.count = count
        self.event = None
        self.events = []
        self.failureException = test_case.failureException

    def _listener(self, obj, name, old, new):
        """ Dummy trait listener.
        """
        self.event = (obj, name, old, new)
        self.events.append(self.event)

    def __enter__(self):
        """ Bind the trait listener.
        """
        self.obj.on_trait_change(self._listener, self.xname)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """ Remove the trait listener.
        """
        if exc_type is not None:
            return False

        self.obj.on_trait_change(self._listener, self.xname, remove=True)
        if self.count is not None and len(self.events) != self.count:
            msg = 'Change event for {0} was fired {1} times instead of {2}'
            items = self.xname, len(self.events), self.count
            raise self.failureException(msg.format(*items))
        elif self.count is None and not self.events:
            msg = 'A change event was not fired for: {0}'.format(self.xname)
            raise self.failureException(msg)
        return False


@contextlib.contextmanager
def reverse_assertion(context, msg):
    context.__enter__()
    try:
        yield context
    finally:
        try:
            context.__exit__(None, None, None)
        except AssertionError:
            pass
        else:
            raise context.failureException(msg)


class _TraitsChangeCollector(HasStrictTraits):
    """ Class allowing thread-safe recording of events.
    """
    # The object we're listening to.
    obj = Any

    # The (possibly extended) trait name.
    trait = Str

    # Read-only event count.
    event_count = Property(Int)

    # Event that's triggered when the event count is updated.
    event_count_updated = Event

    # Private list of events.
    events = List(Any)

    # Lock used to allow access to events by multiple threads
    # simultaneously.
    _lock = Instance(threading.Lock, ())

    def start_collecting(self):
        self.obj.on_trait_change(
            self._event_handler,
            self.trait,
        )

    def stop_collecting(self):
        self.obj.on_trait_change(
            self._event_handler,
            self.trait,
            remove=True,
        )

    def _event_handler(self, new):
        with self._lock:
            self.events.append(new)
        self.event_count_updated = True

    def _get_event_count(self):
        """ Traits property getter.

        Thread-safe access to event count.

        """
        with self._lock:
            return len(self.events)


class UnittestTools(object):
    """ Mixin class to augment the unittest.TestCase class with useful trait
    related assert methods.

    """

    def assertTraitChanges(self, obj, trait, count=None, callableObj=None,
                           *args, **kwargs):
        """ Assert an object trait changes a given number of times.

        Assert that the class trait changes exactly `count` times during
        execution of the provided function.

        This method can also be used in a with statement to assert that
        a class trait has changed during the execution of the code inside
        the with statement (similar to the assertRaises method). Please note
        that in that case the context manager returns itself and the user
        can introspect the information of:

        - The last event fired by accessing the ``event`` attribute of the
          returned object.

        - All the fired events by accessing the ``events`` attribute of
          the return object.

        Note that in the case of chained properties (trait 'foo' depends on
        'bar', which in turn depends on 'baz'), the order in which the
        corresponding trait events appear in the ``events`` attribute is
        not well-defined, and may depend on dictionary ordering.

        **Example**::

            class MyClass(HasTraits):
                number = Float(2.0)

            my_class = MyClass()

            with self.assertTraitChangesExactly(my_class, 'number', count=1):
                my_class.number = 3.0

        Parameters
        ----------
        obj : HasTraits
            The HasTraits class instance whose class trait will change.

        trait : str
            The extended trait name of trait changes to listen to.

        count : int or None, optional
            The expected number of times the event should be fired. When None
            (default value) there is no check for the number of times the
            change event was fired.

        callableObj : callable, optional
            A callable object that will trigger the expected trait change.
            When None (default value) a trigger is expected to be called
            under the context manger returned by this method.

        *args :
            List of positional arguments for ``callableObj``

        **kwargs :
            Dict of keyword value pairs to be passed to the ``callableObj``


        Returns
        -------
        context : context manager or None
            If ``callableObj`` is None, an assertion context manager is returned,
            inside of which a trait-change trigger can be invoked. Otherwise,
            the context is used internally with ``callableObj`` as the trigger,
            in which case None is returned.


        Notes
        -----
        - Checking if the provided ``trait`` corresponds to valid traits in
          the class is not implemented yet.
        - Using the functional version of the assert method requires the
          ``count`` argument to be given even if it is None.

        """
        context = _AssertTraitChangesContext(obj, trait, count, self)
        if callableObj is None:
            return context
        with context:
            callableObj(*args, **kwargs)

    def assertTraitDoesNotChange(self, obj, trait, callableObj=None,
                                 *args, **kwargs):
        """ Assert an object trait does not change.

        Assert that the class trait does not change during
        execution of the provided function.

        Parameters
        ----------
        obj : HasTraits
            The HasTraits class instance whose class trait will change.

        trait : str
            The extended trait name of trait changes to listen to.

        callableObj : callable, optional
            A callable object that should not trigger a change in the
            passed trait.  When None (default value) a trigger is expected
            to be called under the context manger returned by this method.

        *args :
            List of positional arguments for ``callableObj``

        **kwargs :
            Dict of keyword value pairs to be passed to the ``callableObj``


        Returns
        -------
        context : context manager or None
            If ``callableObj`` is None, an assertion context manager is returned,
            inside of which a trait-change trigger can be invoked. Otherwise,
            the context is used internally with ``callableObj`` as the trigger,
            in which case None is returned.


        """
        msg = 'A change event was fired for: {0}'.format(trait)
        context = _AssertTraitChangesContext(obj, trait, None, self)
        if callableObj is None:
            return reverse_assertion(context, msg)
        with reverse_assertion(context, msg):
            callableObj(*args, **kwargs)
        return

    def assertMultiTraitChanges(self, objects, traits_modified,
            traits_not_modified):
        """ Assert that traits on multiple objects do or do not change.

        This combines some of the functionality of `assertTraitChanges` and
        `assertTraitDoesNotChange`.

        Parameters
        ----------
        objects : list of HasTraits
            The HasTraits class instances whose traits will change.

        traits_modified : list of str
            The extended trait names of trait expected to change.

        traits_not_modified : list of str
            The extended trait names of traits not expected to change.

        """
        args = []
        for obj in objects:
            for trait in traits_modified:
                args.append(self.assertTraitChanges(obj, trait))
            for trait in traits_not_modified:
                args.append(self.assertTraitDoesNotChange(obj, trait))
        return _py2to3.nested_context_mgrs(*args)

    @contextlib.contextmanager
    def assertTraitChangesAsync(self, obj, trait, count=1, timeout=5.0):
        """ Assert an object trait eventually changes.

        Context manager used to assert that the given trait changes at
        least `count` times within the given timeout, as a result of
        execution of the body of the corresponding with block.

        The trait changes are permitted to occur asynchronously.

        **Example usage**::

            with self.assertTraitChangesAsync(my_object, 'SomeEvent', count=4):
                <do stuff that should cause my_object.SomeEvent to be
                fired at least 4 times within the next 5 seconds>


        Parameters
        ----------
        obj : HasTraits
            The HasTraits class instance whose class trait will change.

        trait : str
            The extended trait name of trait changes to listen to.

        count : int, optional
            The expected number of times the event should be fired.

        timeout : float or None, optional
            The amount of time in seconds to wait for the specified number
            of changes. None can be used to indicate no timeout.

        """
        collector = _TraitsChangeCollector(obj=obj, trait=trait)

        # Pass control to body of the with statement.
        collector.start_collecting()
        try:
            yield collector

            # Wait for the expected number of events to arrive.
            try:
                wait_for_condition(
                    condition=lambda obj: obj.event_count >= count,
                    obj=collector,
                    trait='event_count_updated',
                    timeout=timeout,
                )
            except RuntimeError:
                actual_event_count = collector.event_count
                msg = ("Expected {0} event on {1} to be fired at least {2} "
                       "times, but the event was only fired {3} times "
                       "before timeout ({4} seconds).").format(
                    trait,
                    obj,
                    count,
                    actual_event_count,
                    timeout,
                )
                self.fail(msg)

        finally:
            collector.stop_collecting()

    def assertEventuallyTrue(self, obj, trait, condition, timeout=5.0):
        """ Assert that the given condition is eventually true.

        Parameters
        ----------
        obj : HasTraits
            The HasTraits class instance who's traits will change.

        trait : str
            The extended trait name of trait changes to listen to.

        condition : callable
            A function that will be called when the specified trait
            changes.  This should accept ``obj`` and should return a
            Boolean indicating whether the condition is satisfied or not.

        timeout : float or None, optional
            The amount of time in seconds to wait for the condition to
            become true.  None can be used to indicate no timeout.

        """
        try:
            wait_for_condition(
                condition=condition,
                obj=obj,
                trait=trait,
                timeout=timeout,
            )
        except RuntimeError:
            # Helpful to know whether we timed out because the
            # condition never became true, or because the expected
            # event was never issued.
            condition_at_timeout = condition(obj)
            self.fail(
                "Timed out waiting for condition. "
                "At timeout, condition was {0}.".format(condition_at_timeout))

    @contextlib.contextmanager
    def _catch_warnings(self):
        # Ugly hack copied from the core Python code (see
        # Lib/test/test_support.py) to reset the warnings registry
        # for the module making use of this context manager.
        #
        # Note that this hack is unnecessary in Python 3.4 and later; see
        # http://bugs.python.org/issue4180 for the background.
        registry = sys._getframe(4).f_globals.get('__warningregistry__')
        if registry:
            registry.clear()

        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter('always', DeprecationWarning)
            yield w

    @contextlib.contextmanager
    def assertDeprecated(self):
        """
        Assert that the code inside the with block is deprecated.  Intended
        for testing uses of traits.util.deprecated.deprecated.

        """
        with self._catch_warnings() as w:
            yield w
        self.assertGreater(len(w), 0, msg="Expected a DeprecationWarning, "
                           "but none was issued")

    @contextlib.contextmanager
    def assertNotDeprecated(self):
        """
        Assert that the code inside the with block is not deprecated.  Intended
        for testing uses of traits.util.deprecated.deprecated.

        """
        with self._catch_warnings() as w:
            yield w
        self.assertEqual(len(w), 0, msg="Expected no DeprecationWarning, "
                         "but at least one was issued")