/usr/lib/python3/dist-packages/pyutilib/pyro/util.py is in python3-pyutilib 5.3.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#  _________________________________________________________________________
#
# PyUtilib: A Python utility library.
# Copyright (c) 2008 Sandia Corporation.
# This software is distributed under the BSD License.
# Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
# the U.S. Government retains certain rights in this software.
# _________________________________________________________________________
__all__ = ('get_nameserver', 'get_dispatchers', 'shutdown_pyro_components')
import os
import sys
import time
import random
import socket
from pyutilib.pyro import using_pyro3, using_pyro4
from pyutilib.pyro import Pyro as _pyro
# Python 2/3 compatibility: make the Python 2 names `xrange` and `Queue`
# available regardless of which interpreter is running.
if sys.version_info[0] < 3:
    import Queue
else:
    xrange = range
    import queue as Queue
# Exception class that get_nameserver() treats as a transient connection
# failure while looking up the name server; it depends on which Pyro
# flavor is in use and stays None when neither is flagged as active.
if using_pyro3:
    _connection_problem = _pyro.errors.ProtocolError
elif using_pyro4:
    _connection_problem = _pyro.errors.TimeoutError
else:
    _connection_problem = None
def get_nameserver(host=None,
                   port=None,
                   num_retries=30,
                   caller_name="Unknown"):
    """Locate the Pyro name server, retrying with a randomized back-off.

    host: name server host. When given, it is also exported through the
        PYRO_NS_HOSTNAME environment variable; when omitted, the value
        of that variable (if set) is used instead.
    port: name server port, passed through to the Pyro locator call.
    num_retries: number of additional attempts after the first one, so
        at most num_retries+1 lookups are made.
    caller_name: label used as a prefix in the progress messages.

    Returns the object produced by the Pyro locator call. Raises
    ImportError when neither Pyro nor Pyro4 is available, and SystemExit
    when every lookup attempt fails.
    """
    if _pyro is None:
        raise ImportError("Pyro or Pyro4 is not available")

    # upper bound (in seconds) of the randomized pause between attempts
    max_pause = 5.0

    if host is not None:
        os.environ['PYRO_NS_HOSTNAME'] = host
    else:
        host = os.environ.get('PYRO_NS_HOSTNAME')

    # Deprecated in Pyro3
    # Removed in Pyro4
    if using_pyro3:
        _pyro.core.initServer()

    nameserver = None
    for attempt in xrange(0, num_retries+1):
        try:
            if using_pyro3:
                locator = _pyro.naming.NameServerLocator()
                nameserver = locator.getNS(host=host, port=port)
            else:
                nameserver = _pyro.locateNS(host=host, port=port)
        except _pyro.errors.NamingError:
            pass
        except _connection_problem:
            # this can occur if the server is too busy.
            pass
        else:
            break

        # A fixed pause (originally hardcoded to 1 second) makes a large
        # number of concurrently failing processes all retry at roughly
        # the same time, causing more contention than necessary. A
        # randomized pause spreads out the clients hitting the name
        # server at any given moment.
        # TBD: the upper bound should eventually be read from an
        #      environment variable, to support cases with a very large
        #      (hundreds to thousands) number of clients.
        if attempt < num_retries:
            pause = random.uniform(1.0, max_pause)
            print("%s failed to locate name server after %d attempts - "
                  "trying again in %5.2f seconds."
                  % (caller_name, attempt+1, pause))
            time.sleep(pause)

    if nameserver is None:
        print("%s could not locate nameserver (attempts=%d)"
              % (caller_name, num_retries+1))
        raise SystemExit

    return nameserver
def get_dispatchers(group=":PyUtilibServer",
                    host=None,
                    port=None,
                    num_dispatcher_tries=30,
                    min_dispatchers=1,
                    caller_name=None,
                    ns=None):
    """Return the (name, uri) pairs of dispatchers known to the name server.

    group: name server group whose ".dispatcher." entries are searched.
        The default (and None) selects ":PyUtilibServer".
    host, port, caller_name: forwarded to get_nameserver() when no name
        server is supplied; they must be left as None when `ns` is given.
    num_dispatcher_tries: maximum number of name server queries.
    min_dispatchers: querying stops as soon as at least this many
        distinct dispatchers have been collected.
    ns: an already located name server object (optional).

    Returns a list of (name, uri) tuples without duplicates; it may hold
    fewer than min_dispatchers entries if the tries are exhausted.
    Raises RuntimeError if no name server object is available.

    NOTE: successive queries are issued back-to-back, with no pause
    between them.
    """
    if ns is None:
        ns = get_nameserver(host=host, port=port, caller_name=caller_name)
    else:
        assert caller_name is None
        assert host is None
        assert port is None

    if ns is None:
        raise RuntimeError("Failed to locate Pyro name "
                           "server on the network!")

    # Honor the requested group instead of a hard-coded prefix; the
    # default value yields the historical ":PyUtilibServer.dispatcher.".
    if group is None:
        group = ":PyUtilibServer"
    prefix = group + ".dispatcher."

    dispatchers = []
    for _ in xrange(0, num_dispatcher_tries):
        if using_pyro3:
            found = [(name, uri) for (name, uri) in ns.flatlist()
                     if name.startswith(prefix)]
        elif using_pyro4:
            found = [(name, ns.lookup(name))
                     for name in ns.list(prefix=prefix)]
        else:
            found = []
        # accumulate across tries, keeping first-seen order and no duplicates
        for entry in found:
            if entry not in dispatchers:
                dispatchers.append(entry)
        if len(dispatchers) >= min_dispatchers:
            break

    return dispatchers
#
# A utility for shutting down Pyro-related components, which at the
# moment is restricted to the name server and any dispatchers. The
# mip servers will come down once their dispatcher is shut down.
# NOTE: this is a utility that should eventually become part of
#       pyutilib.pyro, but because it is a prototype, I'm keeping it
#       here for now.
#
def shutdown_pyro_components(host=None,
                             port=None,
                             num_retries=30,
                             caller_name="Unknown"):
    """Shut down the registered dispatchers and, with Pyro3, the name server.

    host, port, num_retries, caller_name: forwarded to get_nameserver()
        to locate the name server.

    Every name server entry starting with ":PyUtilibServer.dispatcher."
    is unregistered and its dispatcher is asked to shut down. With Pyro3
    the ":Pyro.NameServer" entry is shut down afterwards; with Pyro4 a
    notice is printed because the name server is left running.

    Shutdown is best-effort: errors raised while contacting an individual
    component are ignored so the remaining components are still
    processed. KeyboardInterrupt and SystemExit are not suppressed.

    Raises ImportError when neither Pyro nor Pyro4 is available.
    """
    if _pyro is None:
        raise ImportError("Pyro or Pyro4 is not available")

    ns = get_nameserver(host=host,
                        port=port,
                        num_retries=num_retries,
                        caller_name=caller_name)
    if ns is None:
        print("***WARNING - Could not locate name server "
              "- Pyro components will not be shut down")
        return

    if using_pyro3:
        ns_entries = ns.flatlist()
        for (name, uri) in ns_entries:
            if name.startswith(":PyUtilibServer.dispatcher."):
                try:
                    ns.unregister(name)
                    proxy = _pyro.core.getProxyForURI(uri)
                    proxy.shutdown()
                except Exception:
                    # best-effort: keep going with the other components
                    pass
        for (name, uri) in ns_entries:
            if name == ":Pyro.NameServer":
                try:
                    proxy = _pyro.core.getProxyForURI(uri)
                    proxy._shutdown()
                    proxy._release()
                except Exception:
                    # best-effort: the name server may already be gone
                    pass
    elif using_pyro4:
        for name in ns.list(prefix=":PyUtilibServer.dispatcher."):
            try:
                uri = ns.lookup(name)
                ns.remove(name)
                proxy = _pyro.Proxy(uri)
                try:
                    proxy.shutdown()
                finally:
                    # release the connection even if shutdown() failed
                    proxy._pyroRelease()
            except Exception:
                # best-effort: keep going with the other dispatchers
                pass
        print("")
        print("*** NameServer must be shutdown manually when using Pyro4 ***")
        print("")
def set_maxconnections(max_allowed_connections=None):
    """Override Pyro's limit on the number of proxy connections.

    max_allowed_connections: the new limit. When None, the limit is read
        from the PYUTILIB_PYRO_MAXCONNECTIONS environment variable; if
        that variable is not set either, nothing is changed.

    The chosen value is announced on stdout and stored in
    PYRO_MAXCONNECTIONS (Pyro3, value + 1) or THREADPOOL_SIZE (Pyro4).
    """
    #
    # **NOTE: For some reason with Pyro3 we need to add 1 to this
    #         option in order to get behavior that makes sense
    #         and matches behavior with Pyro4. For instance,
    #         running a dispatcher with a single client and a single
    #         server requires PYRO_MAXCONNECTIONS=3 with Pyro3 and
    #         requires THREADPOOL_SIZE=2 with Pyro4.
    #
    if max_allowed_connections is not None:
        limit = max_allowed_connections
        print("Overriding %s default for maximum number of proxy "
              "connections to %s, based on specification provided by "
              "max_allowed_connections keyword"
              % ("Pyro" if using_pyro3 else "Pyro4", limit))
    else:
        env_name = "PYUTILIB_PYRO_MAXCONNECTIONS"
        if env_name not in os.environ:
            # no override requested: leave the Pyro defaults untouched
            return
        limit = int(os.environ[env_name])
        print("Overriding %s default for maximum number of proxy "
              "connections to %s, based on specification provided by "
              "%s environment variable."
              % ("Pyro" if using_pyro3 else "Pyro4",
                 limit,
                 env_name))

    if using_pyro3:
        _pyro.config.PYRO_MAXCONNECTIONS = limit + 1
    else:
        _pyro.config.THREADPOOL_SIZE = limit
def bind_port(sock, host="127.0.0.1"):
    """Bind the socket to a free port and return the port number.

    Relies on ephemeral ports (port 0) in order to ensure we are using
    an unbound port. This is important as many tests may be running
    simultaneously, especially in a buildbot environment.

    sock: an unbound AF_INET or AF_INET6 socket.
    host: address to bind to. The name 'localhost' is mapped to the
        loopback address of the socket's family ('127.0.0.1' or '::1').

    If the SO_EXCLUSIVEADDRUSE socket option is available (i.e. on
    Windows), it is set on TCP sockets before binding.

    Returns the port number reported by sock.getsockname().
    Raises ValueError for any other socket family.

    This code is adapted from the stdlib's test.test_support module;
    unlike that version, it does not check whether SO_REUSEADDR or
    SO_REUSEPORT has been set on the socket.
    """
    if (sock.family in (socket.AF_INET, socket.AF_INET6)
            and sock.type == socket.SOCK_STREAM
            and hasattr(socket, "SO_EXCLUSIVEADDRUSE")):
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    if sock.family == socket.AF_INET:
        address = ('127.0.0.1' if host == 'localhost' else host, 0)
    elif sock.family == socket.AF_INET6:
        # IPv6 addresses are (host, port, flowinfo, scope_id)
        address = ('::1' if host == 'localhost' else host, 0, 0, 0)
    else:
        # sock.family is not a string, so it must be formatted rather
        # than concatenated onto the message.
        raise ValueError("unsupported socket family: %s" % (sock.family,))

    sock.bind(address)
    return sock.getsockname()[1]
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return an unused port that should be suitable for binding.

    This is achieved by creating a temporary socket of the given family
    and type (default is AF_INET, SOCK_STREAM) and binding it, through
    bind_port(), to the loopback address with the port set to 0,
    eliciting an unused ephemeral port from the OS. The temporary
    socket is then closed and the ephemeral port is returned.

    family: socket address family of the temporary socket.
    socktype: socket type of the temporary socket.

    Any exception raised while creating or binding the socket
    propagates to the caller; the temporary socket is closed either way.

    This code is adapted from the stdlib's test.test_support module.
    """
    tempsock = socket.socket(family, socktype)
    try:
        # 'localhost' makes bind_port() choose the loopback address that
        # matches the socket family (127.0.0.1 for IPv4, ::1 for IPv6).
        return bind_port(tempsock, host='localhost')
    finally:
        tempsock.close()
|