This file is indexed.

/usr/share/pyshared/execnet/threadpool.py is in python-execnet 1.0.9-0.1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""
dispatching execution to threads

(c) 2009, holger krekel
"""
import threading
import time
import sys

# py2/py3 compatibility
try:
    import queue
except ImportError:
    import Queue as queue
if sys.version_info >= (3,0):
    exec ("def reraise(cls, val, tb): raise val")
else:
    exec ("def reraise(cls, val, tb): raise cls, val, tb")

ERRORMARKER = object()

class Reply(object):
    """ reply instances provide access to the result
        of a function execution that got dispatched
        through WorkerPool.dispatch()
    """
    _excinfo = None
    def __init__(self, task):
        self.task = task
        self._queue = queue.Queue()

    def _set(self, result):
        self._queue.put(result)

    def _setexcinfo(self, excinfo):
        self._excinfo = excinfo
        self._queue.put(ERRORMARKER)

    def get(self, timeout=None):
        """ get the result object from an asynchronous function execution.
            if the function execution raised an exception,
            then calling get() will reraise that exception
            including its traceback.
        """
        if self._queue is None:
            raise EOFError("reply has already been delivered")
        try:
            result = self._queue.get(timeout=timeout)
        except queue.Empty:
            raise IOError("timeout waiting for %r" %(self.task, ))
        if result is ERRORMARKER:
            self._queue = None
            excinfo = self._excinfo
            reraise(excinfo[0], excinfo[1], excinfo[2])
        return result

class WorkerThread(threading.Thread):
    def __init__(self, pool):
        threading.Thread.__init__(self)
        self._queue = queue.Queue()
        self._pool = pool
        self.setDaemon(1)

    def _run_once(self):
        reply = self._queue.get()
        if reply is SystemExit:
            return False
        assert self not in self._pool._ready
        task = reply.task
        try:
            func, args, kwargs = task
            result = func(*args, **kwargs)
        except (SystemExit, KeyboardInterrupt):
            return False
        except:
            reply._setexcinfo(sys.exc_info())
        else:
            reply._set(result)
        # at this point, reply, task and all other local variables go away
        return True

    def run(self):
        try:
            while self._run_once():
                self._pool._ready[self] = True
        finally:
            del self._pool._alive[self]
            try:
                del self._pool._ready[self]
            except KeyError:
                pass

    def send(self, task):
        reply = Reply(task)
        self._queue.put(reply)
        return reply

    def stop(self):
        self._queue.put(SystemExit)

class WorkerPool(object):
    """ A WorkerPool allows to dispatch function executions
        to threads.  Each Worker Thread is reused for multiple
        function executions. The dispatching operation
        takes care to create and dispatch to existing
        threads.

        You need to call shutdown() to signal
        the WorkerThreads to terminate and join()
        in order to wait until all worker threads
        have terminated.
    """
    _shuttingdown = False
    def __init__(self, maxthreads=None):
        """ init WorkerPool instance which may
            create up to `maxthreads` worker threads.
        """
        self.maxthreads = maxthreads
        self._ready = {}
        self._alive = {}

    def dispatch(self, func, *args, **kwargs):
        """ return Reply object for the asynchronous dispatch
            of the given func(*args, **kwargs) in a
            separate worker thread.
        """
        if self._shuttingdown:
            raise IOError("WorkerPool is already shutting down")
        try:
            thread, _ = self._ready.popitem()
        except KeyError: # pop from empty list
            if self.maxthreads and len(self._alive) >= self.maxthreads:
                raise IOError("can't create more than %d threads." %
                              (self.maxthreads,))
            thread = self._newthread()
        return thread.send((func, args, kwargs))

    def _newthread(self):
        thread = WorkerThread(self)
        self._alive[thread] = True
        thread.start()
        return thread

    def shutdown(self):
        """ signal all worker threads to terminate.
            call join() to wait until all threads termination.
        """
        if not self._shuttingdown:
            self._shuttingdown = True
            for t in list(self._alive):
                t.stop()

    def join(self, timeout=None):
        """ wait until all worker threads have terminated. """
        current = threading.currentThread()
        deadline = delta = None
        if timeout is not None:
            deadline = time.time() + timeout
        for thread in list(self._alive):
            if deadline:
                delta = deadline - time.time()
                if delta <= 0:
                    raise IOError("timeout while joining threads")
            thread.join(timeout=delta)
            if thread.isAlive():
                raise IOError("timeout while joining threads")

if __name__ == '__channelexec__':
    maxthreads = channel.receive()
    execpool = WorkerPool(maxthreads=maxthreads)
    gw = channel.gateway
    channel.send("ok")
    gw._trace("instantiated thread work pool maxthreads=%s" %(maxthreads,))
    while 1:
        gw._trace("waiting for new exec task")
        task = gw._execqueue.get()
        if task is None:
            gw._trace("thread-dispatcher got None, exiting")
            execpool.shutdown()
            execpool.join()
            raise gw._StopExecLoop
        gw._trace("dispatching exec task to thread pool")
        execpool.dispatch(gw.executetask, task)