/usr/share/pyshared/kombu/transport/beanstalk.py is in python-kombu 1.4.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
kombu.transport.beanstalk
=========================
Beanstalk transport.
:copyright: (c) 2010 - 2011 by David Ziegler.
:license: BSD, see LICENSE for more details.
"""
import socket
from Queue import Empty
from anyjson import serialize, deserialize
from beanstalkc import Connection, BeanstalkcException, SocketError
from kombu.transport import virtual
# Port used when the connection info does not specify one.
DEFAULT_PORT = 11300
__author__ = "David Ziegler <david.ziegler@gmail.com>"
class Channel(virtual.Channel):
    """Virtual channel that stores messages in beanstalk tubes.

    Each queue is mapped to the tube of the same name.  The beanstalkc
    connection is created lazily the first time :attr:`client` is read.
    """

    #: Cached beanstalkc connection; ``None`` until :attr:`client` is used.
    _client = None

    def _parse_job(self, job):
        """Decode a reserved job and remove it from its tube.

        Returns an ``(item, dest)`` tuple: the deserialized job body and
        the name of the tube reported by the job's stats.  If decoding or
        the stats lookup fails, the job is buried instead of deleted and
        whichever of the two values was not obtained is ``None``.

        Raises :exc:`Empty` if `job` is falsy (nothing was reserved).
        """
        if not job:
            raise Empty()

        item, dest = None, None
        try:
            item = deserialize(job.body)
            dest = job.stats()["tube"]
        except Exception:
            # Deliberately broad: a job that cannot be processed is buried
            # rather than deleted.
            job.bury()
        else:
            job.delete()
        return item, dest

    def _watch_only(self, queue):
        """Make `queue` the only tube watched by the client."""
        client = self.client
        if queue not in client.watching():
            client.watch(queue)
        for active in client.watching():
            if active != queue:
                client.ignore(active)

    def _put(self, queue, message, **kwargs):
        """Serialize `message` and put it on the tube named `queue`.

        The job priority is read from
        ``message["properties"]["delivery_info"]["priority"]``.
        """
        priority = message["properties"]["delivery_info"]["priority"]
        self.client.use(queue)
        self.client.put(serialize(message), priority=priority)

    def _get(self, queue):
        """Reserve one job from `queue` and return its decoded body.

        Waits up to one second for a job.  Raises :exc:`Empty` if none
        was reserved.
        """
        self._watch_only(queue)
        job = self.client.reserve(timeout=1)
        item, _ = self._parse_job(job)
        return item

    def _get_many(self, queues, timeout=1):
        """Reserve one job from any of `queues`.

        Returns the ``(item, dest)`` tuple produced by :meth:`_parse_job`.
        Tubes that are already watched stay watched; only the missing ones
        from `queues` are added.  Raises :exc:`Empty` if nothing was
        reserved within `timeout` seconds.
        """
        # timeout of None will cause beanstalk to timeout waiting
        # for a new request
        if timeout is None:
            timeout = 1

        watching = self.client.watching()
        for queue in queues:
            if queue not in watching:
                self.client.watch(queue)

        job = self.client.reserve(timeout=timeout)
        return self._parse_job(job)

    def _purge(self, queue):
        """Delete every job that can be reserved from `queue`.

        Keeps reserving (one second timeout each) until no job is
        returned, and returns the number of jobs deleted.
        """
        self._watch_only(queue)
        count = 0
        while True:
            job = self.client.reserve(timeout=1)
            if not job:
                break
            job.delete()
            count += 1
        return count

    def _size(self, queue):
        """Always report a size of 0; the tube is not queried."""
        return 0

    def _open(self):
        """Create and connect a new beanstalkc connection.

        Host and port come from ``self.connection.client``; the port
        falls back to ``DEFAULT_PORT`` when it is not set.
        """
        conninfo = self.connection.client
        port = conninfo.port or DEFAULT_PORT
        conn = Connection(host=conninfo.hostname, port=port)
        conn.connect()
        return conn

    def close(self):
        """Close the channel and, if one was opened, the beanstalk client.

        The base class ``close()`` is always invoked.  It runs first so
        that any work it does which still needs the connection can use
        it; the client is closed afterwards even if the base class raises.
        """
        try:
            super(Channel, self).close()
        finally:
            if self._client is not None:
                self._client.close()

    @property
    def client(self):
        """The beanstalkc connection, opened on first access."""
        if self._client is None:
            self._client = self._open()
        return self._client
class Transport(virtual.Transport):
    """Beanstalk transport built on the virtual transport base class."""

    # Channel implementation used by this transport.
    Channel = Channel

    interval = 1
    default_port = DEFAULT_PORT

    # Exception types listed as connection errors for this transport.
    connection_errors = (socket.error, SocketError, IOError)

    # Exception types listed as channel errors for this transport.
    channel_errors = (socket.error, IOError, SocketError, BeanstalkcException)
|