This file is indexed.

/usr/lib/python2.7/dist-packages/kazoo/tests/test_queue.py is in python-kazoo 2.2.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import uuid

from nose import SkipTest
from nose.tools import eq_, ok_

from kazoo.testing import KazooTestCase
from kazoo.tests.util import TRAVIS_ZK_VERSION


class KazooQueueTests(KazooTestCase):
    # Tests for the queue object returned by ``self.client.Queue``.

    def _makeOne(self):
        # Every call builds a queue on a new random hex path.
        return self.client.Queue("/%s" % uuid.uuid4().hex)

    def test_queue_validation(self):
        # put() must reject bad values and bad/out-of-range priorities.
        q = self._makeOne()
        rejected_calls = (
            (TypeError, ({},)),
            (TypeError, (b"one", b"100")),
            (TypeError, (b"one", 10.0)),
            (ValueError, (b"one", -100)),
            (ValueError, (b"one", 100000)),
        )
        for expected_error, put_args in rejected_calls:
            self.assertRaises(expected_error, q.put, *put_args)

    def test_empty_queue(self):
        # get() on an empty queue yields None and leaves the length at 0.
        q = self._makeOne()
        eq_(len(q), 0)
        fetched = q.get()
        self.assertTrue(fetched is None)
        eq_(len(q), 0)

    def test_queue(self):
        # Entries put with the default priority come back in put order.
        q = self._makeOne()
        payloads = (b"one", b"two", b"three")
        for payload in payloads:
            q.put(payload)
        eq_(len(q), 3)

        for expected in payloads:
            eq_(q.get(), expected)
        eq_(len(q), 0)

    def test_priority(self):
        # Lower priority numbers are returned first; equal priorities keep
        # their put order.
        q = self._makeOne()
        submissions = (
            (b"four", 101),
            (b"one", 0),
            (b"two", 0),
            (b"three", 10),
        )
        for payload, level in submissions:
            q.put(payload, priority=level)

        for expected in (b"one", b"two", b"three", b"four"):
            eq_(q.get(), expected)


class KazooLockingQueueTests(KazooTestCase):
    # Tests for the queue object returned by ``self.client.LockingQueue``.
    # The whole class is skipped unless Zookeeper is 3.4 or newer.

    def setUp(self):
        KazooTestCase.setUp(self)
        if TRAVIS_ZK_VERSION:
            # The CI-provided version wins when it is set.
            too_old = TRAVIS_ZK_VERSION < (3, 4)
        else:
            # Compare (major, minor) together: checking only the minor
            # number would wrongly skip e.g. 4.0 and wrongly accept 2.4.
            ver = self.client.server_version()
            too_old = tuple(ver[:2]) < (3, 4)
        if too_old:
            raise SkipTest("Must use Zookeeper 3.4 or above")

    def _makeOne(self):
        # Every call builds a locking queue on a new random hex path.
        path = "/" + uuid.uuid4().hex
        return self.client.LockingQueue(path)

    def test_queue_validation(self):
        # put() and put_all() must reject bad values and bad/out-of-range
        # priorities.
        queue = self._makeOne()
        rejected_calls = (
            (TypeError, queue.put, ({},)),
            (TypeError, queue.put, (b"one", b"100")),
            (TypeError, queue.put, (b"one", 10.0)),
            (ValueError, queue.put, (b"one", -100)),
            (ValueError, queue.put, (b"one", 100000)),
            (TypeError, queue.put_all, ({},)),
            (TypeError, queue.put_all, ([{}],)),
            (TypeError, queue.put_all, ([b"one"], b"100")),
            (TypeError, queue.put_all, ([b"one"], 10.0)),
            (ValueError, queue.put_all, ([b"one"], -100)),
            (ValueError, queue.put_all, ([b"one"], 100000)),
        )
        for expected_error, method, args in rejected_calls:
            self.assertRaises(expected_error, method, *args)

    def test_empty_queue(self):
        # get() with a zero timeout on an empty queue yields None and
        # leaves the length at 0.
        queue = self._makeOne()
        eq_(len(queue), 0)
        self.assertTrue(queue.get(0) is None)
        eq_(len(queue), 0)

    def test_queue(self):
        # Walk three entries through the get()/consume() cycle, checking
        # holds_lock() and the consume() result at every step.
        queue = self._makeOne()
        queue.put(b"one")
        queue.put_all([b"two", b"three"])
        eq_(len(queue), 3)

        # Nothing has been fetched yet, so there is nothing to consume.
        ok_(not queue.consume())
        ok_(not queue.holds_lock())
        eq_(queue.get(1), b"one")
        ok_(queue.holds_lock())
        # Without consuming, should return the same element
        eq_(queue.get(1), b"one")
        ok_(queue.consume())
        ok_(not queue.holds_lock())
        eq_(queue.get(1), b"two")
        ok_(queue.holds_lock())
        ok_(queue.consume())
        ok_(not queue.holds_lock())
        eq_(queue.get(1), b"three")
        ok_(queue.holds_lock())
        ok_(queue.consume())
        ok_(not queue.holds_lock())
        # A second consume() without a new get() reports failure.
        ok_(not queue.consume())
        eq_(len(queue), 0)

    def test_consume(self):
        # consume() is truthy only once, directly after a get().
        queue = self._makeOne()

        queue.put(b"one")
        ok_(not queue.consume())
        queue.get(.1)
        ok_(queue.consume())
        ok_(not queue.consume())

    def test_holds_lock(self):
        # holds_lock() is true only between a get() and its consume().
        queue = self._makeOne()

        ok_(not queue.holds_lock())
        queue.put(b"one")
        queue.get(.1)
        ok_(queue.holds_lock())
        queue.consume()
        ok_(not queue.holds_lock())

    def test_priority(self):
        # Lower priority numbers are returned first; equal priorities keep
        # their put order.
        queue = self._makeOne()
        queue.put(b"four", priority=101)
        queue.put(b"one", priority=0)
        queue.put(b"two", priority=0)
        queue.put(b"three", priority=10)

        for expected in (b"one", b"two", b"three", b"four"):
            eq_(queue.get(1), expected)
            ok_(queue.consume())

    def test_concurrent_execution(self):
        # Three concurrent getters race for a single entry: exactly one
        # must receive it and the other two must time out with None.
        queue = self._makeOne()
        handler = self.client.handler
        # One (result list, completion event) pair per getter.
        workers = [([], handler.event_object()) for _ in range(3)]

        def get_concurrently(value, event):
            # Each getter uses its own LockingQueue on the shared path.
            q = self.client.LockingQueue(queue.path)
            value.append(q.get(.1))
            event.set()

        for value, event in workers:
            handler.spawn(get_concurrently, value, event)
        queue.put(b"one")
        for _, event in workers:
            event.wait(.2)

        result = []
        for value, _ in workers:
            result.extend(value)
        eq_(result.count(b"one"), 1)
        eq_(result.count(None), 2)