This file is indexed.

/usr/lib/python2.7/dist-packages/pyres/extensions.py is in python-pyres 1.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os
import datetime
import time
import signal

try:
    import multiprocessing
except ImportError:
    # Only a failed import means the module is missing; a bare ``except``
    # would also swallow KeyboardInterrupt/SystemExit and unrelated errors
    # and misreport them as "multiprocessing was not available".
    import sys
    sys.exit("multiprocessing was not available")

from pyres import ResQ

from pyres.exceptions import NoQueueError
from pyres.worker import Worker

class JuniorWorker(Worker):
    """Worker whose loop stops as soon as no job can be reserved."""

    def work(self, interval=5):
        """Run reserved jobs one at a time, each inside a forked child.

        ``interval`` is accepted but is not used by this implementation.
        The loop ends when ``self._shutdown`` is set or when
        ``self.reserve()`` returns nothing; ``self.unregister_worker()``
        is called afterwards.
        """
        self.startup()
        while not self._shutdown:
            job = self.reserve()
            if not job:
                # Nothing could be reserved: leave the loop instead of polling.
                break
            print("got: %s" % job)

            self.child = os.fork()

            if not self.child:
                # Child side of the fork: run the job, then exit immediately
                # so the child never falls back into the loop.
                print('Processing %s since %s' % (job._queue,
                                                  datetime.datetime.now()))
                self.process(job)
                os._exit(0)

            # Parent side of the fork: block until the child has finished.
            print('Forked %s at %s' % (self.child,
                                       datetime.datetime.now()))
            os.waitpid(self.child, 0)
            self.child = None

        self.unregister_worker()

class Manager(object):
    """Start ``JuniorWorker`` processes for queues whose size is growing.

    The manager polls the size of every configured queue and, whenever
    ``check_rising`` reports growth, starts one more ``JuniorWorker`` in a
    ``multiprocessing.Process``, keeping at most ``max_children`` of them
    alive at the same time.
    """

    def __init__(self, queues, host, max_children=10):
        """Store the configuration and create the ``ResQ`` connection.

        queues       -- sequence of queue names to watch; must not be empty.
        host         -- server address passed to ``ResQ`` and to each child.
        max_children -- upper bound on simultaneously running children.

        Raises ``NoQueueError`` when ``queues`` is empty.
        """
        self.queues = queues
        self._host = host
        self.max_children = max_children
        self._shutdown = False
        self.children = []
        self.resq = ResQ(host)
        self.validate_queues()
        # queue name -> (size, timestamp) baseline recorded by check_rising.
        self.reports = {}

    def __str__(self):
        """Return ``Manager:<hostname>:<pid>:<comma-separated queues>``."""
        hostname = os.uname()[1]
        pid = os.getpid()
        return 'Manager:%s:%s:%s' % (hostname, pid, ','.join(self.queues))

    def validate_queues(self):
        """Raise ``NoQueueError`` when no queues were configured."""
        if not self.queues:
            raise NoQueueError("Please give each worker at least one queue.")

    def check_rising(self, queue, size):
        """Return True when ``queue`` has grown compared to its baseline.

        The first call for a queue stores ``(size, time.time())`` as its
        baseline and returns False.  Later calls return True only when more
        than 5 seconds have passed since the baseline was taken and ``size``
        exceeds the baseline size by more than 20; otherwise they return
        False (previously this path fell through and returned None).
        """
        if queue not in self.reports:
            self.reports[queue] = (size, time.time())
            return False

        old_size, old_time = self.reports[queue]
        # NOTE(review): the baseline is never refreshed after the first
        # observation -- confirm whether that is intended.
        return time.time() > old_time + 5 and size > old_size + 20

    def work(self):
        """Poll the queues until shutdown is requested, spawning children.

        The manager is registered on entry and unregistered again once the
        loop has ended, so a stopped manager does not stay listed.
        """
        self.startup()
        # NOTE(review): the loop polls continuously without any pause --
        # confirm whether a sleep between rounds is wanted.
        while not self._shutdown:
            for queue in self.queues:
                size = self.resq.size(queue)

                if self.check_rising(queue, size):
                    # Drop children that already exited so max_children
                    # limits running processes, not every process ever
                    # started.
                    self._reap_children()
                    if len(self.children) < self.max_children:
                        self.start_child(queue)

        self.unregister_manager()

    def _reap_children(self):
        """Forget child processes that are no longer alive."""
        self.children = [child for child in self.children
                         if child.is_alive()]

    def startup(self):
        """Register the manager and install its signal handlers."""
        self.register_manager()
        self.register_signals()

    def register_manager(self):
        """Add ``str(self)`` to the ``managers`` set."""
        self.resq.redis.sadd('managers', str(self))

    def unregister_manager(self):
        """Remove ``str(self)`` from the ``managers`` set."""
        self.resq.redis.srem('managers', str(self))

    def register_signals(self):
        """Install handlers for SIGTERM, SIGINT, SIGQUIT and SIGUSR1.

        SIGTERM/SIGINT stop the loop and terminate the children, SIGQUIT
        only stops the loop, SIGUSR1 only terminates the children.
        """
        signal.signal(signal.SIGTERM, self.shutdown_all)
        signal.signal(signal.SIGINT, self.shutdown_all)
        signal.signal(signal.SIGQUIT, self.schedule_shutdown)
        signal.signal(signal.SIGUSR1, self.kill_children)

    def shutdown_all(self, signum, frame):
        """Signal handler: request shutdown and terminate all children."""
        self.schedule_shutdown(signum, frame)
        self.kill_children(signum, frame)

    def schedule_shutdown(self, signum, frame):
        """Signal handler: make ``work`` leave its loop."""
        self._shutdown = True

    def kill_children(self, signum=None, frame=None):
        """Terminate every tracked child process.

        ``signum`` and ``frame`` are optional so the method works both as a
        signal handler (it is registered for SIGUSR1 and invoked from
        ``shutdown_all`` with both arguments) and as a plain call.
        """
        for child in self.children:
            child.terminate()

    def start_child(self, queue):
        """Start one ``JuniorWorker`` process for ``queue``; return True.

        The child runs ``JuniorWorker.run([queue], self._host)``.
        """
        p = multiprocessing.Process(target=JuniorWorker.run, args=([queue],
                                                                   self._host))
        self.children.append(p)
        p.start()
        return True

    @classmethod
    def run(cls, queues=(), host="localhost:6379"):
        """Create a manager for ``queues`` on ``host`` and run its loop."""
        manager = cls(queues, host)
        manager.work()