This file is indexed.

/usr/share/pyshared/kombu/transport/pycouchdb.py is in python-kombu 1.4.3-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
kombu.transport.pycouchdb
=========================

CouchDB transport.

:copyright: (c) 2010 - 2011 by David Clymer.
:license: BSD, see LICENSE for more details.

"""
from Queue import Empty

import socket
import couchdb

from anyjson import serialize, deserialize

from kombu.transport import virtual
from kombu.utils import uuid4

# Port used when the connection info does not specify one.
DEFAULT_PORT = 5984
# Database name used when the connection's virtual host is empty or "/".
DEFAULT_DATABASE = "kombu_default"

__author__ = "David Clymer <david@zettazebra.com>"


def create_message_view(db):
    """Make sure the ``kombu/messages`` view is present in `db`.

    The view's map function emits every document that has both a
    ``queue`` and a ``payload`` field, keyed by the ``queue`` value and
    with the whole document as the emitted value.  The definition is
    synced to `db` only when ``get_doc`` returns nothing for it.

    :param db: database handle the view definition is checked/synced
        against.

    """
    from couchdb.design import ViewDefinition

    map_source = """
        function (doc) {
          if (doc.queue && doc.payload)
            emit(doc.queue, doc);
        }
        """
    messages_view = ViewDefinition("kombu", "messages", map_source)
    if messages_view.get_doc(db):
        # Already stored in the database; nothing to do.
        return
    messages_view.sync(db)


class Channel(virtual.Channel):
    """Virtual channel keeping its messages as documents in CouchDB.

    Every message is saved as a document holding the queue name and the
    serialized payload.  Queues are read back through the
    ``kombu/messages`` view, keyed by queue name.

    """
    # Lazily opened database handle; access it through ``client``.
    _client = None

    # Becomes true on the instance once ``_query`` has made sure the
    # message view exists.
    view_created = False

    def _put(self, queue, message, **kwargs):
        """Save `message` as a new document belonging to `queue`.

        Additional keyword arguments are accepted but not used.

        """
        self.client.save({'_id': uuid4().hex,
                          'queue': queue,
                          'payload': serialize(message)})

    def _get(self, queue):
        """Take one message off `queue` and return it deserialized.

        The document backing the message is deleted from the database
        before the payload is returned.

        :raises Queue.Empty: if the view has no rows for `queue`.

        """
        result = self._query(queue, limit=1)
        if not result:
            raise Empty()

        item = result.rows[0].value
        self.client.delete(item)
        return deserialize(item["payload"])

    def _purge(self, queue):
        """Delete every message document of `queue`.

        :returns: the number of rows the view returned for `queue`.

        """
        result = self._query(queue)
        for item in result:
            self.client.delete(item.value)
        return len(result)

    def _size(self, queue):
        """Return the number of rows the view holds for `queue`."""
        return len(self._query(queue))

    def _open(self):
        """Connect to the server and return the database to use.

        The database name is the connection's virtual host, or
        ``DEFAULT_DATABASE`` when that is empty or ``"/"``.  The database
        is created when the server reports it as not found.

        """
        conninfo = self.connection.client
        dbname = conninfo.virtual_host
        # and/or ternary: safe here because "https" is always truthy.
        proto = conninfo.ssl and "https" or "http"
        if not dbname or dbname == "/":
            dbname = DEFAULT_DATABASE
        port = conninfo.port or DEFAULT_PORT
        server = couchdb.Server('%s://%s:%s/' % (proto,
                                                 conninfo.hostname,
                                                 port))
        # Use username and password if available.  Credentials are only
        # installed when a user id is configured, so an anonymous
        # connection does not end up with a (None, None) pair on the
        # resource.  AttributeError stays tolerated (best effort).
        try:
            if conninfo.userid:
                server.resource.credentials = (conninfo.userid,
                                               conninfo.password)
        except AttributeError:
            pass
        try:
            return server[dbname]
        except couchdb.http.ResourceNotFound:
            return server.create(dbname)

    def _query(self, queue, **kwargs):
        """Query the ``kombu/messages`` view for rows keyed by `queue`.

        Extra keyword arguments (e.g. ``limit``) are passed on to the
        view call.  The view is created on first use if necessary.

        """
        if not self.view_created:
            # if the message view is not yet set up, we'll need it now.
            create_message_view(self.client)
            self.view_created = True
        return self.client.view("kombu/messages", key=queue, **kwargs)

    @property
    def client(self):
        """Database handle, opened on first access and then reused."""
        if self._client is None:
            self._client = self._open()
        return self._client


class Transport(virtual.Transport):
    """CouchDB transport built on top of ``virtual.Transport``."""

    # Channel class this transport works with.
    Channel = Channel

    # NOTE(review): presumably the polling interval (seconds) read by the
    # virtual transport base class -- confirm against virtual.Transport.
    interval = 1
    default_port = DEFAULT_PORT

    # Exception types listed as connection-level errors.
    connection_errors = (
        socket.error,
        couchdb.HTTPError,
        couchdb.ServerError,
        couchdb.Unauthorized,
    )
    # Exception types listed as channel-level errors.
    channel_errors = (
        couchdb.HTTPError,
        couchdb.ServerError,
        couchdb.PreconditionFailed,
        couchdb.ResourceConflict,
        couchdb.ResourceNotFound,
    )