/usr/share/pyshared/zc/lockfile/tests.py is in python-zc.lockfile 1.0.2-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
##############################################################################
#
# Copyright (c) 2004 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from zope.testing import setupstack
import os, sys, unittest, doctest
import zc.lockfile, time, threading
def inc():
    """Increment the integer stored in the file ``f`` while holding ``f.lock``.

    Spins until the ``f.lock`` lock file can be acquired, reads the counter
    from the first line of ``f``, sleeps briefly, writes the incremented
    value back, and finally releases the lock.

    Raises:
        ValueError: if the first line of ``f`` is not an integer.
        IOError/OSError: if ``f`` cannot be opened, read, or written.
    """
    # Busy-wait for the lock: LockFile raises LockError while it is held.
    while True:
        try:
            lock = zc.lockfile.LockFile('f.lock')
        except zc.lockfile.LockError:
            continue
        else:
            break
    try:
        with open('f', 'r+b') as f:
            v = int(f.readline().strip())
            # Keep the lock held across a delay so that a broken lock would
            # let another thread read the same stale value.
            time.sleep(0.01)
            v += 1
            f.seek(0)
            # The file is opened in binary mode, so write bytes explicitly.
            f.write(('%d\n' % v).encode('ASCII'))
    finally:
        # Always release the lock, even on failure; otherwise every other
        # worker would spin forever in the acquire loop above.
        lock.close()
def many_threads_read_and_write():
    r"""Doctest: concurrent ``inc`` calls must not lose any update.

    Seed the counter file and the lock file:

    >>> with open('f', 'w+b') as fp:
    ...     _ = fp.write(b'0\n')
    >>> with open('f.lock', 'w+b') as fp:
    ...     _ = fp.write(b'0\n')

    Run ``inc`` in many threads at once and wait for all of them:

    >>> n = 50
    >>> threads = [threading.Thread(target=inc) for i in range(n)]
    >>> _ = [thread.start() for thread in threads]
    >>> _ = [thread.join() for thread in threads]

    Every increment must have been recorded:

    >>> with open('f', 'rb') as fp:
    ...     saved = int(fp.readline().strip())
    >>> saved == n
    True

    >>> os.remove('f')

    We should only have one pid in the lock file:

    >>> f = open('f.lock')
    >>> len(f.read().strip().split())
    1
    >>> f.close()

    >>> os.remove('f.lock')
    """
def pid_in_lockfile():
    r"""Doctest: the lock file holds the pid of the process owning the lock.

    Acquire the lock and compare the file contents, read from offset 1,
    with our own pid:

    >>> import os, zc.lockfile
    >>> pid = os.getpid()
    >>> lock = zc.lockfile.LockFile("f.lock")
    >>> f = open("f.lock")
    >>> _ = f.seek(1)
    >>> f.read().strip() == str(pid)
    True
    >>> f.close()

    Make sure that locking twice does not overwrite the old pid:

    >>> lock = zc.lockfile.LockFile("f.lock")
    Traceback (most recent call last):
      ...
    LockError: Couldn't lock 'f.lock'

    >>> f = open("f.lock")
    >>> _ = f.seek(1)
    >>> f.read().strip() == str(pid)
    True
    >>> f.close()

    >>> lock.close()
    """
def test_suite():
    """Build the suite: the README.txt doctests plus this module's doctests.

    Both parts use the same fixture, ``setupstack.setUpDirectory`` for set-up
    and ``setupstack.tearDown`` for tear-down.
    """
    fixture = dict(
        setUp=setupstack.setUpDirectory,
        tearDown=setupstack.tearDown,
    )
    readme_tests = doctest.DocFileSuite('README.txt', **fixture)
    module_tests = doctest.DocTestSuite(**fixture)

    combined = unittest.TestSuite()
    combined.addTests((readme_tests, module_tests))
    return combined
|