This file is indexed.

/usr/share/pyshared/celery/concurrency/eventlet.py is in python-celery 2.4.6-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import os
import sys

from time import time

if not os.environ.get("EVENTLET_NOPATCH"):
    import eventlet
    import eventlet.debug
    eventlet.monkey_patch()
    eventlet.debug.hub_prevent_multiple_readers(False)

from .. import signals
from ..utils import timer2

from . import base


def apply_target(target, args=(), kwargs={}, callback=None,
                 accept_callback=None, getpid=None):
    return base.apply_target(target, args, kwargs, callback, accept_callback,
                             pid=getpid())


class Schedule(timer2.Schedule):

    def __init__(self, *args, **kwargs):
        from eventlet.greenthread import spawn_after
        from greenlet import GreenletExit
        super(Schedule, self).__init__(*args, **kwargs)

        self.GreenletExit = GreenletExit
        self._spawn_after = spawn_after
        self._queue = set()

    def enter(self, entry, eta=None, priority=0):
        try:
            eta = timer2.to_timestamp(eta)
        except OverflowError:
            if not self.handle_error(sys.exc_info()):
                raise

        now = time()
        if eta is None:
            eta = now
        secs = max(eta - now, 0)

        g = self._spawn_after(secs, entry)
        self._queue.add(g)
        g.link(self._entry_exit, entry)
        g.entry = entry
        g.eta = eta
        g.priority = priority
        g.cancelled = False

        return g

    def _entry_exit(self, g, entry):
        try:
            try:
                g.wait()
            except self.GreenletExit:
                entry.cancel()
                g.cancelled = True
        finally:
            self._queue.discard(g)

    def clear(self):
        queue = self._queue
        while queue:
            try:
                queue.pop().cancel()
            except (KeyError, self.GreenletExit):
                pass

    @property
    def queue(self):
        return [(g.eta, g.priority, g.entry) for g in self._queue]


class Timer(timer2.Timer):
    Schedule = Schedule

    def ensure_started(self):
        pass

    def stop(self):
        self.schedule.clear()

    def cancel(self, tref):
        try:
            tref.cancel()
        except self.schedule.GreenletExit:
            pass

    def start(self):
        pass


class TaskPool(base.BasePool):
    Timer = Timer

    signal_safe = False
    is_green = True

    def __init__(self, *args, **kwargs):
        from eventlet import greenthread
        from eventlet.greenpool import GreenPool
        self.Pool = GreenPool
        self.getcurrent = greenthread.getcurrent
        self.spawn_n = greenthread.spawn_n

        super(TaskPool, self).__init__(*args, **kwargs)

    def on_start(self):
        self._pool = self.Pool(self.limit)
        signals.eventlet_pool_started.send(sender=self)

    def on_stop(self):
        signals.eventlet_pool_preshutdown.send(sender=self)
        if self._pool is not None:
            self._pool.waitall()
        signals.eventlet_pool_postshutdown.send(sender=self)

    def on_apply(self, target, args=None, kwargs=None, callback=None,
            accept_callback=None, **_):
        signals.eventlet_pool_apply.send(sender=self,
                target=target, args=args, kwargs=kwargs)
        self._pool.spawn_n(apply_target, target, args, kwargs,
                           callback, accept_callback,
                           self.getcurrent)