/usr/lib/python3/dist-packages/kombu/pools.py is in python3-kombu 4.1.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Public resource pools."""
from __future__ import absolute_import, unicode_literals
import os
from itertools import chain
from .connection import Resource
from .five import range, values
from .messaging import Producer
from .utils.collections import EqualityDict
from .utils.compat import register_after_fork
from .utils.functional import lazy
# Public names exported by a star-import of this module.
__all__ = ['ProducerPool', 'PoolGroup', 'register_group',
'connections', 'producers', 'get_limit', 'set_limit', 'reset']
# Global pool limit, kept in a one-element list so ``set_limit`` can replace
# the value in place (``_limit[0] = ...``) without a ``global`` statement.
_limit = [10]
# Every pool group passed to ``register_group``; walked by ``_all_pools``
# and ``reset``.
_groups = []
# Sentinel object: a group whose ``limit`` *is* this object asks
# ``get_limit()`` for the limit when it creates a pool
# (see ``PoolGroup.__missing__``).
use_global_limit = object()
# Raw value of the KOMBU_DISABLE_LIMIT_PROTECTION environment variable
# (``None`` when unset).  Not read anywhere else in this section.
disable_limit_protection = os.environ.get('KOMBU_DISABLE_LIMIT_PROTECTION')
def _after_fork_cleanup_group(group):
group.clear()
class ProducerPool(Resource):
    """Pool of :class:`kombu.Producer` instances.

    Producers are created lazily and bound to connections acquired from the
    ``connections`` pool given to the constructor.
    """

    #: Class instantiated by :meth:`create_producer`; can be overridden per
    #: instance with the ``Producer`` keyword argument.
    Producer = Producer
    # NOTE(review): presumably consulted by the ``Resource`` base class --
    # confirm against its implementation.
    close_after_fork = True

    def __init__(self, connections, *args, **kwargs):
        """Create the pool.

        :param connections: pool that :meth:`_acquire_connection` draws from.
        :keyword Producer: optional replacement for the class-level
            ``Producer``; a missing or falsy value keeps the default.

        All remaining arguments are forwarded to the base initializer.
        """
        producer_cls = kwargs.pop('Producer', None)
        self.connections = connections
        self.Producer = producer_cls or self.Producer
        super(ProducerPool, self).__init__(*args, **kwargs)

    def _acquire_connection(self):
        """Acquire a connection from ``self.connections`` (``block=True``)."""
        return self.connections.acquire(block=True)

    def create_producer(self):
        """Instantiate ``self.Producer`` on a newly acquired connection.

        If the constructor raises, the connection is released before the
        exception propagates.
        """
        connection = self._acquire_connection()
        try:
            producer = self.Producer(connection)
        except BaseException:
            connection.release()
            raise
        return producer

    def new(self):
        """Return a ``lazy`` wrapper around :meth:`create_producer`."""
        return lazy(self.create_producer)

    def setup(self):
        """Fill ``self._resource`` with ``self.limit`` lazy producers.

        Does nothing when ``self.limit`` is falsy.
        """
        if not self.limit:
            return
        for _ in range(self.limit):
            self._resource.put_nowait(self.new())

    def close_resource(self, resource):
        """Do nothing; this pool performs no per-resource close step."""

    def prepare(self, p):
        """Return a usable producer for the pooled entry ``p``.

        A callable ``p`` (such as the entries produced by :meth:`new`) is
        called to obtain the producer.  A producer whose ``_channel`` is
        ``None`` is revived on a newly acquired connection; if reviving
        raises, that connection is released and the exception re-raised.
        """
        producer = p() if callable(p) else p
        if producer._channel is None:
            connection = self._acquire_connection()
            try:
                producer.revive(connection)
            except BaseException:
                connection.release()
                raise
        return producer

    def release(self, resource):
        """Return ``resource`` to the pool.

        Releases the producer's ``__connection__`` when it is truthy and
        resets its ``channel`` to ``None`` before delegating to the base
        class.
        """
        if resource.__connection__:
            resource.__connection__.release()
        resource.channel = None
        super(ProducerPool, self).release(resource)
class PoolGroup(EqualityDict):
    """Collection of resource pools.

    Subclasses implement :meth:`create`; :meth:`__missing__` uses it to
    build and store the pool for a resource that has none yet.
    """

    def __init__(self, limit=None, close_after_fork=True):
        """Store the group settings and optionally register fork cleanup.

        :param limit: limit handed to :meth:`create`; the
            ``use_global_limit`` sentinel means "use ``get_limit()``".
        :param close_after_fork: when true, and ``register_after_fork`` is
            available, register ``_after_fork_cleanup_group`` for this
            group.
        """
        self.limit = limit
        self.close_after_fork = close_after_fork
        if register_after_fork is not None and close_after_fork:
            register_after_fork(self, _after_fork_cleanup_group)

    def create(self, resource, limit):
        """Build the pool for ``resource``; must be provided by subclasses."""
        raise NotImplementedError('PoolGroups must define ``create``')

    def __missing__(self, resource):
        """Create, store and return the pool for ``resource``."""
        if self.limit is use_global_limit:
            effective_limit = get_limit()
        else:
            effective_limit = self.limit
        pool = self.create(resource, effective_limit)
        self[resource] = pool
        return pool
def register_group(group):
    """Register group (can be used as decorator).

    The group is appended to the module-level ``_groups`` list and then
    returned unchanged, which is what makes decorator use possible.
    """
    _groups.append(group)
    return group
class Connections(PoolGroup):
    """Collection of connection pools."""

    def create(self, connection, limit):
        """Return ``connection.Pool(limit=limit)`` as the pool for ``connection``."""
        pool = connection.Pool(limit=limit)
        return pool
# Module-level group of connection pools.  It is registered so that
# ``set_limit`` and ``reset`` reach its pools, and it follows the global limit.
connections = register_group(Connections(limit=use_global_limit)) # noqa: E305
class Producers(PoolGroup):
    """Collection of producer pools."""

    def create(self, connection, limit):
        """Return a :class:`ProducerPool` backed by ``connections[connection]``."""
        connection_pool = connections[connection]
        return ProducerPool(connection_pool, limit=limit)
# Module-level group of producer pools.  It is registered so that
# ``set_limit`` and ``reset`` reach its pools, and it follows the global limit.
producers = register_group(Producers(limit=use_global_limit)) # noqa: E305
def _all_pools():
    """Return an iterator over the pools of every registered group.

    Empty groups contribute nothing.  The per-group value views are
    collected when this function is called; the pools themselves are
    yielded lazily by the returned ``chain``.
    """
    per_group = [values(group) for group in _groups if group]
    return chain.from_iterable(per_group)
def get_limit():
    """Get current connection pool limit.

    This is the single value stored in the module-level ``_limit`` list.
    """
    current_limit = _limit[0]
    return current_limit
def set_limit(limit, force=False, reset_after=False, ignore_errors=False):
    """Set new connection pool limit.

    A falsy ``limit`` is treated as ``0``.  When the value differs from the
    current global limit it is stored, and every pool of every registered
    group is resized to it; otherwise nothing is touched.

    ``force``, ``reset_after`` and ``ignore_errors`` are accepted but not
    used by this implementation.

    :returns: the normalized limit.
    """
    limit = limit or 0
    current = _limit[0] or 0
    if limit == current:
        # Unchanged: skip resizing.
        return limit
    _limit[0] = limit
    for pool in _all_pools():
        pool.resize(limit)
    return limit
def reset(*args, **kwargs):
    """Reset all pools by closing open resources.

    Every pool of every registered group is force-closed, then each group
    is emptied.  Positional and keyword arguments are accepted and ignored.
    """
    for open_pool in _all_pools():
        try:
            open_pool.force_close_all()
        except Exception:
            # Best effort: a pool that fails to close must not stop the
            # remaining pools from being closed.
            continue
    for registered_group in _groups:
        registered_group.clear()
|