/usr/lib/python2.7/dist-packages/traits/util/tests/test_record_containers.py is in python-traits 4.6.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#----------------------------------------------------------------------------
# Copyright (c) 2014, Enthought, Inc.
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in /LICENSE.txt and may be redistributed only
# under the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!
#
#----------------------------------------------------------------------------
import os
import shutil
import tempfile
import threading
import unittest
from traits.util.event_tracer import (
SentinelRecord, RecordContainer, MultiThreadRecordContainer)
class TestRecordContainers(unittest.TestCase):
    """Tests for ``RecordContainer`` and ``MultiThreadRecordContainer``.

    Every test runs against a fresh temporary directory which is created in
    ``setUp`` and removed again in ``tearDown``.
    """

    def setUp(self):
        # Scratch location for the files written by the containers under test.
        self.directory = tempfile.mkdtemp()
        self.filename = os.path.join(self.directory, 'myfile')

    def tearDown(self):
        shutil.rmtree(self.directory)

    def test_record_container(self):
        """Seven recorded sentinels are stored and saved as seven lines."""
        container = RecordContainer()

        # add records
        for _ in range(7):
            container.record(SentinelRecord())
        self.assertEqual(len(container._records), 7)

        # save records
        container.save_to_file(self.filename)

        with open(self.filename, 'r') as handle:
            lines = handle.readlines()
        # The saved file is expected to hold one empty line per sentinel.
        self.assertEqual(lines, ['\n'] * 7)

    def test_multi_thread_record_container(self):
        """Each recording thread gets its own collector and trace file."""
        container = MultiThreadRecordContainer()

        def record(container):
            # Collectors are looked up by the name of the calling thread.
            thread = threading.current_thread().name
            collector = container.get_change_event_collector(thread)
            collector.record(SentinelRecord())

        thread_1 = threading.Thread(target=record, args=(container,))
        thread_2 = threading.Thread(target=record, args=(container,))
        thread_1.start()
        thread_2.start()
        # Also record from the thread running the test: three threads total.
        record(container)
        thread_2.join()
        thread_1.join()

        self.assertEqual(len(container._record_containers), 3)
        # ``values()`` (rather than the Python 2 only ``itervalues()``) keeps
        # this loop working on both Python 2 and Python 3.
        for collector in container._record_containers.values():
            # Check the length first so that an empty collector is reported
            # as an assertion failure instead of an IndexError below.
            self.assertEqual(len(collector._records), 1)
            self.assertIsInstance(collector._records[0], SentinelRecord)

        # save records
        container.save_to_directory(self.directory)
        for name in container._record_containers:
            filename = os.path.join(self.directory, '{0}.trace'.format(name))
            with open(filename, 'r') as handle:
                lines = handle.readlines()
            self.assertEqual(lines, ['\n'])
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()
|