This file is indexed.

/usr/lib/python2.7/dist-packages/apscheduler/threadpool.py is in python-apscheduler 2.1.2-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
Generic thread pool class. Modeled after Java's ThreadPoolExecutor.
Please note that this ThreadPool does *not* fully implement the PEP 3148
ThreadPool!
"""

from threading import Thread, Lock, currentThread
from weakref import ref
import logging
import atexit

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty

logger = logging.getLogger(__name__)
_threadpools = set()


# Worker threads are daemonic in order to let the interpreter exit without
# an explicit shutdown of the thread pool. The following trick is necessary
# to allow worker threads to finish cleanly.
def _shutdown_all():
    """Shut down every thread pool that is still alive.

    Iterates over a snapshot of the registered weak references; pools that
    have already been garbage collected dereference to None and are skipped.
    """
    for pool_ref in tuple(_threadpools):
        pool = pool_ref()
        if pool is not None:
            pool.shutdown()

atexit.register(_shutdown_all)


class ThreadPool(object):
    """Dynamically sized pool of daemonic worker threads.

    Modeled after Java's ThreadPoolExecutor; note that this class does *not*
    implement the PEP 3148 executor interface.  Core threads wait on the job
    queue indefinitely; surplus threads exit after ``keepalive`` seconds of
    inactivity.
    """

    def __init__(self, core_threads=0, max_threads=20, keepalive=1):
        """
        :param core_threads: maximum number of persistent threads in the pool
        :param max_threads: maximum number of total threads in the pool
        :param keepalive: seconds to keep non-core worker threads waiting
            for new tasks
        """
        self.core_threads = core_threads
        # Guarantee at least one worker, and never fewer than core_threads.
        self.max_threads = max(max_threads, core_threads, 1)
        self.keepalive = keepalive
        self._queue = Queue()
        self._threads_lock = Lock()
        self._threads = set()
        self._shutdown = False

        # Register a weak reference so the module-level atexit hook can shut
        # this pool down without keeping it alive.
        _threadpools.add(ref(self))
        logger.info('Started thread pool with %d core threads and %s maximum '
                    'threads', core_threads, max_threads or 'unlimited')

    def _adjust_threadcount(self):
        # Spawn an extra worker if we are still below the maximum; a worker
        # spawned while below the core count becomes a persistent core thread.
        self._threads_lock.acquire()
        try:
            if self.num_threads < self.max_threads:
                self._add_thread(self.num_threads < self.core_threads)
        finally:
            self._threads_lock.release()

    def _add_thread(self, core):
        # Must be called with _threads_lock held.
        # setDaemon() (rather than the daemon attribute) is kept for
        # Python 2 compatibility, which this module still supports.
        t = Thread(target=self._run_jobs, args=(core,))
        t.setDaemon(True)
        t.start()
        self._threads.add(t)

    def _run_jobs(self, core):
        """Worker loop: pull (func, args, kwargs) tuples off the queue and run them.

        :param core: True for persistent core threads that block on the queue
            forever; False for surplus threads that exit after ``keepalive``
            seconds without work
        """
        logger.debug('Started worker thread')
        block = True
        timeout = None
        if not core:
            block = self.keepalive > 0
            timeout = self.keepalive

        while True:
            try:
                func, args, kwargs = self._queue.get(block, timeout)
            except Empty:
                break

            # shutdown() enqueues one (None, None, None) sentinel per worker;
            # checking the flag here lets workers exit without running the
            # sentinel (or any job) after shutdown has begun.
            if self._shutdown:
                break

            try:
                func(*args, **kwargs)
            except:
                # Deliberately bare: a worker must survive *anything* a job
                # raises; the failure is logged with its traceback.
                logger.exception('Error in worker thread')

        self._threads_lock.acquire()
        self._threads.remove(currentThread())
        self._threads_lock.release()

        logger.debug('Exiting worker thread')

    @property
    def num_threads(self):
        # NOTE: reads the set without holding _threads_lock; the count is
        # advisory and may be momentarily stale.
        return len(self._threads)

    def submit(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` to be executed by a worker thread.

        :raises RuntimeError: if the pool has already been shut down
        """
        if self._shutdown:
            raise RuntimeError('Cannot schedule new tasks after shutdown')

        self._queue.put((func, args, kwargs))
        self._adjust_threadcount()

    def shutdown(self, wait=True):
        """Shut the pool down; idempotent.

        :param wait: if True, block until all worker threads have exited
        """
        if self._shutdown:
            return

        # Fixed: was logging.info(...), which bypassed the module-level
        # logger used by every other message in this class.
        logger.info('Shutting down thread pool')
        self._shutdown = True
        _threadpools.remove(ref(self))

        # Wake every worker with a sentinel so blocked Queue.get() calls
        # return and the workers observe the _shutdown flag.
        self._threads_lock.acquire()
        for _ in range(self.num_threads):
            self._queue.put((None, None, None))
        self._threads_lock.release()

        if wait:
            # Snapshot under the lock; join outside it so exiting workers can
            # remove themselves from the set without deadlocking.
            self._threads_lock.acquire()
            threads = tuple(self._threads)
            self._threads_lock.release()
            for thread in threads:
                thread.join()

    def __repr__(self):
        if self.max_threads:
            threadcount = '%d/%d' % (self.num_threads, self.max_threads)
        else:
            threadcount = '%d' % self.num_threads

        return '<ThreadPool at %x; threads=%s>' % (id(self), threadcount)