/usr/lib/python3/dist-packages/twisted/internet/test/test_posixbase.py is in python3-twisted-experimental 13.2.0-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 | # Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.internet.posixbase} and supporting code.
"""
from __future__ import division, absolute_import
from twisted.python.compat import _PY3
from twisted.trial.unittest import TestCase
from twisted.internet.defer import Deferred
from twisted.internet.posixbase import PosixReactorBase, _Waker
from twisted.internet.protocol import ServerFactory
skipSockets = None
if _PY3:
skipSockets = "Re-enable when Python 3 port supports AF_UNIX"
else:
try:
from twisted.internet import unix
from twisted.test.test_unix import ClientProto
except ImportError:
skipSockets = "Platform does not support AF_UNIX sockets"
from twisted.internet.tcp import Port
from twisted.internet import reactor
class TrivialReactor(PosixReactorBase):
def __init__(self):
self._readers = {}
self._writers = {}
PosixReactorBase.__init__(self)
def addReader(self, reader):
self._readers[reader] = True
def removeReader(self, reader):
del self._readers[reader]
def addWriter(self, writer):
self._writers[writer] = True
def removeWriter(self, writer):
del self._writers[writer]
class PosixReactorBaseTests(TestCase):
"""
Tests for L{PosixReactorBase}.
"""
def _checkWaker(self, reactor):
self.assertIsInstance(reactor.waker, _Waker)
self.assertIn(reactor.waker, reactor._internalReaders)
self.assertIn(reactor.waker, reactor._readers)
def test_wakerIsInternalReader(self):
"""
When L{PosixReactorBase} is instantiated, it creates a waker and adds
it to its internal readers set.
"""
reactor = TrivialReactor()
self._checkWaker(reactor)
def test_removeAllSkipsInternalReaders(self):
"""
Any L{IReadDescriptors} in L{PosixReactorBase._internalReaders} are
left alone by L{PosixReactorBase._removeAll}.
"""
reactor = TrivialReactor()
extra = object()
reactor._internalReaders.add(extra)
reactor.addReader(extra)
reactor._removeAll(reactor._readers, reactor._writers)
self._checkWaker(reactor)
self.assertIn(extra, reactor._internalReaders)
self.assertIn(extra, reactor._readers)
def test_removeAllReturnsRemovedDescriptors(self):
"""
L{PosixReactorBase._removeAll} returns a list of removed
L{IReadDescriptor} and L{IWriteDescriptor} objects.
"""
reactor = TrivialReactor()
reader = object()
writer = object()
reactor.addReader(reader)
reactor.addWriter(writer)
removed = reactor._removeAll(
reactor._readers, reactor._writers)
self.assertEqual(set(removed), set([reader, writer]))
self.assertNotIn(reader, reactor._readers)
self.assertNotIn(writer, reactor._writers)
class TCPPortTests(TestCase):
"""
Tests for L{twisted.internet.tcp.Port}.
"""
if not isinstance(reactor, PosixReactorBase):
skip = "Non-posixbase reactor"
def test_connectionLostFailed(self):
"""
L{Port.stopListening} returns a L{Deferred} which errbacks if
L{Port.connectionLost} raises an exception.
"""
port = Port(12345, ServerFactory())
port.connected = True
port.connectionLost = lambda reason: 1 // 0
return self.assertFailure(port.stopListening(), ZeroDivisionError)
class TimeoutReportReactor(PosixReactorBase):
"""
A reactor which is just barely runnable and which cannot monitor any
readers or writers, and which fires a L{Deferred} with the timeout
passed to its C{doIteration} method as soon as that method is invoked.
"""
def __init__(self):
PosixReactorBase.__init__(self)
self.iterationTimeout = Deferred()
self.now = 100
def addReader(self, reader):
"""
Ignore the reader. This is necessary because the waker will be
added. However, we won't actually monitor it for any events.
"""
def removeAll(self):
"""
There are no readers or writers, so there is nothing to remove.
This will be called when the reactor stops, though, so it must be
implemented.
"""
return []
def seconds(self):
"""
Override the real clock with a deterministic one that can be easily
controlled in a unit test.
"""
return self.now
def doIteration(self, timeout):
d = self.iterationTimeout
if d is not None:
self.iterationTimeout = None
d.callback(timeout)
class IterationTimeoutTests(TestCase):
"""
Tests for the timeout argument L{PosixReactorBase.run} calls
L{PosixReactorBase.doIteration} with in the presence of various delayed
calls.
"""
def _checkIterationTimeout(self, reactor):
timeout = []
reactor.iterationTimeout.addCallback(timeout.append)
reactor.iterationTimeout.addCallback(lambda ignored: reactor.stop())
reactor.run()
return timeout[0]
def test_noCalls(self):
"""
If there are no delayed calls, C{doIteration} is called with a
timeout of C{None}.
"""
reactor = TimeoutReportReactor()
timeout = self._checkIterationTimeout(reactor)
self.assertEqual(timeout, None)
def test_delayedCall(self):
"""
If there is a delayed call, C{doIteration} is called with a timeout
which is the difference between the current time and the time at
which that call is to run.
"""
reactor = TimeoutReportReactor()
reactor.callLater(100, lambda: None)
timeout = self._checkIterationTimeout(reactor)
self.assertEqual(timeout, 100)
def test_timePasses(self):
"""
If a delayed call is scheduled and then some time passes, the
timeout passed to C{doIteration} is reduced by the amount of time
which passed.
"""
reactor = TimeoutReportReactor()
reactor.callLater(100, lambda: None)
reactor.now += 25
timeout = self._checkIterationTimeout(reactor)
self.assertEqual(timeout, 75)
def test_multipleDelayedCalls(self):
"""
If there are several delayed calls, C{doIteration} is called with a
timeout which is the difference between the current time and the
time at which the earlier of the two calls is to run.
"""
reactor = TimeoutReportReactor()
reactor.callLater(50, lambda: None)
reactor.callLater(10, lambda: None)
reactor.callLater(100, lambda: None)
timeout = self._checkIterationTimeout(reactor)
self.assertEqual(timeout, 10)
def test_resetDelayedCall(self):
"""
If a delayed call is reset, the timeout passed to C{doIteration} is
based on the interval between the time when reset is called and the
new delay of the call.
"""
reactor = TimeoutReportReactor()
call = reactor.callLater(50, lambda: None)
reactor.now += 25
call.reset(15)
timeout = self._checkIterationTimeout(reactor)
self.assertEqual(timeout, 15)
def test_delayDelayedCall(self):
"""
If a delayed call is re-delayed, the timeout passed to
C{doIteration} is based on the remaining time before the call would
have been made and the additional amount of time passed to the delay
method.
"""
reactor = TimeoutReportReactor()
call = reactor.callLater(50, lambda: None)
reactor.now += 10
call.delay(20)
timeout = self._checkIterationTimeout(reactor)
self.assertEqual(timeout, 60)
def test_cancelDelayedCall(self):
"""
If the only delayed call is canceled, C{None} is the timeout passed
to C{doIteration}.
"""
reactor = TimeoutReportReactor()
call = reactor.callLater(50, lambda: None)
call.cancel()
timeout = self._checkIterationTimeout(reactor)
self.assertEqual(timeout, None)
class ConnectedDatagramPortTestCase(TestCase):
"""
Test connected datagram UNIX sockets.
"""
if skipSockets is not None:
skip = skipSockets
def test_connectionFailedDoesntCallLoseConnection(self):
"""
L{ConnectedDatagramPort} does not call the deprecated C{loseConnection}
in L{ConnectedDatagramPort.connectionFailed}.
"""
def loseConnection():
"""
Dummy C{loseConnection} method. C{loseConnection} is deprecated and
should not get called.
"""
self.fail("loseConnection is deprecated and should not get called.")
port = unix.ConnectedDatagramPort(None, ClientProto())
port.loseConnection = loseConnection
port.connectionFailed("goodbye")
def test_connectionFailedCallsStopListening(self):
"""
L{ConnectedDatagramPort} calls L{ConnectedDatagramPort.stopListening}
instead of the deprecated C{loseConnection} in
L{ConnectedDatagramPort.connectionFailed}.
"""
self.called = False
def stopListening():
"""
Dummy C{stopListening} method.
"""
self.called = True
port = unix.ConnectedDatagramPort(None, ClientProto())
port.stopListening = stopListening
port.connectionFailed("goodbye")
self.assertEqual(self.called, True)
|