/usr/share/pyshared/kombu/transport/pycouchdb.py is in python-kombu 1.4.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
kombu.transport.pycouchdb
=========================
CouchDB transport.
:copyright: (c) 2010 - 2011 by David Clymer.
:license: BSD, see LICENSE for more details.
"""
from Queue import Empty
import socket
import couchdb
from anyjson import serialize, deserialize
from kombu.transport import virtual
from kombu.utils import uuid4
# Port used when the connection info does not specify one (see Channel._open);
# also exposed as Transport.default_port.
DEFAULT_PORT = 5984
# Database name used when the connection's virtual host is empty or "/".
DEFAULT_DATABASE = "kombu_default"
__author__ = "David Clymer <david@zettazebra.com>"
def create_message_view(db):
    """Make sure the ``kombu/messages`` view exists in `db`.

    The view emits every document that has both a ``queue`` and a
    ``payload`` field, keyed by its queue name.  If the design document
    is already present in `db` nothing is written.

    :param db: CouchDB database handle the view definition is synced to.
    """
    from couchdb import design

    # JavaScript map function; the text is kept verbatim.
    map_function = """
function (doc) {
if (doc.queue && doc.payload)
emit(doc.queue, doc);
}
"""
    message_view = design.ViewDefinition("kombu", "messages", map_function)

    # Guard clause: an existing design document means there is nothing to do.
    if message_view.get_doc(db):
        return
    message_view.sync(db)
class Channel(virtual.Channel):
    """Virtual channel that keeps messages as documents in a CouchDB database.

    Each message is saved as a document with a ``queue`` field and a
    serialized ``payload`` field; queues are read back through the
    ``kombu/messages`` view keyed by queue name.
    """

    # Lazily opened database handle; populated by the ``client`` property.
    _client = None
    # Flipped to True (per instance) once ``_query`` has ensured the
    # message view exists.
    view_created = False

    def _put(self, queue, message, **kwargs):
        """Save `message` as a new document tagged with `queue`.

        The document id is a fresh ``uuid4().hex`` and the payload is the
        serialized message.  Extra keyword arguments are accepted but unused.
        """
        self.client.save({'_id': uuid4().hex,
                          'queue': queue,
                          'payload': serialize(message)})

    def _get(self, queue):
        """Fetch one message for `queue`, delete its document and return it.

        :raises Empty: if the view returns no rows for `queue`.
        """
        result = self._query(queue, limit=1)
        if not result:
            raise Empty()

        item = result.rows[0].value
        # Delete the stored document before handing the payload back.
        self.client.delete(item)
        return deserialize(item["payload"])

    def _purge(self, queue):
        """Delete every message document for `queue`.

        :returns: the number of rows the view reported for the queue.
        """
        result = self._query(queue)
        for item in result:
            self.client.delete(item.value)
        return len(result)

    def _size(self, queue):
        """Return the number of rows the message view holds for `queue`."""
        return len(self._query(queue))

    def _open(self):
        """Connect to the CouchDB server and return the database handle.

        The server URL is built from the connection's ``ssl`` flag,
        ``hostname`` and ``port`` (falling back to ``DEFAULT_PORT``).  The
        database name is the connection's ``virtual_host``, or
        ``DEFAULT_DATABASE`` when that is empty or ``"/"``.  The database
        is created if the server reports it as not found.
        """
        conninfo = self.connection.client
        dbname = conninfo.virtual_host
        proto = "https" if conninfo.ssl else "http"
        if not dbname or dbname == "/":
            dbname = DEFAULT_DATABASE
        port = conninfo.port or DEFAULT_PORT
        server = couchdb.Server('%s://%s:%s/' % (proto,
                                                 conninfo.hostname,
                                                 port))
        # Use username and password if available.  Credentials are only
        # attached when a user id was actually supplied: an unconditional
        # assignment would store a (None, None) tuple for anonymous
        # connections, which is truthy and would presumably be used as
        # bogus credentials by the HTTP layer.
        try:
            if conninfo.userid:
                server.resource.credentials = (conninfo.userid,
                                               conninfo.password)
        except AttributeError:
            # Best effort: skip authentication setup if either side lacks
            # the expected attributes.
            pass
        try:
            return server[dbname]
        except couchdb.http.ResourceNotFound:
            return server.create(dbname)

    def _query(self, queue, **kwargs):
        """Query the ``kombu/messages`` view for rows keyed by `queue`.

        Extra keyword arguments (e.g. ``limit``) are passed through to the
        view call.  The view is created on first use.
        """
        if not self.view_created:
            # if the message view is not yet set up, we'll need it now.
            create_message_view(self.client)
            self.view_created = True
        return self.client.view("kombu/messages", key=queue, **kwargs)

    @property
    def client(self):
        """Database handle, opened on first access and cached afterwards."""
        if self._client is None:
            self._client = self._open()
        return self._client
class Transport(virtual.Transport):
    """CouchDB transport: a virtual transport using the CouchDB `Channel`."""

    # Channel implementation used by this transport.
    Channel = Channel

    # NOTE(review): presumably the polling interval consumed by the virtual
    # transport base class -- confirm against kombu.transport.virtual.
    interval = 1
    default_port = DEFAULT_PORT

    # Exceptions declared as connection-level errors.
    connection_errors = (
        socket.error,
        couchdb.HTTPError,
        couchdb.ServerError,
        couchdb.Unauthorized,
    )

    # Exceptions declared as channel-level errors.
    channel_errors = (
        couchdb.HTTPError,
        couchdb.ServerError,
        couchdb.PreconditionFailed,
        couchdb.ResourceConflict,
        couchdb.ResourceNotFound,
    )
|