/usr/share/pyshared/kombu/pools.py is in python-kombu 1.4.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 | """
kombu.pools
===========
Public resource pools.
:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
import os
from itertools import chain
from kombu.connection import Resource
from kombu.messaging import Producer
from kombu.utils import HashingDict
__all__ = ["ProducerPool", "PoolGroup", "register_group",
"connections", "producers", "get_limit", "set_limit", "reset"]
_limit = [200]
_used = [False]
_groups = []
use_global_limit = object()
disable_limit_protection = os.environ.get("KOMBU_DISABLE_LIMIT_PROTECTION")
class ProducerPool(Resource):
def __init__(self, connections, *args, **kwargs):
self.connections = connections
super(ProducerPool, self).__init__(*args, **kwargs)
def Producer(self, connection):
return Producer(connection)
def create_producer(self):
connection = self.connections.acquire(block=True)
return self.Producer(connection)
def new(self):
return lambda: self.create_producer()
def setup(self):
if self.limit:
for _ in xrange(self.limit):
self._resource.put_nowait(self.new())
def prepare(self, p):
if callable(p):
p = p()
if not p.channel:
p.connection = self.connections.acquire(block=True)
p.revive(p.connection.default_channel)
return p
def release(self, resource):
resource.connection.release()
resource.connection = resource.channel = None
super(ProducerPool, self).release(resource)
class PoolGroup(HashingDict):
def __init__(self, limit=None):
self.limit = limit
def create(self, resource, limit):
raise NotImplementedError("PoolGroups must define ``create``")
def __missing__(self, resource):
limit = self.limit
if limit is use_global_limit:
limit = get_limit()
if not _used[0]:
_used[0] = True
k = self[resource] = self.create(resource, limit)
return k
def register_group(group):
_groups.append(group)
return group
class Connections(PoolGroup):
def create(self, connection, limit):
return connection.Pool(limit=limit)
connections = register_group(Connections(limit=use_global_limit))
class Producers(PoolGroup):
def create(self, connection, limit):
return ProducerPool(connections[connection], limit=limit)
producers = register_group(Producers(limit=use_global_limit))
def _all_pools():
return chain(*[(g.itervalues() if g else iter([])) for g in _groups])
def get_limit():
return _limit[0]
def set_limit(limit, force=False, reset_after=False):
limit = limit or 0
glimit = _limit[0] or 0
if limit or 0 < glimit:
if not disable_limit_protection and (_used[0] and not force):
raise RuntimeError("Can't lower limit after pool in use.")
reset_after = True
if limit != glimit:
_limit[0] = limit
for pool in _all_pools():
pool.limit = limit
if reset_after:
reset()
return limit
def reset(*args, **kwargs):
for pool in _all_pools():
try:
pool.force_close_all()
except Exception:
pass
for group in _groups:
group.clear()
_used[0] = False
try:
from multiprocessing.util import register_after_fork
register_after_fork(connections, reset)
except ImportError:
pass
|