This file is indexed.

/usr/share/pyshared/celery/contrib/batches.py is in python-celery 2.5.3-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# -*- coding: utf-8 -*-
"""
celery.contrib.batches
======================

Collects messages and processes them as a list.

**Example**

A click counter that flushes the buffer every 100 messages, and every
10 seconds.

.. code-block:: python

    from celery.task import task
    from celery.contrib.batches import Batches

    # Flush after 100 messages, or 10 seconds.
    @task(base=Batches, flush_every=100, flush_interval=10)
    def count_click(requests):
        from collections import Counter
        count = Counter(request.kwargs["url"] for request in requests)
        for url, count in count.items():
            print(">>> Clicks: %s -> %s" % (url, count))

Registering the click is done as follows:

    >>> count_click.delay(url="http://example.com")

.. warning::

    For this to work you have to set
    :setting:`CELERYD_PREFETCH_MULTIPLIER` to zero, or some value where
    the final multiplied value is higher than ``flush_every``.

    In the future we hope to add the ability to direct batching tasks
    to a channel with different QoS requirements than the task channel.

:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

from itertools import count
from Queue import Empty, Queue

from celery.task import Task
from celery.utils import cached_property, timer2
from celery.worker import state


def consume_queue(queue):
    """Drain a :class:`Queue.Queue` without blocking.

    Yields items one at a time using ``get_nowait`` and finishes the
    moment the queue reports :exc:`Queue.Empty`; it never waits for
    new items to arrive.

    *Examples*

        >>> q = Queue()
        >>> for i in range(4):
        ...     q.put(i)
        >>> list(consume_queue(q))
        [0, 1, 2, 3]
        >>> list(consume_queue(q))
        []

    """
    fetch_nowait = queue.get_nowait
    while True:
        try:
            yield fetch_nowait()
        except Empty:
            # Nothing left that is immediately available: end iteration.
            return


def apply_batches_task(task, args, loglevel, logfile):
    """Call a batch task, logging any exception instead of raising it.

    This is the function :meth:`Batches.apply_buffer` submits to the pool.

    :param task: callable task object; its ``request`` mapping is filled
        with the logging settings for the duration of the call and is
        always cleared afterwards.
    :param args: positional arguments, applied as ``task(*args)``.
    :param loglevel: value stored under ``task.request["loglevel"]``.
    :param logfile: value stored under ``task.request["logfile"]``.
    :returns: the task's return value, or ``None`` if the task raised
        an :exc:`Exception` (the error is logged via ``task.logger``).

    """
    task.request.update({"loglevel": loglevel, "logfile": logfile})
    try:
        result = task(*args)
    except Exception as exp:
        # Errors in one batch must not propagate to the caller:
        # log with traceback and report "no result".
        result = None
        task.logger.error("There was an Exception: %s", exp, exc_info=True)
    finally:
        # Never leak this call's request context into the next call.
        task.request.clear()
    return result


class SimpleRequest(object):
    """Pickleable request.

    A plain attribute holder carrying the fields of a task request that
    are passed on to a batch task.

    """

    #: task id
    id = None

    #: task name
    name = None

    #: positional arguments
    args = ()

    #: keyword arguments
    kwargs = {}

    #: message delivery information.
    delivery_info = None

    #: worker node name
    hostname = None

    def __init__(self, id, name, args, kwargs, delivery_info, hostname):
        self.id, self.name = id, name
        self.args, self.kwargs = args, kwargs
        self.delivery_info, self.hostname = delivery_info, hostname

    @classmethod
    def from_request(cls, request):
        """Build an instance from an object exposing ``task_id``,
        ``task_name``, ``args``, ``kwargs``, ``delivery_info`` and
        ``hostname`` attributes."""
        fields = (
            request.task_id,
            request.task_name,
            request.args,
            request.kwargs,
            request.delivery_info,
            request.hostname,
        )
        return cls(*fields)


class Batches(Task):
    """Abstract task base class that buffers requests and runs them in bulk.

    :meth:`execute` puts every incoming request into an in-memory queue.
    The queue is drained by :meth:`_do_flush`, which is triggered both by
    every ``flush_every``-th received request and by a recurring timer
    started with ``flush_interval``.  The drained requests are converted
    to :class:`SimpleRequest` objects and the task is applied in the pool
    with that list as its single positional argument.

    Subclasses must implement :meth:`run`.

    """
    abstract = True

    #: Maximum number of message in buffer.
    flush_every = 10

    #: Timeout in seconds before buffer is flushed anyway.
    flush_interval = 30

    def __init__(self):
        self._buffer = Queue()
        # Callable returning 1, 2, 3, ... on successive calls
        # (Python 2 iterator protocol).
        self._count = count(1).next
        self._tref = None      # handle of the running flush timer, if any.
        self._pool = None      # pool taken from the first executed request.
        self._logging = None   # (loglevel, logfile) from the first request.

    def run(self, requests):
        """Process a batch of requests.  Must be overridden."""
        raise NotImplementedError("%r must implement run(requests)" % (self, ))

    def flush(self, requests):
        """Apply the task to *requests*, passed on as a list of
        :class:`SimpleRequest` objects.

        :returns: whatever :meth:`apply_buffer` returns.

        """
        return self.apply_buffer(requests, ([SimpleRequest.from_request(r)
                                                for r in requests], ))

    def execute(self, request, pool, loglevel, logfile):
        """Buffer *request* instead of executing it right away.

        The first call also remembers *pool* and the logging settings
        and starts the periodic flush timer.  Every ``flush_every``-th
        call flushes the buffer immediately.

        """
        if not self._pool:         # just take pool from first task.
            self._pool = pool
        if not self._logging:
            self._logging = loglevel, logfile

        state.task_ready(request)  # immediately remove from worker state.
        self._buffer.put(request)

        if self._tref is None:     # first request starts flush timer.
            # flush_interval is in seconds; the timer is given the value
            # multiplied by 1000 (presumably milliseconds).
            self._tref = timer2.apply_interval(self.flush_interval * 1000,
                                               self._do_flush)

        if not self._count() % self.flush_every:
            self._do_flush()

    def _do_flush(self):
        """Drain the buffer and flush it; stop the timer if it was empty."""
        self.debug("Wake-up to flush buffer...")
        requests = None
        if self._buffer.qsize():
            requests = list(consume_queue(self._buffer))
            if requests:
                self.debug("Buffer complete: %s", len(requests))
                self.flush(requests)
        if not requests:
            self.debug("Cancelling timer: Nothing in buffer.")
            self._tref.cancel()  # cancel timer.
            self._tref = None

    def apply_buffer(self, requests, args=(), kwargs=None):
        """Submit the batch to the pool and wire up acknowledgement.

        Requests whose task has ``acks_late`` unset are acknowledged when
        the pool accepts the job; those with ``acks_late`` set are
        acknowledged when the job returns.

        :param requests: the buffered request objects (must be non-empty).
        :param args: positional arguments for the task call.
        :param kwargs: currently unused.
        :returns: the result of ``pool.apply_async``.

        """
        # Index 0 (False) collects early-ack requests, index 1 (True)
        # collects late-ack requests.
        acks_late = [], []
        for request in requests:
            acks_late[request.task.acks_late].append(request)
        assert requests and (acks_late[True] or acks_late[False])

        def on_accepted(pid, time_accepted):
            for req in acks_late[False]:
                req.acknowledge()

        def on_return(result):
            for req in acks_late[True]:
                req.acknowledge()

        loglevel, logfile = self._logging
        return self._pool.apply_async(
            apply_batches_task,
            (self, args, loglevel, logfile),
            accept_callback=on_accepted,
            # Only register a return callback if something needs late ack.
            callback=on_return if acks_late[True] else None)

    def debug(self, msg, *args):
        """Log *msg* at debug level, prefixed with the task name.

        Optional *args* are interpolated into *msg* using ``%``-style
        formatting, as with :meth:`logging.Logger.debug`.

        """
        if args:
            self.logger.debug("%s: " + msg, self.name, *args)
        else:
            # Without args, treat msg as plain data so that a literal
            # "%" in it is never interpreted as a format directive.
            self.logger.debug("%s: %s", self.name, msg)

    @cached_property
    def logger(self):
        """The app's default logger (looked up once, then cached)."""
        return self.app.log.get_default_logger()