This file is indexed.

/usr/lib/python2.7/dist-packages/async/thread.py is in python-async 0.6.2-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of async and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
# -*- coding: utf-8 -*-
"""Module with threading utilities"""

__docformat__ = "restructuredtext"
import threading
import inspect
import logging

try:
    import queue
except ImportError:
    import Queue as queue


__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread',
            'WorkerThread')

log = logging.getLogger()

#{ Decorators

def do_terminate_threads(whitelist=list()):
    """Simple function which terminates all of our threads
    :param whitelist: If whitelist is given, only the given threads will be terminated"""
    for t in threading.enumerate():
        if not isinstance(t, TerminatableThread):
            continue
        if whitelist and t not in whitelist:
            continue
        t.schedule_termination()
        t.stop_and_join()
    # END for each thread

def terminate_threads(func):
    """Kills all worker threads the method has created by sending the quit signal.
    This takes over in case of an error in the main function"""
    def wrapper(*args, **kwargs):
        cur_threads = set(threading.enumerate())
        try:
            return func(*args, **kwargs)
        finally:
            do_terminate_threads(set(threading.enumerate()) - cur_threads)
        # END finally shutdown threads
    # END wrapper
    wrapper.__name__ = func.__name__
    return wrapper

#} END decorators

#{ Classes

class TerminatableThread(threading.Thread):
    """A simple thread able to terminate itself on behalf of the user.

    Terminate a thread as follows:

    t.stop_and_join()

    Derived classes call _should_terminate() to determine whether they should
    abort gracefully
    """
    __slots__ = '_terminate'

    def __init__(self):
        super(TerminatableThread, self).__init__()
        self._terminate = False
        # Use standard python means of non-blocking threads (even though we can tell this one to stop explicitly)
        self.daemon = True


    #{ Subclass Interface
    def _should_terminate(self):
        """:return: True if this thread should terminate its operation immediately"""
        return self._terminate

    def _terminated(self):
        """Called once the thread terminated. Its called in the main thread
        and may perform cleanup operations"""
        pass

    def start(self):
        """Start the thread and return self"""
        super(TerminatableThread, self).start()
        return self

    #} END subclass interface

    #{ Interface
    def schedule_termination(self):
        """Schedule this thread to be terminated as soon as possible.
        :note: this method does not block."""
        self._terminate = True

    def stop_and_join(self):
        """Ask the thread to stop its operation and wait for it to terminate
        :note: Depending on the implenetation, this might block a moment"""
        self._terminate = True
        self.join()
        self._terminated()
    #} END interface


class StopProcessing(Exception):
    """If thrown in a function processed by a WorkerThread, it will terminate"""


class WorkerThread(TerminatableThread):
    """ This base allows to call functions on class instances natively.
    As it is meant to work with a pool, the result of the call must be
    handled by the callee.
    The thread runs forever unless it receives the terminate signal using
    its task queue.

    Tasks could be anything, but should usually be class methods and arguments to
    allow the following:

    inq = Queue()
    w = WorkerThread(inq)
    w.start()
    inq.put((WorkerThread.<method>, args, kwargs))

    finally we call quit to terminate asap.

    alternatively, you can make a call more intuitively - the output is the output queue
    allowing you to get the result right away or later
    w.call(arg, kwarg='value').get()

    inq.put(WorkerThread.quit)
    w.join()

    You may provide the following tuples as task:
    t[0] = class method, function or instance method
    t[1] = optional, tuple or list of arguments to pass to the routine
    t[2] = optional, dictionary of keyword arguments to pass to the routine
    """
    __slots__ = ('inq')


    # define how often we should check for a shutdown request in case our
    # taskqueue is empty
    shutdown_check_time_s = 0.5

    def __init__(self, inq = None):
        super(WorkerThread, self).__init__()
        self.inq = inq
        if inq is None:
            self.inq = queue.Queue()

    @classmethod
    def stop(cls, *args):
        """If send via the inq of the thread, it will stop once it processed the function"""
        raise StopProcessing

    def run(self):
        """Process input tasks until we receive the quit signal"""
        gettask = self.inq.get
        while True:
            if self._should_terminate():
                break
            # END check for stop request

            # note: during shutdown, this turns None in the middle of waiting
            # for an item to be put onto it - we can't du anything about it -
            # even if we catch everything and break gracefully, the parent
            # call will think we failed with an empty exception.
            # Hence we just don't do anything about it. Alternatively
            # we could override the start method to get our own bootstrapping,
            # which would mean repeating plenty of code in of the threading module.
            tasktuple = gettask()

            # needing exactly one function, and one arg
            routine, arg = tasktuple

            try:
                try:
                    if inspect.ismethod(routine):
                        if routine.__self__ is None:
                            routine(self, arg)
                        else:
                            routine(arg)
                    elif inspect.isroutine(routine):
                        routine(arg)
                    else:
                        # ignore unknown items
                        log.warn("%s: task %s was not understood - terminating", self.getName(), str(tasktuple))
                        break
                    # END make routine call
                finally:
                    # make sure we delete the routine to release the reference as soon
                    # as possible. Otherwise objects might not be destroyed
                    # while we are waiting
                    del(routine)
                    del(tasktuple)
            except StopProcessing:
                break
            except Exception as e:
                log.error("%s: Task raised unhandled exception: %s - this really shouldn't happen !",
                      (self.getName(), str(e)))
                continue    # just continue
            # END routine exception handling

            # END handle routine release
        # END endless loop

    def stop_and_join(self):
        """Send stop message to ourselves - we don't block, the thread will terminate
        once it has finished processing its input queue to receive our termination
        event"""
        # DONT call superclass as it will try to join - join's don't work for
        # some reason, as python apparently doesn't switch threads (so often)
        # while waiting ... I don't know, but the threads respond properly,
        # but only if dear python switches to them
        self.inq.put((self.stop, None))
#} END classes