# /usr/share/pyshared/ZEO/tests/CommitLockTests.py is in python-zodb 1:3.10.5-0ubuntu3.
# This file is owned by root:root, with mode 0o644.
# The actual contents of the file can be viewed below.
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Tests of the distributed commit lock."""
import threading
import time
from persistent.TimeStamp import TimeStamp
import transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage
from ZEO.Exceptions import ClientDisconnected
from ZEO.tests.TestThread import TestThread
# Eight NUL bytes.  In this file it is passed to storage.store() right after
# the oid whenever a freshly allocated object is stored (presumably the
# "no previous revision" serial -- confirm against the storage API).
ZERO = '\0'*8
class DummyDB:
    """Do-nothing database stand-in, handed to ``registerDB()`` on the
    extra client storages opened by these tests.
    """

    def invalidate(self, *args, **kwargs):
        """Accept any positional/keyword arguments and ignore them."""
        pass
class WorkerThread(TestThread):
    """Run one small two-phase commit against ``storage`` in its own thread.

    The entire test runs in a thread so that the blocking call for
    tpc_vote() doesn't hang the test suite.  The ``ready`` event is set
    as soon as the vote request has been sent, and in any case before
    ``testrun()`` returns, so the thread that started the worker can
    safely wait on it.
    """

    def __init__(self, test, storage, trans):
        # Our own attributes are assigned before the base class is
        # initialised.
        self.storage = storage
        self.trans = trans
        self.ready = threading.Event()
        TestThread.__init__(self, test)

    def testrun(self):
        """Begin a transaction, store two new objects, vote and finish.

        A ClientDisconnected raised anywhere in the sequence is
        swallowed on purpose: the tests close some of the worker
        connections while they are in flight.
        """
        try:
            self.storage.tpc_begin(self.trans)
            # Store two brand-new objects in the transaction.
            for _ in range(2):
                oid = self.storage.new_oid()
                p = zodb_pickle(MinPO("c"))
                self.storage.store(oid, ZERO, p, '', self.trans)
            self.myvote()
            self.storage.tpc_finish(self.trans)
        except ClientDisconnected:
            pass
        finally:
            # If anything above failed before myvote() could set the
            # event, whoever is blocked in ready.wait() would otherwise
            # wait forever.  Setting an already-set event is harmless.
            self.ready.set()

    def myvote(self):
        # The vote() call is synchronous, which makes it difficult to
        # coordinate the action of multiple threads that all call
        # vote().  This method sends the vote call, then sets the
        # event saying vote was called, then waits for the vote
        # response.  It digs deep into the implementation of the client.
        # This method is a replacement for:
        #     self.ready.set()
        #     self.storage.tpc_vote(self.trans)
        rpc = self.storage._server.rpc
        msgid = rpc._deferred_call('vote', id(self.trans))
        self.ready.set()
        rpc._deferred_wait(msgid)
        self.storage._check_serials()
class CommitLockTests:
    """Shared helpers for the commit-lock tests.

    The class reads ``self._storage``, which it does not define itself;
    it is expected to be supplied by whatever test case these helpers
    are combined with.
    """

    NUM_CLIENTS = 5

    # The commit lock tests verify that the storage successfully
    # blocks and restarts transactions when there is contention for a
    # single storage.  There are a lot of cases to cover.

    # The general flow of these tests is to start a transaction by
    # getting far enough into 2PC to acquire the commit lock.  Then
    # begin one or more other connections that also want to commit.
    # This causes the commit lock code to be exercised.  Once the
    # other connections are started, the first transaction completes.

    def _cleanup(self):
        """Abort and close every client storage recorded by _begin_threads().

        Each storage is closed even when its tpc_abort() raises.  If an
        exception escapes, the storages not yet processed stay in
        ``self._storages``; on success the list ends up empty.
        """
        while self._storages:
            store, trans = self._storages.pop(0)
            try:
                store.tpc_abort(trans)
            finally:
                store.close()

    def _start_txn(self):
        """Begin a transaction on the main storage and store one new object.

        Returns the ``(oid, txn)`` pair; the transaction is left open.
        """
        txn = transaction.Transaction()
        self._storage.tpc_begin(txn)
        oid = self._storage.new_oid()
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
        return oid, txn

    def _begin_threads(self):
        # Start NUM_CLIENTS further transactions, each on a different
        # connection, without blocking the test thread.  Returns only
        # after each thread has set its ready event.
        self._storages = []
        self._threads = []

        for i in range(self.NUM_CLIENTS):
            storage = self._duplicate_client()
            txn = transaction.Transaction()

            t = WorkerThread(self, storage, txn)
            self._threads.append(t)
            t.start()
            t.ready.wait()

            # Close one of the connections abnormally to test server response
            if i == 0:
                storage.close()
            else:
                # Only the still-open clients are remembered for _cleanup().
                self._storages.append((storage, txn))

    def _finish_threads(self):
        """Call cleanup() on every worker thread started by _begin_threads()."""
        for t in self._threads:
            t.cleanup()

    def _duplicate_client(self):
        "Open another ClientStorage to the same server."
        # It's hard to find the actual address.
        # The rpc mgr addr attribute is a list.  Each element in the
        # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
        # address.
        addr = self._storage._addr
        new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
        new.registerDB(DummyDB())
        return new

    def _get_timestamp(self):
        """Return repr() of a TimeStamp built from the current time.

        The TimeStamp receives the UTC year, month, day, hour and minute
        plus the seconds (including the fractional part) of time.time().
        """
        now = time.time()
        fields = time.gmtime(now)[:5] + (now % 60,)
        # repr() is the portable spelling of the old backtick syntax.
        return repr(TimeStamp(*fields))
class CommitLockVoteTests(CommitLockTests):
    # Every check below first takes the main storage's transaction
    # through tpc_vote(), then starts the competing worker threads, and
    # finally ends the first transaction in a different way.
    #
    # self._storage and self._dostore() are not defined in this class
    # or its base; they are expected to come from the test case these
    # checks are mixed into.

    def checkCommitLockVoteFinish(self):
        # First transaction commits while the workers are waiting.
        oid, txn = self._start_txn()
        self._storage.tpc_vote(txn)

        self._begin_threads()

        self._storage.tpc_finish(txn)
        self._storage.load(oid, '')

        self._finish_threads()

        self._dostore()
        self._cleanup()

    def checkCommitLockVoteAbort(self):
        # First transaction aborts while the workers are waiting.
        _, txn = self._start_txn()
        self._storage.tpc_vote(txn)

        self._begin_threads()

        self._storage.tpc_abort(txn)

        self._finish_threads()

        self._dostore()
        self._cleanup()

    def checkCommitLockVoteClose(self):
        # First connection is closed outright after voting; no further
        # store is attempted on it afterwards.
        _, txn = self._start_txn()
        self._storage.tpc_vote(txn)

        self._begin_threads()

        self._storage.close()

        self._finish_threads()
        self._cleanup()
|