/usr/lib/python3/dist-packages/twisted/test/test_tcp_internals.py is in python3-twisted-experimental 13.2.0-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Whitebox tests for TCP APIs.
"""
from __future__ import division, absolute_import
import errno, socket, os
try:
import resource
except ImportError:
resource = None
from twisted.python.compat import _PY3
from twisted.trial.unittest import TestCase
from twisted.python import log
from twisted.internet.tcp import ECONNABORTED, ENOMEM, ENFILE, EMFILE, ENOBUFS, EINPROGRESS, Port
from twisted.internet.protocol import ServerFactory
from twisted.python.runtime import platform
from twisted.internet.defer import maybeDeferred, gatherResults
from twisted.internet import reactor, interfaces
class PlatformAssumptionsTestCase(TestCase):
    """
    Test assumptions about platform behaviors.

    @ivar socketLimit: The maximum number of extra sockets
        L{test_acceptOutOfFiles} tries to open while attempting to exhaust
        the file descriptors available to the process.  L{setUp} replaces
        the class default when the C{resource} module is available.

    @ivar openSockets: The sockets created with L{socket}; they are closed
        again in L{tearDown}.

    @ivar originalFileLimit: The C{RLIMIT_NOFILE} value read in L{setUp}
        before the soft limit was lowered.  Only set when the C{resource}
        module is available.
    """
    if _PY3:
        skip = "Port when Python 3 supports twisted.internet.process (#5987)"

    socketLimit = 8192

    def setUp(self):
        """
        Start tracking sockets and, where C{resource} is available, lower
        the soft C{RLIMIT_NOFILE} limit so that descriptor exhaustion can be
        provoked quickly.
        """
        self.openSockets = []
        if resource is not None:
            # On some buggy platforms we might leak FDs, and the test will
            # fail creating the initial two sockets we *do* want to
            # succeed.  So, we make the soft limit the current number of fds
            # plus two more (for the two sockets we want to succeed).  If
            # we've leaked too many fds for that to work, there's nothing we
            # can do.
            from twisted.internet.process import _listOpenFDs
            newLimit = len(_listOpenFDs()) + 2
            self.originalFileLimit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(
                resource.RLIMIT_NOFILE, (newLimit, self.originalFileLimit[1]))
            # Leave some headroom above the new soft limit so the loop in
            # the test is certain to run into it.
            self.socketLimit = newLimit + 100

    def tearDown(self):
        """
        Close every tracked socket and restore the soft C{RLIMIT_NOFILE}
        limit saved by L{setUp}.
        """
        while self.openSockets:
            self.openSockets.pop().close()
        if resource is not None:
            # OS X implicitly lowers the hard limit in the setrlimit call
            # above.  Retrieve the new hard limit to pass in to this
            # setrlimit call, so that it doesn't give us a permission denied
            # error.
            currentHardLimit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
            newSoftLimit = min(self.originalFileLimit[0], currentHardLimit)
            resource.setrlimit(
                resource.RLIMIT_NOFILE, (newSoftLimit, currentHardLimit))

    def socket(self):
        """
        Create and return a new socket object, also tracking it so it can be
        closed in the test tear down.
        """
        s = socket.socket()
        self.openSockets.append(s)
        return s

    def test_acceptOutOfFiles(self):
        """
        Test that the platform accept(2) call fails with either L{EMFILE} or
        L{ENOBUFS} when there are too many file descriptors open.
        """
        # Make a server to which to connect
        port = self.socket()
        port.bind(('127.0.0.1', 0))
        serverPortNumber = port.getsockname()[1]
        port.listen(5)

        # Make a client to use to connect to the server
        client = self.socket()
        client.setblocking(False)

        # Use up all the rest of the file descriptors.  ``range`` is used
        # rather than ``xrange`` because the latter does not exist on
        # Python 3.
        for _ in range(self.socketLimit):
            try:
                self.socket()
            except socket.error as e:
                if e.args[0] in (EMFILE, ENOBUFS):
                    # The desired state has been achieved.
                    break
                else:
                    # Some unexpected error occurred.
                    raise
        else:
            self.fail("Could provoke neither EMFILE nor ENOBUFS from platform.")

        # Non-blocking connect is supposed to fail, but this is not true
        # everywhere (e.g. FreeBSD)
        self.assertIn(client.connect_ex(('127.0.0.1', serverPortNumber)),
                      (0, EINPROGRESS))

        # Make sure that the accept call fails in the way we expect.
        exc = self.assertRaises(socket.error, port.accept)
        self.assertIn(exc.args[0], (EMFILE, ENOBUFS))
    if platform.getType() == "win32":
        test_acceptOutOfFiles.skip = (
            "Windows requires an unacceptably large amount of resources to "
            "provoke this behavior in the naive manner.")
class SelectReactorTestCase(TestCase):
    """
    Tests for select-specific failure conditions.
    """

    def setUp(self):
        """
        Start with no tracked ports and begin collecting log events in
        C{self.messages}.
        """
        self.messages = []
        self.ports = []
        log.addObserver(self.messages.append)

    def tearDown(self):
        """
        Stop collecting log events and stop every tracked port.

        @return: The result of L{gatherResults} over the C{stopListening}
            call of each tracked port.
        """
        log.removeObserver(self.messages.append)
        stopping = []
        for listener in self.ports:
            stopping.append(maybeDeferred(listener.stopListening))
        return gatherResults(stopping)

    def port(self, portNumber, factory, interface):
        """
        Build a L{Port}, start it listening, remember it so that the tear
        down can stop it, and hand it back to the caller.
        """
        listener = Port(portNumber, factory, interface=interface)
        listener.startListening()
        self.ports.append(listener)
        return listener

    def _acceptFailureTest(self, socketErrorNumber):
        """
        Check how a port copes with C{accept(2)} raising an exception.

        When the error says the platform cannot or will not give us any more
        resources, the port must keep listening, a message must be logged,
        and nothing may propagate out of C{doRead}.

        @param socketErrorNumber: The errno to simulate from accept.
        """
        class FakeSocket(object):
            """
            Stand-in for a socket on a system which has run out of
            resources.
            """
            def accept(self):
                description = os.strerror(socketErrorNumber)
                raise socket.error(socketErrorNumber, description)

        listener = self.port(0, ServerFactory(), interface='127.0.0.1')
        realSocket = listener.socket
        try:
            listener.socket = FakeSocket()
            listener.doRead()

            # The log event carries its text as a one-element tuple.
            errorName = errno.errorcode[socketErrorNumber]
            wanted = ("Could not accept new connection (%s)" % (errorName,),)
            logged = any(
                event.get('message') == wanted for event in self.messages)
            if not logged:
                self.fail(
                    "Log event for failed accept not found in %r"
                    % (self.messages,))
        finally:
            # Put the real socket back so the port can be stopped normally.
            listener.socket = realSocket

    def test_tooManyFilesFromAccept(self):
        """
        C{accept(2)} may fail with C{EMFILE} once the process holds too many
        open file descriptors.  Verify that this does not harm any other
        existing connections.

        C{EMFILE} is mostly seen on Linux when the open file rlimit is hit.
        """
        return self._acceptFailureTest(EMFILE)

    def test_noBufferSpaceFromAccept(self):
        """
        Like L{test_tooManyFilesFromAccept}, but for C{accept(2)} failing
        with C{ENOBUFS}.

        This is mostly seen on Windows and FreeBSD, though Linux and other
        platforms may produce it too.
        """
        return self._acceptFailureTest(ENOBUFS)

    def test_connectionAbortedFromAccept(self):
        """
        Like L{test_tooManyFilesFromAccept}, but for C{accept(2)} failing
        with C{ECONNABORTED}.

        Whether modern versions of Linux can actually produce this for TCP
        connections is unclear.
        """
        return self._acceptFailureTest(ECONNABORTED)

    def test_noFilesFromAccept(self):
        """
        Like L{test_tooManyFilesFromAccept}, but for C{accept(2)} failing
        with C{ENFILE}.

        Linux can report this when the system has exhausted (!) its supply
        of inodes.
        """
        return self._acceptFailureTest(ENFILE)

    def test_noMemoryFromAccept(self):
        """
        Like L{test_tooManyFilesFromAccept}, but for C{accept(2)} failing
        with C{ENOMEM}.

        On Linux at least, this can sensibly occur, even in a Python program
        (which eats memory like no ones business), when memory has become
        fragmented or low memory has been filled (d_alloc calls
        kmem_cache_alloc calls kmalloc - kmalloc only allocates out of low
        memory).
        """
        return self._acceptFailureTest(ENOMEM)

    # Neither of these errors can be produced by accept(2) on Windows.
    if platform.getType() == 'win32':
        test_noFilesFromAccept.skip = "Windows accept(2) cannot generate ENFILE"
        test_noMemoryFromAccept.skip = "Windows accept(2) cannot generate ENOMEM"
# Mark both test cases as skipped when the global reactor does not provide
# IReactorFDSet.
if not interfaces.IReactorFDSet.providedBy(reactor):
    skipMsg = 'This test only applies to reactors that implement IReactorFDset'
    PlatformAssumptionsTestCase.skip = SelectReactorTestCase.skip = skipMsg
|