This file is indexed.

/usr/lib/python2.7/dist-packages/kazoo/tests/test_gevent_handler.py is in python-kazoo 2.2.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import unittest

from nose import SkipTest
from nose.tools import eq_
from nose.tools import raises

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kazoo.protocol.states import Callback
from kazoo.testing import KazooTestCase
from kazoo.tests import test_client


class TestGeventHandler(unittest.TestCase):

    def setUp(self):
        try:
            import gevent  # NOQA
        except ImportError:
            raise SkipTest('gevent not available.')

    def _makeOne(self, *args):
        from kazoo.handlers.gevent import SequentialGeventHandler
        return SequentialGeventHandler(*args)

    def _getAsync(self, *args):
        from kazoo.handlers.gevent import AsyncResult
        return AsyncResult

    def _getEvent(self):
        from gevent.event import Event
        return Event

    def test_proper_threading(self):
        h = self._makeOne()
        h.start()
        assert isinstance(h.event_object(), self._getEvent())

    def test_matching_async(self):
        h = self._makeOne()
        h.start()
        async = self._getAsync()
        assert isinstance(h.async_result(), async)

    def test_exception_raising(self):
        h = self._makeOne()

        @raises(h.timeout_exception)
        def testit():
            raise h.timeout_exception("This is a timeout")
        testit()

    def test_exception_in_queue(self):
        h = self._makeOne()
        h.start()
        ev = self._getEvent()()

        def func():
            ev.set()
            raise ValueError('bang')

        call1 = Callback('completion', func, ())
        h.dispatch_callback(call1)
        ev.wait()

    def test_queue_empty_exception(self):
        from gevent.queue import Empty
        h = self._makeOne()
        h.start()
        ev = self._getEvent()()

        def func():
            ev.set()
            raise Empty()

        call1 = Callback('completion', func, ())
        h.dispatch_callback(call1)
        ev.wait()


class TestBasicGeventClient(KazooTestCase):
    """Basic client operations driven through a ``SequentialGeventHandler``.

    Every test is skipped when gevent cannot be imported.  Clients are
    obtained from ``self._get_client`` with a gevent handler passed in.
    """

    def setUp(self):
        """Skip when gevent is missing, otherwise run the base ``setUp``."""
        try:
            import gevent  # NOQA
        except ImportError:
            raise SkipTest('gevent not available.')
        KazooTestCase.setUp(self)

    def _makeOne(self, *args):
        """Return a new ``SequentialGeventHandler`` built from ``args``."""
        from kazoo.handlers.gevent import SequentialGeventHandler
        return SequentialGeventHandler(*args)

    def _getEvent(self):
        """Return gevent's ``Event`` class (the class, not an instance)."""
        from gevent.event import Event
        return Event

    def test_start(self):
        """A started client reports the ``CONNECTED`` state."""
        client = self._get_client(handler=self._makeOne())
        client.start()
        self.assertEqual(client.state, 'CONNECTED')
        client.stop()

    def test_start_stop_double(self):
        """Starting and stopping the handler again on a live client works."""
        client = self._get_client(handler=self._makeOne())
        client.start()
        self.assertEqual(client.state, 'CONNECTED')
        client.handler.start()
        client.handler.stop()
        client.stop()

    def test_basic_commands(self):
        """Create, read, delete and check for a node."""
        client = self._get_client(handler=self._makeOne())
        client.start()
        self.assertEqual(client.state, 'CONNECTED')
        # kazoo takes node data as a byte string; a b'' literal is the same
        # object type as '' on Python 2 and stays bytes on Python 3.
        client.create('/anode', b'fred')
        self.assertEqual(client.get('/anode')[0], b'fred')
        self.assertEqual(client.delete('/anode'), True)
        self.assertEqual(client.exists('/anode'), None)
        client.stop()

    def test_failures(self):
        """Reading a missing node raises ``NoNodeError``."""
        client = self._get_client(handler=self._makeOne())
        client.start()
        self.assertRaises(NoNodeError, client.get, '/none')
        client.stop()

    def test_data_watcher(self):
        """A ``DataWatch`` callback fires before and after a data change.

        The first wait happens before any data is written, so the callback
        is expected to have fired once already (presumably on registration).
        """
        client = self._get_client(handler=self._makeOne())
        client.start()
        client.ensure_path('/some/node')
        ev = self._getEvent()()

        @client.DataWatch('/some/node')
        def changed(d, stat):
            ev.set()

        ev.wait()
        ev.clear()
        # Byte string for the same reason as in test_basic_commands.
        client.set('/some/node', b'newvalue')
        ev.wait()
        client.stop()


class TestGeventClient(test_client.TestClient):
    """Inherit the ``test_client.TestClient`` tests with a gevent handler.

    Clients built by ``_get_client`` always use a fresh
    ``SequentialGeventHandler``.
    """

    def setUp(self):
        """Skip when gevent is missing, else run ``KazooTestCase.setUp``."""
        try:
            import gevent  # NOQA
        except ImportError:
            raise SkipTest('gevent not available.')
        else:
            KazooTestCase.setUp(self)

    def _makeOne(self, *args):
        """Create a ``SequentialGeventHandler`` from the given arguments."""
        from kazoo.handlers.gevent import SequentialGeventHandler as handler_cls
        return handler_cls(*args)

    def _get_client(self, **kwargs):
        """Return a ``KazooClient`` for ``self.hosts`` using a gevent handler.

        Any ``handler`` keyword supplied by the caller is replaced.
        """
        options = dict(kwargs, handler=self._makeOne())
        return KazooClient(self.hosts, **options)