/usr/lib/python3.4/multiprocessing/util.py is in libpython3.4-stdlib 3.4.3-1ubuntu1~14.04.7.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 | #
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
import os
import itertools
import weakref
import atexit
import threading # we want threading to install its
# cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags
from . import process
# Public names of this module (what ``from ... import *`` exposes).
__all__ = [
'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
'log_to_stderr', 'get_temp_dir', 'register_after_fork',
'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
]
#
# Logging
#
# Numeric levels handed to ``_logger.log`` by sub_debug(), debug(), info()
# and sub_warning() below.
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25
# Name requested from ``logging.getLogger`` in get_logger().
LOGGER_NAME = 'multiprocessing'
# Format string given to ``logging.Formatter`` in log_to_stderr().
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
# Module-wide logger; stays None until get_logger() creates it, and the
# logging helpers are no-ops while it is unset.
_logger = None
# Set to True by log_to_stderr().
_log_to_stderr = False
def sub_debug(msg, *args):
    '''
    Log msg at the SUBDEBUG level; does nothing while no logger is set

    args are forwarded to the logger as %-style format arguments.
    '''
    if not _logger:
        return
    _logger.log(SUBDEBUG, msg, *args)
def debug(msg, *args):
    '''
    Log msg at the DEBUG level; does nothing while no logger is set

    args are forwarded to the logger as %-style format arguments.
    '''
    if not _logger:
        return
    _logger.log(DEBUG, msg, *args)
def info(msg, *args):
    '''
    Log msg at the INFO level; does nothing while no logger is set

    args are forwarded to the logger as %-style format arguments.
    '''
    if not _logger:
        return
    _logger.log(INFO, msg, *args)
def sub_warning(msg, *args):
    '''
    Log msg at the SUBWARNING level; does nothing while no logger is set

    args are forwarded to the logger as %-style format arguments.
    '''
    if not _logger:
        return
    _logger.log(SUBWARNING, msg, *args)
def get_logger():
    '''
    Return the logger used by multiprocessing, creating it on first call

    Creation happens while holding the logging module's lock.  When the
    logger is created, _exit_function is re-registered with atexit so that
    it ends up registered after whatever logging has registered.
    '''
    global _logger
    import logging

    logging._acquireLock()
    try:
        if not _logger:
            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0

            # XXX multiprocessing should cleanup before logging
            if not hasattr(atexit, 'unregister'):
                # No public unregister(): edit the handler list directly.
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))
            else:
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
    finally:
        logging._releaseLock()

    return _logger
def log_to_stderr(level=None):
    '''
    Enable logging and attach a handler that writes to stderr

    A truthy level is applied to the logger with setLevel(); a falsy one
    leaves the current level untouched.  Returns the module logger.
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()

    stderr_handler = logging.StreamHandler()
    stderr_handler.setFormatter(logging.Formatter(DEFAULT_LOGGING_FORMAT))
    logger.addHandler(stderr_handler)

    if level:
        logger.setLevel(level)

    _log_to_stderr = True
    return _logger
#
# Function returning a temp directory which will be removed on exit
#
def get_temp_dir():
    '''
    Return the name of a temp directory that is removed at exit

    The directory is created on first use and its name is cached under
    the 'tempdir' key of the current process's _config mapping.
    '''
    cached = process.current_process()._config.get('tempdir')
    if cached is not None:
        return cached

    import shutil
    import tempfile

    created = tempfile.mkdtemp(prefix='pymp-')
    info('created temp directory %s', created)
    # No object to track, so the finalizer is driven by its exit priority.
    Finalize(None, shutil.rmtree, args=[created], exitpriority=-100)
    process.current_process()._config['tempdir'] = created
    return created
#
# Support for reinitialization of objects when bootstrapping a child process
#
# Maps (counter, id(obj), func) keys to obj.  Values are held weakly, so
# registering an object here does not keep it alive.
_afterfork_registry = weakref.WeakValueDictionary()
# Source of the increasing first key element used by register_after_fork().
_afterfork_counter = itertools.count()
def _run_after_forkers():
    '''
    Call every registered after-fork function on its object

    Entries run in sorted key order, i.e. in order of registration.  An
    exception raised by one function is logged and the rest still run.
    '''
    for (_index, _ident, func), obj in sorted(_afterfork_registry.items()):
        try:
            func(obj)
        except Exception as exc:
            info('after forker raised exception %s', exc)
def register_after_fork(obj, func):
    '''
    Register func to be called with obj by _run_after_forkers()

    obj is stored in a weak-value registry, so the registration does not
    keep it alive.
    '''
    key = (next(_afterfork_counter), id(obj), func)
    _afterfork_registry[key] = obj
#
# Finalization using weakrefs
#
# Maps (exitpriority, counter) keys to the Finalize instances that are
# still pending.
_finalizer_registry = {}
# Source of the increasing second key element; it makes keys unique.
_finalizer_counter = itertools.count()
class Finalize(object):
    '''
    Class which supports object finalization using weakrefs

    A finalizer invokes its callback at most once.  It fires when the
    tracked object is garbage collected (the instance is installed as the
    weakref callback), when the instance is called directly, or when
    _run_finalizers() calls it (only finalizers whose exit priority is not
    None are considered there).
    '''

    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        '''
        Register callback(*args, **kwargs) as the finalizer for obj

        obj may be None, in which case exitpriority must be given because
        nothing else would ever trigger the finalizer.

        Raises TypeError if exitpriority is neither None nor an int, and
        ValueError if both obj and exitpriority are None.
        '''
        # Real exceptions instead of asserts: asserts are stripped under
        # ``python -O``, which would let an unreachable finalizer register.
        if exitpriority is not None and not isinstance(exitpriority, int):
            raise TypeError(
                'exitpriority ({0!r}) must be None or int, not {1!s}'.format(
                    exitpriority, type(exitpriority)))

        if obj is not None:
            # The instance itself is the weakref callback, see __call__.
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            raise ValueError('Without object, exitpriority cannot be None')

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
        # Remembered so the callback only runs in the creating process.
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled

        wr is the argument supplied when the instance is invoked as a
        weakref callback; it is not used.  Returns the callback's result,
        or None when the finalizer is no longer registered or was created
        by a different process.
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            # Drop all references so a second call finds nothing to do.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        '''
        Describe the finalizer, or report it as dead when no live object
        is tracked
        '''
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            # _weakref was never set (obj was None) or has been reset to
            # None after the finalizer ran or was cancelled.
            obj = None

        if obj is None:
            return '<Finalize object, dead>'

        x = '<Finalize object, callback=%s' % \
            getattr(self._callback, '__name__', self._callback)
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    When minpriority is None the registry is emptied afterwards.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    def selected(entry):
        # entry is a (key, finalizer) pair and key is (exitpriority, count)
        priority = entry[0][0]
        if priority is None:
            return False
        return minpriority is None or priority >= minpriority

    # Snapshot first: running a finalizer removes it from the registry.
    snapshot = list(_finalizer_registry.items())
    pending = sorted([entry for entry in snapshot if selected(entry)],
                     reverse=True)

    for _key, finalizer in pending:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()
#
# Clean up on exit
#
def is_exiting():
    '''
    Return a true value if the process is shutting down

    A value of None for the module flag also counts as shutting down.
    '''
    if _exiting:
        return _exiting
    return _exiting is None
# Set to True by _exit_function() when it starts; is_exiting() also treats
# a value of None as "exiting".
_exiting = False
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    '''
    Clean up at exit: run finalizers, stop daemons, join children

    Only the first call does any work.  Finalizers with priority >= 0 run
    first, then daemonic children are terminated and all children joined,
    and finally the remaining finalizers with a priority run.
    '''
    # The helpers are captured as default arguments because this function
    # can be called after this module's globals have been destroyed (see
    # the note further down).
    global _exiting

    if _exiting:
        return
    _exiting = True

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    if current_process() is not None:
        # The None check matters: with no current process, any call to
        # ``active_children()`` raises AttributeError (it ends up reading
        # attributes from util._current_process).  This can happen when
        # someone manipulates sys.modules so that this module is garbage
        # collected; the module destructor then replaces every value in
        # the module dict with None.  For instance, after setuptools runs
        # a test it replaces sys.modules with a copy created earlier.
        # See issues #9775 and #15881.  Also related: #4106, #9205, and
        # #9207.
        for child in active_children():
            if not child.daemon:
                continue
            info('calling terminate() for daemon %s', child.name)
            child._popen.terminate()

        for child in active_children():
            info('calling join() for process %s', child.name)
            child.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()
# Have the interpreter call _exit_function() at exit.
atexit.register(_exit_function)
#
# Some fork aware types
#
class ForkAwareThreadLock(object):
    '''
    Thread lock wrapper that is replaced by a fresh lock after a fork

    acquire and release are bound methods of the current underlying lock.
    '''

    def _reset(self):
        # Install a brand new lock and re-export its methods.
        fresh_lock = threading.Lock()
        self._lock = fresh_lock
        self.acquire = fresh_lock.acquire
        self.release = fresh_lock.release

    def __init__(self):
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)
class ForkAwareLocal(threading.local):
    '''
    threading.local subclass whose attributes are cleared after a fork
    '''

    def __init__(self):
        def _clear_state(local_obj):
            # Runs through the after-fork registry; wipes stored attributes.
            return local_obj.__dict__.clear()

        register_after_fork(self, _clear_state)

    def __reduce__(self):
        # Reduces to a bare call of the type: stored attributes are not
        # included.
        return (type(self), ())
#
# Close fds except those specified
#
# Upper bound used by close_all_fds_except() as the end of the fd range.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    # os.sysconf is unavailable or does not know the name here.
    MAXFD = 256
else:
    # os.sysconf returns -1 when the value is not defined; a negative
    # bound would break the range arithmetic, so use the same fallback.
    if MAXFD < 0:
        MAXFD = 256
def close_all_fds_except(fds):
    '''
    Close every file descriptor in range(0, MAXFD) that is not in fds

    fds is any iterable of descriptors to keep open; none of them may be
    larger than MAXFD.
    '''
    # -1 and MAXFD act as sentinels so the gaps before the first and after
    # the last kept descriptor are closed as well.
    boundaries = sorted(list(fds) + [-1, MAXFD])
    assert boundaries[-1] == MAXFD, 'fd too large'
    for kept, next_kept in zip(boundaries, boundaries[1:]):
        os.closerange(kept + 1, next_kept)
#
# Start a program with only specified fds kept open
#
def spawnv_passfds(path, args, passfds):
    '''
    Start the program at path with args, keeping only passfds open

    Returns whatever _posixsubprocess.fork_exec returns (presumably the
    child's pid -- confirm against the _posixsubprocess implementation).
    The error pipe created for the call is always closed again in this
    process.
    '''
    import _posixsubprocess

    keep_fds = sorted(passfds)
    errpipe_read, errpipe_write = os.pipe()
    try:
        executable_list = [os.fsencode(path)]
        child = _posixsubprocess.fork_exec(
            args, executable_list, True, keep_fds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, None)
        return child
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)
|