This file is indexed.

/usr/share/pyshared/celery/routes.py is in python-celery 2.4.6-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# -*- coding: utf-8 -*-
"""
    celery.routes
    ~~~~~~~~~~~~~

    Contains utilities for working with task routes
    (:setting:`CELERY_ROUTES`).

    :copyright: (c) 2009 - 2011 by Ask Solem.
    :license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

from .exceptions import QueueNotFound
from .utils import firstmethod, instantiate, lpmerge, mpromise

_first_route = firstmethod("route_for_task")


class MapRoute(object):
    """Creates a router out of a :class:`dict`."""

    def __init__(self, map):
        self.map = map

    def route_for_task(self, task, *args, **kwargs):
        route = self.map.get(task)
        if route:
            return dict(route)


class Router(object):

    def __init__(self, routes=None, queues=None, create_missing=False,
            app=None):
        from .app import app_or_default
        self.app = app_or_default(app)
        self.queues = {} if queues is None else queues
        self.routes = [] if routes is None else routes
        self.create_missing = create_missing

    def route(self, options, task, args=(), kwargs={}):
        options = self.expand_destination(options)  # expands 'queue'
        if self.routes:
            route = self.lookup_route(task, args, kwargs)
            if route:  # expands 'queue' in route.
                return lpmerge(self.expand_destination(route), options)
        if "queue" not in options:
            options = lpmerge(self.expand_destination(
                                self.app.conf.CELERY_DEFAULT_QUEUE), options)
        return options

    def expand_destination(self, route):
        # Route can be a queue name: convenient for direct exchanges.
        if isinstance(route, basestring):
            queue, route = route, {}
        else:
            # can use defaults from configured queue, but override specific
            # things (like the routing_key): great for topic exchanges.
            queue = route.pop("queue", None)

        if queue:  # expand config from configured queue.
            try:
                dest = dict(self.queues[queue])
            except KeyError:
                if not self.create_missing:
                    raise QueueNotFound(
                        "Queue %r is not defined in CELERY_QUEUES" % queue)
                dest = dict(self.app.amqp.queues.add(queue, queue, queue))
            # needs to be declared by publisher
            dest["queue"] = queue
            # routing_key and binding_key are synonyms.
            dest.setdefault("routing_key", dest.get("binding_key"))
            return lpmerge(dest, route)
        return route

    def lookup_route(self, task, args=None, kwargs=None):
        return _first_route(self.routes, task, args, kwargs)


def prepare(routes):
    """Expands the :setting:`CELERY_ROUTES` setting."""

    def expand_route(route):
        if isinstance(route, dict):
            return MapRoute(route)
        if isinstance(route, basestring):
            return mpromise(instantiate, route)
        return route

    if routes is None:
        return ()
    if not isinstance(routes, (list, tuple)):
        routes = (routes, )
    return map(expand_route, routes)