/usr/share/pyshared/celery/contrib/migrate.py is in python-celery 2.5.3-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | # -*- coding: utf-8 -*-
"""
celery.contrib.migrate
~~~~~~~~~~~~~~~~~~~~~~
Migration tools.
:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
from __future__ import with_statement
import socket
from functools import partial
from kombu.common import eventloop
from kombu.exceptions import StdChannelError
from kombu.utils.encoding import ensure_bytes
from celery.app import app_or_default
class State(object):
count = 0
total_apx = 0
@property
def strtotal(self):
if not self.total_apx:
return u"?"
return unicode(self.total_apx)
def migrate_task(producer, body_, message,
remove_props=["application_headers",
"content_type",
"content_encoding"]):
body = ensure_bytes(message.body) # use raw message body.
info, headers, props = (message.delivery_info,
message.headers,
message.properties)
ctype, enc = message.content_type, message.content_encoding
# remove compression header, as this will be inserted again
# when the message is recompressed.
compression = headers.pop("compression", None)
for key in remove_props:
props.pop(key, None)
producer.publish(ensure_bytes(body), exchange=info["exchange"],
routing_key=info["routing_key"],
compression=compression,
headers=headers,
content_type=ctype,
content_encoding=enc,
**props)
def migrate_tasks(source, dest, timeout=1.0, app=None,
migrate=None, callback=None):
state = State()
app = app_or_default(app)
def update_state(body, message):
state.count += 1
producer = app.amqp.TaskPublisher(dest)
if migrate is None:
migrate = partial(migrate_task, producer)
if callback is not None:
callback = partial(callback, state)
consumer = app.amqp.get_task_consumer(source)
consumer.register_callback(update_state)
consumer.register_callback(callback)
consumer.register_callback(migrate)
# declare all queues on the new broker.
for queue in consumer.queues:
queue(producer.channel).declare()
try:
_, mcount, _ = queue(consumer.channel).queue_declare(passive=True)
if mcount:
state.total_apx += mcount
except source.channel_errors + (StdChannelError, ):
pass
# start migrating messages.
with consumer:
try:
for _ in eventloop(source, timeout=timeout):
pass
except socket.timeout:
return
|