/usr/share/pyshared/kombu/transport/librabbitmq.py is in python-kombu 1.4.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
kombu.transport.librabbitmq
===========================
pylibrabbitmq transport.
:copyright: (c) 2010 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
import socket
import pylibrabbitmq as amqp
from pylibrabbitmq import ChannelError, ConnectionError
from kombu.transport import base
# Port assigned to ``Transport.default_port``; it applies whenever the
# transport is created without a truthy ``default_port`` keyword argument.
DEFAULT_PORT = 5672
class Message(base.Message):
    """A message received by the broker.

    .. attribute:: body

        The message body.

    .. attribute:: delivery_tag

        The message delivery tag, uniquely identifying this message.

    .. attribute:: channel

        The channel instance the message was received on.

    """

    def __init__(self, channel, message, **kwargs):
        """Wrap a raw transport message.

        :param channel: the channel the message was received on.
        :param message: raw message object exposing ``body``,
            ``properties`` and ``delivery_info``.

        Any extra keyword arguments are forwarded to the base class.

        """
        props = message.properties
        info = message.delivery_info
        super(Message, self).__init__(channel,
                body=message.body,
                delivery_info=info,
                properties=props,
                delivery_tag=info["delivery_tag"],
                # The content properties are optional on the wire, so a
                # message published without them must not raise KeyError
                # here; look them up the same way as the headers below.
                content_type=props.get("content_type"),
                content_encoding=props.get("content_encoding"),
                headers=props.get("application_headers"),
                **kwargs)
class Channel(amqp.Channel, base.StdChannel):
    """Channel that builds outgoing and wraps incoming messages."""

    Message = Message

    def prepare_message(self, body, priority=None,
            content_type=None, content_encoding=None, headers=None,
            properties=None):
        """Build an AMQP message from `body` and its metadata.

        Entries in `properties` take precedence over the values given
        through the individual keyword arguments.

        """
        defaults = {"content_type": content_type,
                    "content_encoding": content_encoding,
                    "application_headers": headers,
                    "priority": priority}
        overrides = properties or {}
        merged = dict(defaults, **overrides)
        return amqp.Message(body, properties=merged)

    def message_to_python(self, raw_message):
        """Wrap `raw_message` in this channel's :attr:`Message` class."""
        message_cls = self.Message
        return message_cls(self, raw_message)
class Connection(amqp.Connection):
    """Connection class bound to this module's :class:`Channel`."""

    Channel = Channel
class Transport(base.Transport):
    """Transport that connects to the broker through pylibrabbitmq."""

    Connection = Connection

    default_port = DEFAULT_PORT
    connection_errors = (ConnectionError, socket.error, IOError, OSError)
    channel_errors = (ChannelError, )

    def __init__(self, client, **kwargs):
        """Remember `client` and an optional ``default_port`` override."""
        self.client = client
        port_override = kwargs.get("default_port")
        # A falsy override keeps the class-level default port.
        self.default_port = port_override or self.default_port

    def create_channel(self, connection):
        """Open and return a new channel on `connection`."""
        return connection.channel()

    def drain_events(self, connection, **kwargs):
        """Delegate to ``connection.drain_events`` with `kwargs`."""
        return connection.drain_events(**kwargs)

    def establish_connection(self):
        """Connect to the AMQP broker and return the connection.

        Attributes of the client that are missing or falsy are first
        filled in from :attr:`default_connection_params`.

        """
        client = self.client
        for attr, fallback in self.default_connection_params.items():
            if getattr(client, attr, None):
                continue
            setattr(client, attr, fallback)

        options = {"host": client.host,
                   "userid": client.userid,
                   "password": client.password,
                   "virtual_host": client.virtual_host,
                   "login_method": client.login_method,
                   "insist": client.insist,
                   "ssl": client.ssl,
                   "connect_timeout": client.connect_timeout}
        conn = self.Connection(**options)
        conn.client = self.client
        return conn

    def close_connection(self, connection):
        """Close the connection to the AMQP broker."""
        connection.close()

    @property
    def default_connection_params(self):
        """Fallback values for client attributes that are unset."""
        return dict(userid="guest",
                    password="guest",
                    port=self.default_port,
                    hostname="localhost",
                    login_method="AMQPLAIN")
|