/usr/share/pyshared/celery/routes.py is in python-celery 2.4.6-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | # -*- coding: utf-8 -*-
"""
celery.routes
~~~~~~~~~~~~~
Contains utilities for working with task routes
(:setting:`CELERY_ROUTES`).
:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
from .exceptions import QueueNotFound
from .utils import firstmethod, instantiate, lpmerge, mpromise
_first_route = firstmethod("route_for_task")
class MapRoute(object):
"""Creates a router out of a :class:`dict`."""
def __init__(self, map):
self.map = map
def route_for_task(self, task, *args, **kwargs):
route = self.map.get(task)
if route:
return dict(route)
class Router(object):
def __init__(self, routes=None, queues=None, create_missing=False,
app=None):
from .app import app_or_default
self.app = app_or_default(app)
self.queues = {} if queues is None else queues
self.routes = [] if routes is None else routes
self.create_missing = create_missing
def route(self, options, task, args=(), kwargs={}):
options = self.expand_destination(options) # expands 'queue'
if self.routes:
route = self.lookup_route(task, args, kwargs)
if route: # expands 'queue' in route.
return lpmerge(self.expand_destination(route), options)
if "queue" not in options:
options = lpmerge(self.expand_destination(
self.app.conf.CELERY_DEFAULT_QUEUE), options)
return options
def expand_destination(self, route):
# Route can be a queue name: convenient for direct exchanges.
if isinstance(route, basestring):
queue, route = route, {}
else:
# can use defaults from configured queue, but override specific
# things (like the routing_key): great for topic exchanges.
queue = route.pop("queue", None)
if queue: # expand config from configured queue.
try:
dest = dict(self.queues[queue])
except KeyError:
if not self.create_missing:
raise QueueNotFound(
"Queue %r is not defined in CELERY_QUEUES" % queue)
dest = dict(self.app.amqp.queues.add(queue, queue, queue))
# needs to be declared by publisher
dest["queue"] = queue
# routing_key and binding_key are synonyms.
dest.setdefault("routing_key", dest.get("binding_key"))
return lpmerge(dest, route)
return route
def lookup_route(self, task, args=None, kwargs=None):
return _first_route(self.routes, task, args, kwargs)
def prepare(routes):
"""Expands the :setting:`CELERY_ROUTES` setting."""
def expand_route(route):
if isinstance(route, dict):
return MapRoute(route)
if isinstance(route, basestring):
return mpromise(instantiate, route)
return route
if routes is None:
return ()
if not isinstance(routes, (list, tuple)):
routes = (routes, )
return map(expand_route, routes)
|