This file is indexed.

/usr/share/pyshared/kombu/transport/librabbitmq.py is in python-kombu 1.4.3-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
kombu.transport.librabbitmq
===========================

pylibrabbitmq transport.

:copyright: (c) 2010 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.

"""
import socket
import pylibrabbitmq as amqp

from pylibrabbitmq import ChannelError, ConnectionError

from kombu.transport import base

DEFAULT_PORT = 5672


class Message(base.Message):
    """A message received by the broker.

    .. attribute:: body

        The message body.

    .. attribute:: delivery_tag

        The message delivery tag, uniquely identifying this message.

    .. attribute:: channel

        The channel instance the message was received on.

    """

    def __init__(self, channel, message, **kwargs):
        props = message.properties
        info = message.delivery_info
        super(Message, self).__init__(channel,
                body=message.body,
                delivery_info=info,
                properties=props,
                delivery_tag=info["delivery_tag"],
                content_type=props["content_type"],
                content_encoding=props["content_encoding"],
                headers=props.get("application_headers"),
                **kwargs)


class Channel(amqp.Channel, base.StdChannel):
    Message = Message

    def prepare_message(self, body, priority=None,
                content_type=None, content_encoding=None, headers=None,
                properties=None):
        """Encapsulate data into a AMQP message."""
        properties = dict({"content_type": content_type,
                           "content_encoding": content_encoding,
                           "application_headers": headers,
                           "priority": priority}, **properties or {})
        return amqp.Message(body, properties=properties)

    def message_to_python(self, raw_message):
        """Convert encoded message body back to a Python value."""
        return self.Message(self, raw_message)


class Connection(amqp.Connection):
    Channel = Channel


class Transport(base.Transport):
    Connection = Connection

    default_port = DEFAULT_PORT
    connection_errors = (ConnectionError,
                         socket.error,
                         IOError,
                         OSError)
    channel_errors = (ChannelError, )

    def __init__(self, client, **kwargs):
        self.client = client
        self.default_port = kwargs.get("default_port") or self.default_port

    def create_channel(self, connection):
        return connection.channel()

    def drain_events(self, connection, **kwargs):
        return connection.drain_events(**kwargs)

    def establish_connection(self):
        """Establish connection to the AMQP broker."""
        conninfo = self.client
        for name, default_value in self.default_connection_params.items():
            if not getattr(conninfo, name, None):
                setattr(conninfo, name, default_value)
        conn = self.Connection(host=conninfo.host,
                               userid=conninfo.userid,
                               password=conninfo.password,
                               virtual_host=conninfo.virtual_host,
                               login_method=conninfo.login_method,
                               insist=conninfo.insist,
                               ssl=conninfo.ssl,
                               connect_timeout=conninfo.connect_timeout)
        conn.client = self.client
        return conn

    def close_connection(self, connection):
        """Close the AMQP broker connection."""
        connection.close()

    @property
    def default_connection_params(self):
        return {"userid": "guest", "password": "guest",
                "port": self.default_port,
                "hostname": "localhost", "login_method": "AMQPLAIN"}