This file is indexed.

/usr/lib/python2.7/dist-packages/Pyro4/tpjobqueue.py is in python2-pyro4 4.23-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""
Thread pooled job queue that can grow and shrink its pool of worker threads.

Pyro - Python Remote Objects.  Copyright by Irmen de Jong (irmen@razorvine.net).
"""

from __future__ import with_statement
import logging
import weakref
import time
import Pyro4.threadutil
try:
    import queue
except ImportError:
    import Queue as queue


log=logging.getLogger("Pyro4.tpjobqueue")


class NoJobAvailableError(queue.Empty):
    pass

class JobQueueError(Exception):
    pass


class Worker(Pyro4.threadutil.Thread):
    """
    Worker thread that picks jobs from the job queue and executes them.
    If it encounters None as a job, it will stop running, regardless of the pool size.
    If it encounters a lack of jobs for a short period, it will
    attempt to stop running as well in an effort to shrink the thread pool.
    """
    def __init__(self, pool):
        super(Worker, self).__init__()
        self.daemon = True
        self.pool = weakref.ref(pool)
        self.name = "Pyro-Worker-%d " % id(self)
        self.job = None  # the active job

    def run(self):
        while True:
            pool = self.pool()
            if not pool:
                break   # pool's gone, better exit
            try:
                self.job = pool.getJob()
            except NoJobAvailableError:
                # attempt to halt the worker, if the pool size permits this
                if pool.attemptHalt(self):
                    break
                else:
                    continue
            if self.job is None:
                # halt the worker, regardless of the pool size
                pool.halted(self)
                break
            else:
                pool.setBusy(self)
                try:
                    self.job()
                    pool.setIdle(self)
                except:
                    pool.halted(self, True)
                    raise



class ThreadPooledJobQueue(object):
    """
    A job queue that is serviced by a pool of worker threads that grows or
    shrings as demanded by the work load, between limits set by the
    THREADPOOL_MINTHREADS and THREADPOOL_MAXTHREADS config items.
    """
    def __init__(self):
        self.lock = Pyro4.threadutil.Lock()
        self.idle = set()
        self.busy = set()
        self.jobs = queue.Queue()
        self.closed = False
        for _ in range(Pyro4.config.THREADPOOL_MINTHREADS):
            self.__spawnIdle()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        """Close down the thread pool, signaling to all remaining worker threads to shut down."""
        count = self.workercountSafe
        for _ in range(count):
            self.jobs.put(None)  # None as a job means: terminate the worker
        log.debug("closing down, %d halt-jobs issued", count)
        self.closed = True

    def drain(self):
        """Wait till the job queue has been emptied."""
        if not self.closed:
            raise JobQueueError("can't drain a job queue that hasn't been closed yet")
        while not self.jobs.empty() and self.workercountSafe:
            # note: this loop may never end if a worker's job remains busy or blocked,
            # we simply assume all workers eventually terminate their jobs...
            time.sleep(0.1)
        time.sleep(0.05)
        if self.workercountSafe > 0:
            raise JobQueueError("there are still active workers")

    @property
    def workercount(self):
        return len(self.idle) + len(self.busy)

    @property
    def workercountSafe(self):
        with self.lock:
            return len(self.idle) + len(self.busy)

    @property
    def jobcount(self):
        return self.jobs.qsize()

    def __repr__(self):
        return "<%s.%s at 0x%x, %d idle, %d busy, %d jobs>" % \
            (self.__class__.__module__, self.__class__.__name__, id(self), len(self.idle), len(self.busy), self.jobcount)

    def process(self, job):
        """
        Add the job to the general job queue. Job is any callable object.
        If there's no idle worker available to service it, a new one is spawned
        as long as the pool size permits it.
        """
        with self.lock:
            if self.closed:
                raise JobQueueError("job queue is closed")
            self.jobs.put(job)
            if self.jobcount > 0:
                if not self.idle:
                    self.__spawnIdle()
                spawnamount = self.jobcount
                while spawnamount > 1:
                    self.__spawnIdle()
                    spawnamount -= 1

    def setIdle(self, worker):
        with self.lock:
            self.busy.remove(worker)
            self.idle.add(worker)

    def setBusy(self, worker):
        with self.lock:
            self.idle.remove(worker)
            self.busy.add(worker)

    def halted(self, worker, crashed=False):
        """Called by a worker when it halts (exits). This removes the worker from the bookkeeping."""
        with self.lock:
            self.__halted(worker)

    def __halted(self, worker):
        # Lock-free version that is used internally
        if worker in self.idle:
            self.idle.remove(worker)
        if worker in self.busy:
            self.busy.remove(worker)
        log.debug("worker halted: %s", worker.name)

    def attemptHalt(self, worker):
        """
        Called by a worker to signal it intends to halt.
        Returns true or false depending on whether the worker was actually allowed to halt.
        """
        with self.lock:
            if self.workercount > Pyro4.config.THREADPOOL_MINTHREADS:
                self.__halted(worker)
                return True
            return False

    def getJob(self):
        """
        Called by a worker to obtain a new job from the queue.
        If there's no job available in the timeout period given by the
        THREADPOOL_IDLETIMEOUT config item, NoJobAvailableError is raised.
        """
        if self.closed:
            return None
        try:
            return self.jobs.get(timeout=Pyro4.config.THREADPOOL_IDLETIMEOUT)
        except queue.Empty:
            raise NoJobAvailableError("queue is empty")

    def __spawnIdle(self):
        """
        Spawn a new idle worker if there is still room in the pool.
        (must only be called with self.lock acquired)
        """
        if self.workercount >= Pyro4.config.THREADPOOL_MAXTHREADS:
            return
        worker = Worker(self)
        self.idle.add(worker)
        log.debug("spawned new idle worker: %s", worker.name)
        worker.start()