This file is indexed.

/usr/share/pyshared/celery/backends/cache.py is in python-celery 2.5.3-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# -*- coding: utf-8 -*-
from __future__ import absolute_import

from ..datastructures import LRUCache
from ..exceptions import ImproperlyConfigured
from ..utils import cached_property

from .base import KeyValueStoreBackend

_imp = [None]


def import_best_memcache():
    if _imp[0] is None:
        is_pylibmc = False
        try:
            import pylibmc as memcache
            is_pylibmc = True
        except ImportError:
            try:
                import memcache  # noqa
            except ImportError:
                raise ImproperlyConfigured(
                        "Memcached backend requires either the 'pylibmc' "
                        "or 'memcache' library")
        _imp[0] = (is_pylibmc, memcache)
    return _imp[0]


def get_best_memcache(*args, **kwargs):
    behaviors = kwargs.pop("behaviors", None)
    is_pylibmc, memcache = import_best_memcache()
    client = memcache.Client(*args, **kwargs)
    if is_pylibmc and behaviors is not None:
        client.behaviors = behaviors
    return client


class DummyClient(object):

    def __init__(self, *args, **kwargs):
        self.cache = LRUCache(limit=5000)

    def get(self, key, *args, **kwargs):
        return self.cache.get(key)

    def get_multi(self, keys):
        cache = self.cache
        return dict((k, cache[k]) for k in keys if k in cache)

    def set(self, key, value, *args, **kwargs):
        self.cache[key] = value

    def delete(self, key, *args, **kwargs):
        self.cache.pop(key, None)

    def incr(self, key, delta=1):
        return self.cache.incr(key, delta)


backends = {"memcache": lambda: get_best_memcache,
            "memcached": lambda: get_best_memcache,
            "pylibmc": lambda: get_best_memcache,
            "memory": lambda: DummyClient}


class CacheBackend(KeyValueStoreBackend):
    servers = None
    supports_native_join = True

    def __init__(self, expires=None, backend=None, options={}, **kwargs):
        super(CacheBackend, self).__init__(self, **kwargs)

        self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
                            **options)

        self.backend = backend or self.app.conf.CELERY_CACHE_BACKEND
        if self.backend:
            self.backend, _, servers = self.backend.partition("://")
            self.servers = servers.rstrip('/').split(";")
        self.expires = self.prepare_expires(expires, type=int)
        try:
            self.Client = backends[self.backend]()
        except KeyError:
            raise ImproperlyConfigured(
                    "Unknown cache backend: %s. Please use one of the "
                    "following backends: %s" % (self.backend,
                                                ", ".join(backends.keys())))

    def get(self, key):
        return self.client.get(key)

    def mget(self, keys):
        return self.client.get_multi(keys)

    def set(self, key, value):
        return self.client.set(key, value, self.expires)

    def delete(self, key):
        return self.client.delete(key)

    def on_chord_apply(self, setid, body, result=None, **kwargs):
        key = self.get_key_for_chord(setid)
        self.client.set(key, '0', time=86400)

    def on_chord_part_return(self, task, propagate=False):
        from ..task.sets import subtask
        from ..result import TaskSetResult
        setid = task.request.taskset
        if not setid:
            return
        key = self.get_key_for_chord(setid)
        deps = TaskSetResult.restore(setid, backend=task.backend)
        if self.client.incr(key) >= deps.total:
            subtask(task.request.chord).delay(deps.join(propagate=propagate))
            deps.delete()
            self.client.delete(key)

    @cached_property
    def client(self):
        return self.Client(self.servers, **self.options)

    def __reduce__(self, args=(), kwargs={}):
        servers = ";".join(self.servers)
        backend = "%s://%s/" % (self.backend, servers)
        kwargs.update(
            dict(backend=backend,
                 expires=self.expires,
                 options=self.options))
        return super(CacheBackend, self).__reduce__(args, kwargs)