/usr/lib/python3/dist-packages/zmq/tests/test_future.py is in python3-zmq 15.2.0-0ubuntu4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright (c) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.
import zmq
try:
    from tornado import gen
except ImportError:
    from unittest import SkipTest

    class gen(object):
        """Stand-in for ``tornado.gen`` used when tornado cannot be imported."""

        @classmethod
        def coroutine(cls, function):
            """Raise ``SkipTest`` instead of wrapping *function*.

            The tests in this file apply ``@gen.coroutine`` inside the body of
            each test method, so the exception is raised while the test is
            running and the test is reported as skipped.
            """
            raise SkipTest('tornado unavailable')
from zmq.eventloop import future
from zmq.eventloop.ioloop import IOLoop
from zmq.tests import BaseZMQTestCase
class TestFutureSocket(BaseZMQTestCase):
    """Exercise the Future-returning sockets from ``zmq.eventloop.future``.

    Each test gets its own ``IOLoop``: it is created and made current before
    the base-class ``setUp`` runs, and closed after the base-class
    ``tearDown``.
    """

    # NOTE(review): presumably read by BaseZMQTestCase when it builds
    # ``self.context`` -- confirm against the base class.
    Context = future.Context

    def setUp(self):
        """Create a fresh IOLoop, make it current, then run the base setUp."""
        self.loop = IOLoop()
        self.loop.make_current()
        super(TestFutureSocket, self).setUp()

    def tearDown(self):
        """Run the base tearDown, then close the loop and all of its fds."""
        try:
            super(TestFutureSocket, self).tearDown()
        finally:
            # Close the loop even when the base tearDown raises, so a failing
            # test cannot leave the loop's file descriptors open.
            self.loop.close(all_fds=True)

    def test_socket_class(self):
        """Sockets created from the context are ``future.Socket`` objects."""
        s = self.context.socket(zmq.PUSH)
        try:
            self.assertIsInstance(s, future.Socket)
        finally:
            # Release the socket even if the assertion above fails.
            s.close()

    def test_recv_multipart(self):
        """``recv_multipart()`` stays pending until a message is sent."""
        @gen.coroutine
        def test():
            a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
            f = b.recv_multipart()
            # Nothing has been sent yet, so the future must still be pending.
            self.assertFalse(f.done())
            yield a.send(b'hi')
            recvd = yield f
            self.assertEqual(recvd, [b'hi'])
        self.loop.run_sync(test)

    def test_recv(self):
        """Two pending ``recv()`` futures receive consecutive frames in order."""
        @gen.coroutine
        def test():
            a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
            f1 = b.recv()
            f2 = b.recv()
            self.assertFalse(f1.done())
            self.assertFalse(f2.done())
            yield a.send_multipart([b'hi', b'there'])
            # Waiting on the second future: the first one must already be
            # resolved by the time the second frame arrives.
            recvd = yield f2
            self.assertTrue(f1.done())
            self.assertEqual(f1.result(), b'hi')
            self.assertEqual(recvd, b'there')
        self.loop.run_sync(test)

    def test_recv_cancel(self):
        """A cancelled ``recv()`` is skipped; the next receive gets the message."""
        @gen.coroutine
        def test():
            a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
            f1 = b.recv()
            f2 = b.recv_multipart()
            # Cancel the first request before anything is sent.
            self.assertTrue(f1.cancel())
            self.assertTrue(f1.done())
            self.assertFalse(f2.done())
            yield a.send_multipart([b'hi', b'there'])
            recvd = yield f2
            self.assertTrue(f1.cancelled())
            self.assertTrue(f2.done())
            # The whole message goes to the request that was not cancelled.
            self.assertEqual(recvd, [b'hi', b'there'])
        self.loop.run_sync(test)

    def test_poll(self):
        """``poll()`` yields 0 on timeout and ``zmq.POLLIN`` once data arrives."""
        @gen.coroutine
        def test():
            a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)

            # result() is read without yielding, so with timeout=0 the future
            # has to be resolved already.
            f = b.poll(timeout=0)
            self.assertEqual(f.result(), 0)

            # Non-zero timeout and nothing sent: pending first, then 0 events.
            f = b.poll(timeout=1)
            self.assertFalse(f.done())
            evt = yield f
            self.assertEqual(evt, 0)

            # A message sent while the poll is pending resolves it with POLLIN.
            f = b.poll(timeout=1000)
            self.assertFalse(f.done())
            yield a.send_multipart([b'hi', b'there'])
            evt = yield f
            self.assertEqual(evt, zmq.POLLIN)

            # The message that triggered the poll is still there to receive.
            recvd = yield b.recv_multipart()
            self.assertEqual(recvd, [b'hi', b'there'])
        self.loop.run_sync(test)
|