/usr/lib/python/astrometry/util/multiproc.py is in astrometry.net 0.46-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import multiprocessing
class FakeAsyncResult(object):
    """Already-finished stand-in for ``multiprocessing.pool.AsyncResult``.

    Holds a value that was computed synchronously and exposes the
    ``wait`` / ``get`` / ``ready`` / ``successful`` methods so callers can
    treat it like the object returned by ``Pool.apply_async`` or
    ``Pool.map_async``.
    """

    def __init__(self, X):
        """Store *X*, the value later returned by :meth:`get`."""
        self.X = X

    def wait(self, *a, **kw):
        """Return immediately.

        Any arguments (for example ``timeout``, positional or by keyword)
        are accepted and ignored, because the result is already stored.
        """
        pass

    def get(self, *a, **kw):
        """Return the stored value; arguments such as ``timeout`` are ignored."""
        return self.X

    def ready(self):
        """Always ``True``."""
        return True

    def successful(self):
        """Always ``True``."""
        return True
#class PostprocessedAsyncResult(object):
# pass
class funcwrapper(object):
    """Callable wrapper that reports the arguments of a failing call.

    Calling the wrapper calls the wrapped function with the same arguments.
    If the function raises, the arguments and the traceback are printed and
    the exception is re-raised unchanged.
    """

    def __init__(self, func):
        """Remember *func*, the callable to invoke."""
        self.func = func

    def __call__(self, *X, **kwargs):
        """Call the wrapped function with ``*X`` (and ``**kwargs``, if any).

        Returns whatever the wrapped function returns.  Any exception it
        raises is printed together with the call parameters and then
        re-raised.
        """
        try:
            return self.func(*X, **kwargs)
        except BaseException:
            # Catch everything on purpose: this handler only reports and
            # then re-raises, so nothing is swallowed.
            import traceback
            # Single-argument print(...) behaves identically on Python 2
            # and Python 3.
            print('Exception while calling your function:')
            print(' params: %s' % (X,))
            if kwargs:
                print(' kwargs: %s' % (kwargs,))
            print(' exception:')
            traceback.print_exc()
            raise
class memberfuncwrapper(object):
    """Callable that invokes a named member function of a stored object.

    Only the object and the *name* of the member are stored; the attribute
    is looked up each time the wrapper is called.  If the call raises, the
    object, member name, resolved callable and parameters are printed along
    with the traceback, and the exception is re-raised unchanged.
    """

    def __init__(self, obj, funcname):
        """Remember the target object and the name of the member to call."""
        self.obj = obj
        self.funcname = funcname

    def __call__(self, *X):
        """Call ``getattr(obj, funcname)(*X)`` and return its result.

        Raises AttributeError if the object has no such member; any
        exception from the member itself is reported and re-raised.
        """
        # Use the builtin getattr(); objects do not have a ``.getattr``
        # method.  The attribute fetched from an instance is already bound,
        # so the object must not be passed again as the first argument.
        func = getattr(self.obj, self.funcname)
        try:
            return func(*X)
        except BaseException:
            # Report-and-re-raise only; nothing is swallowed.
            import traceback
            # Single-argument print(...) behaves identically on Python 2
            # and Python 3.
            print('Exception while calling your function:')
            print(' object: %s' % (self.obj,))
            print(' member function: %s' % (self.funcname,))
            print('  %s' % (func,))
            print(' params: %s' % (X,))
            print(' exception:')
            traceback.print_exc()
            raise
class multiproc(object):
    """Run work either inline or on a ``multiprocessing`` pool.

    With ``nthreads == 1`` and no explicit *pool*, everything runs
    synchronously in the calling process (``self.pool`` is ``None``).
    Otherwise work is dispatched to the given pool, or to a newly created
    ``multiprocessing.Pool`` with *nthreads* workers.
    """

    def __init__(self, nthreads=1, init=None, initargs=None,
                 map_chunksize=1, pool=None, wrap_all=False):
        """Set up the execution back-end.

        nthreads      -- number of worker processes; 1 means run inline.
        init          -- optional initializer; called once in this process
                         when running inline, or passed to the new Pool.
        initargs      -- argument tuple for *init*; None means no arguments.
        map_chunksize -- default chunk size used by map().
        pool          -- existing pool to use instead of creating one
                         (*nthreads*, *init* and *initargs* are then unused).
        wrap_all      -- wrap functions given to map()/map_async() in
                         funcwrapper when a pool is in use.
        """
        self.wrap_all = wrap_all
        # None would blow up in ``init(*initargs)`` here and inside the
        # pool's worker start-up code, so normalise it to "no arguments".
        if initargs is None:
            initargs = ()
        if pool is not None:
            self.pool = pool
            self.applyfunc = self.pool.apply_async
        elif nthreads == 1:
            self.pool = None
            # Python 3 has no builtin apply(); use an equivalent helper.
            self.applyfunc = self._apply_sync
            if init is not None:
                init(*initargs)
        else:
            self.pool = multiprocessing.Pool(nthreads, init, initargs)
            self.applyfunc = self.pool.apply_async
        self.async_results = []
        self.map_chunksize = map_chunksize

    @staticmethod
    def _apply_sync(func, args=(), kwargs=None):
        """Call ``func(*args, **kwargs)`` immediately and return the result."""
        if kwargs is None:
            kwargs = {}
        return func(*args, **kwargs)

    def map(self, f, args, chunksize=None, wrap=False):
        """Apply *f* to every item of *args* and return the list of results.

        chunksize -- chunk size for the pool; defaults to the
                     ``map_chunksize`` given at construction.
        wrap      -- wrap *f* in funcwrapper (pool mode only).
        """
        cs = chunksize
        if cs is None:
            cs = self.map_chunksize
        if self.pool is not None:
            if wrap or self.wrap_all:
                f = funcwrapper(f)
            return self.pool.map(f, args, cs)
        # list() so the work is done now and a list is returned on Python 3
        # as well, matching what Pool.map returns.
        return list(map(f, args))

    def map_async(self, func, iterable, wrap=False):
        """Like map(), but return an AsyncResult-like object.

        Without a pool the work is done immediately and the results are
        returned inside a FakeAsyncResult.
        """
        if self.pool is None:
            return FakeAsyncResult(list(map(func, iterable)))
        if wrap or self.wrap_all:
            func = funcwrapper(func)
        return self.pool.map_async(func, iterable)

    def apply(self, f, args, wrap=False, kwargs=None):
        """Call ``f(*args, **kwargs)`` and return an AsyncResult-like object.

        Without a pool the call happens immediately and its result is
        returned inside a FakeAsyncResult.  With a pool the call is
        submitted through ``self.applyfunc`` and the returned result object
        is also remembered so that waitforall() can wait on it.

        wrap   -- wrap *f* in funcwrapper (pool mode only).
        kwargs -- keyword arguments for *f*; None means none.
        """
        if kwargs is None:
            # Fresh dict per call instead of a shared mutable default.
            kwargs = {}
        if self.pool is None:
            return FakeAsyncResult(f(*args, **kwargs))
        if wrap:
            f = funcwrapper(f)
        res = self.applyfunc(f, args, kwargs)
        self.async_results.append(res)
        return res

    def waitforall(self):
        """Wait on every result recorded by apply(), then forget them."""
        # Single-argument print(...) behaves identically on Python 2 and 3.
        print('Waiting for async results to finish...')
        for r in self.async_results:
            print(' waiting for %s' % (r,))
            r.wait()
        print('all done')
        self.async_results = []

    def close(self):
        """Close the pool, if any, and switch to inline execution."""
        if self.pool is not None:
            self.pool.close()
            self.pool = None
|