This file is indexed.

/usr/share/pyshared/ZEO/tests/CommitLockTests.py is in python-zodb 1:3.10.5-0ubuntu3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Tests of the distributed commit lock."""

import threading
import time

from persistent.TimeStamp import TimeStamp
import transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO

import ZEO.ClientStorage
from ZEO.Exceptions import ClientDisconnected
from ZEO.tests.TestThread import TestThread

ZERO = '\0'*8

class DummyDB:
    def invalidate(self, *args, **kwargs):
        pass

class WorkerThread(TestThread):

    # run the entire test in a thread so that the blocking call for
    # tpc_vote() doesn't hang the test suite.

    def __init__(self, test, storage, trans):
        self.storage = storage
        self.trans = trans
        self.ready = threading.Event()
        TestThread.__init__(self, test)

    def testrun(self):
        try:
            self.storage.tpc_begin(self.trans)
            oid = self.storage.new_oid()
            p = zodb_pickle(MinPO("c"))
            self.storage.store(oid, ZERO, p, '', self.trans)
            oid = self.storage.new_oid()
            p = zodb_pickle(MinPO("c"))
            self.storage.store(oid, ZERO, p, '', self.trans)
            self.myvote()
            self.storage.tpc_finish(self.trans)
        except ClientDisconnected:
            pass

    def myvote(self):
        # The vote() call is synchronous, which makes it difficult to
        # coordinate the action of multiple threads that all call
        # vote().  This method sends the vote call, then sets the
        # event saying vote was called, then waits for the vote
        # response.  It digs deep into the implementation of the client.

        # This method is a replacement for:
        #     self.ready.set()
        #     self.storage.tpc_vote(self.trans)

        rpc = self.storage._server.rpc
        msgid = rpc._deferred_call('vote', id(self.trans))
        self.ready.set()
        rpc._deferred_wait(msgid)
        self.storage._check_serials()

class CommitLockTests:

    NUM_CLIENTS = 5

    # The commit lock tests verify that the storage successfully
    # blocks and restarts transactions when there is contention for a
    # single storage.  There are a lot of cases to cover.

    # The general flow of these tests is to start a transaction by
    # getting far enough into 2PC to acquire the commit lock.  Then
    # begin one or more other connections that also want to commit.
    # This causes the commit lock code to be exercised.  Once the
    # other connections are started, the first transaction completes.

    def _cleanup(self):
        for store, trans in self._storages:
            store.tpc_abort(trans)
            store.close()
        self._storages = []

    def _start_txn(self):
        txn = transaction.Transaction()
        self._storage.tpc_begin(txn)
        oid = self._storage.new_oid()
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
        return oid, txn

    def _begin_threads(self):
        # Start a second transaction on a different connection without
        # blocking the test thread.  Returns only after each thread has
        # set it's ready event.
        self._storages = []
        self._threads = []

        for i in range(self.NUM_CLIENTS):
            storage = self._duplicate_client()
            txn = transaction.Transaction()
            tid = self._get_timestamp()

            t = WorkerThread(self, storage, txn)
            self._threads.append(t)
            t.start()
            t.ready.wait()

            # Close one of the connections abnormally to test server response
            if i == 0:
                storage.close()
            else:
                self._storages.append((storage, txn))

    def _finish_threads(self):
        for t in self._threads:
            t.cleanup()

    def _duplicate_client(self):
        "Open another ClientStorage to the same server."
        # It's hard to find the actual address.
        # The rpc mgr addr attribute is a list.  Each element in the
        # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
        # address.
        addr = self._storage._addr
        new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
        new.registerDB(DummyDB())
        return new

    def _get_timestamp(self):
        t = time.time()
        t = TimeStamp(*time.gmtime(t)[:5]+(t%60,))
        return `t`

class CommitLockVoteTests(CommitLockTests):

    def checkCommitLockVoteFinish(self):
        oid, txn = self._start_txn()
        self._storage.tpc_vote(txn)

        self._begin_threads()

        self._storage.tpc_finish(txn)
        self._storage.load(oid, '')

        self._finish_threads()

        self._dostore()
        self._cleanup()

    def checkCommitLockVoteAbort(self):
        oid, txn = self._start_txn()
        self._storage.tpc_vote(txn)

        self._begin_threads()

        self._storage.tpc_abort(txn)

        self._finish_threads()

        self._dostore()
        self._cleanup()

    def checkCommitLockVoteClose(self):
        oid, txn = self._start_txn()
        self._storage.tpc_vote(txn)

        self._begin_threads()

        self._storage.close()

        self._finish_threads()
        self._cleanup()