/usr/share/pyshared/celery/concurrency/eventlet.py is in python-celery 2.5.3-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | # -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
if not os.environ.get("EVENTLET_NOPATCH"):
import eventlet
import eventlet.debug
eventlet.monkey_patch()
eventlet.debug.hub_prevent_multiple_readers(False)
import sys
from time import time
from .. import signals
from ..utils import timer2
from . import base
def apply_target(target, args=(), kwargs={}, callback=None,
accept_callback=None, getpid=None):
return base.apply_target(target, args, kwargs, callback, accept_callback,
pid=getpid())
class Schedule(timer2.Schedule):
def __init__(self, *args, **kwargs):
from eventlet.greenthread import spawn_after
from greenlet import GreenletExit
super(Schedule, self).__init__(*args, **kwargs)
self.GreenletExit = GreenletExit
self._spawn_after = spawn_after
self._queue = set()
def enter(self, entry, eta=None, priority=0):
try:
eta = timer2.to_timestamp(eta)
except OverflowError:
if not self.handle_error(sys.exc_info()):
raise
now = time()
if eta is None:
eta = now
secs = max(eta - now, 0)
g = self._spawn_after(secs, entry)
self._queue.add(g)
g.link(self._entry_exit, entry)
g.entry = entry
g.eta = eta
g.priority = priority
g.cancelled = False
return g
def _entry_exit(self, g, entry):
try:
try:
g.wait()
except self.GreenletExit:
entry.cancel()
g.cancelled = True
finally:
self._queue.discard(g)
def clear(self):
queue = self._queue
while queue:
try:
queue.pop().cancel()
except (KeyError, self.GreenletExit):
pass
@property
def queue(self):
return [(g.eta, g.priority, g.entry) for g in self._queue]
class Timer(timer2.Timer):
Schedule = Schedule
def ensure_started(self):
pass
def stop(self):
self.schedule.clear()
def cancel(self, tref):
try:
tref.cancel()
except self.schedule.GreenletExit:
pass
def start(self):
pass
class TaskPool(base.BasePool):
Timer = Timer
rlimit_safe = False
signal_safe = False
is_green = True
def __init__(self, *args, **kwargs):
from eventlet import greenthread
from eventlet.greenpool import GreenPool
self.Pool = GreenPool
self.getcurrent = greenthread.getcurrent
self.getpid = lambda: id(greenthread.getcurrent())
self.spawn_n = greenthread.spawn_n
super(TaskPool, self).__init__(*args, **kwargs)
def on_start(self):
self._pool = self.Pool(self.limit)
signals.eventlet_pool_started.send(sender=self)
def on_stop(self):
signals.eventlet_pool_preshutdown.send(sender=self)
if self._pool is not None:
self._pool.waitall()
signals.eventlet_pool_postshutdown.send(sender=self)
def on_apply(self, target, args=None, kwargs=None, callback=None,
accept_callback=None, **_):
signals.eventlet_pool_apply.send(sender=self,
target=target, args=args, kwargs=kwargs)
self._pool.spawn_n(apply_target, target, args, kwargs,
callback, accept_callback,
self.getpid)
|