This file is indexed.

/usr/share/pyshared/kombu/mixins.py is in python-kombu 1.4.3-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
"""
kombu.mixins
============

Useful mixin classes.

:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import
from __future__ import with_statement

import socket

from contextlib import nested, contextmanager
from functools import partial
from itertools import count

from kombu.messaging import Consumer

from kombu.utils import cached_property
from kombu.utils.limits import TokenBucket

__all__ = ["ConsumerMixin"]


class ConsumerMixin(object):
    connect_max_retries = None
    should_stop = False

    def get_consumers(self, Consumer, channel):
        raise NotImplementedError("Subclass responsibility")

    def on_connection_revived(self):
        pass

    def on_consume_ready(self, connection, channel):
        pass

    def on_iteration(self):
        pass

    @contextmanager
    def extra_context(self, connection, channel):
        yield

    def error(self, msg, *args):
        pass

    def info(self, msg, *args):
        pass

    def run(self):
        while not self.should_stop:
            try:
                if self.restart_limit.can_consume(1):
                    for _ in self.consume(limit=None):
                        pass
            except self.connection.connection_errors:
                self.error("Connection to broker lost. "
                           "Trying to re-establish the connection...")

    def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs):
        elapsed = 0
        with self.Consumer() as (connection, channel, consumers):
            with self.extra_context(connection, channel):
                self.on_consume_ready(connection, channel, **kwargs)
                for i in limit and xrange(limit) or count():
                    if self.should_stop:
                        break
                    self.on_iteration()
                    try:
                        connection.drain_events(timeout=safety_interval)
                    except socket.timeout:
                        elapsed += safety_interval
                        if timeout and elapsed >= timeout:
                            raise socket.timeout()
                    except socket.error:
                        raise
                    else:
                        yield
                        elapsed = 0

    def on_connection_error(self, exc, interval):
        self.error("Broker connection error: %r. "
                   "Trying again in %s seconds.", exc, interval)

    @contextmanager
    def Consumer(self):
        with self.connection.clone() as conn:
            conn.ensure_connection(self.on_connection_error,
                                   self.connect_max_retries)
            self.on_connection_revived()
            self.info("Connected to %s", conn.as_uri())
            channel = conn.default_channel
            with self._consume_from(*self.get_consumers(
                    partial(Consumer, channel), channel)) as consumers:
                yield conn, channel, consumers

    @contextmanager
    def _consume_from(self, *consumers):
        with nested(*consumers) as context:
            yield context

    @cached_property
    def restart_limit(self):
        # the AttributeError that can be catched from amqplib
        # poses problems for the too often restarts protection
        # in Connection.ensure_connection
        return TokenBucket(1)