This file is indexed.

/usr/lib/python2.7/dist-packages/kafka/queue.py is in python-kafka 0.9.3-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

from __future__ import absolute_import

from copy import copy
import logging
from multiprocessing import Process, Queue, Event
from Queue import Empty
import time

from kafka.client import KafkaClient, FetchRequest, ProduceRequest

log = logging.getLogger("kafka")

raise NotImplementedError("Still need to refactor this class")


class KafkaConsumerProcess(Process):
    def __init__(self, client, topic, partition, out_queue, barrier,
                 consumer_fetch_size=1024, consumer_sleep=200):
        self.client = copy(client)
        self.topic = topic
        self.partition = partition
        self.out_queue = out_queue
        self.barrier = barrier
        self.consumer_fetch_size = consumer_fetch_size
        self.consumer_sleep = consumer_sleep / 1000.
        log.info("Initializing %s" % self)
        Process.__init__(self)

    def __str__(self):
        return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % \
            (self.topic, self.partition, self.consumer_sleep)

    def run(self):
        self.barrier.wait()
        log.info("Starting %s" % self)
        fetchRequest = FetchRequest(self.topic, self.partition,
                                    offset=0, size=self.consumer_fetch_size)

        while True:
            if self.barrier.is_set() is False:
                log.info("Shutdown %s" % self)
                self.client.close()
                break

            lastOffset = fetchRequest.offset
            (messages, fetchRequest) = self.client.get_message_set(fetchRequest)

            if fetchRequest.offset == lastOffset:
                log.debug("No more data for this partition, "
                          "sleeping a bit (200ms)")
                time.sleep(self.consumer_sleep)
                continue

            for message in messages:
                self.out_queue.put(message)


class KafkaProducerProcess(Process):
    def __init__(self, client, topic, in_queue, barrier,
                 producer_flush_buffer=500,
                 producer_flush_timeout=2000,
                 producer_timeout=100):

        self.client = copy(client)
        self.topic = topic
        self.in_queue = in_queue
        self.barrier = barrier
        self.producer_flush_buffer = producer_flush_buffer
        self.producer_flush_timeout = producer_flush_timeout / 1000.
        self.producer_timeout = producer_timeout / 1000.
        log.info("Initializing %s" % self)
        Process.__init__(self)

    def __str__(self):
        return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, " \
            "flush_timeout=%s, timeout=%s]" % \
            (self.topic,
             self.producer_flush_buffer,
             self.producer_flush_timeout,
             self.producer_timeout)

    def run(self):
        self.barrier.wait()
        log.info("Starting %s" % self)
        messages = []
        last_produce = time.time()

        def flush(messages):
            self.client.send_message_set(ProduceRequest(self.topic, -1,
                                                        messages))
            del messages[:]

        while True:
            if self.barrier.is_set() is False:
                log.info("Shutdown %s, flushing messages" % self)
                flush(messages)
                self.client.close()
                break

            if len(messages) > self.producer_flush_buffer:
                log.debug("Message count threshold reached. Flushing messages")
                flush(messages)
                last_produce = time.time()

            elif (time.time() - last_produce) > self.producer_flush_timeout:
                log.debug("Producer timeout reached. Flushing messages")
                flush(messages)
                last_produce = time.time()

            try:
                msg = KafkaClient.create_message(
                    self.in_queue.get(True, self.producer_timeout))
                messages.append(msg)

            except Empty:
                continue


class KafkaQueue(object):
    def __init__(self, client, topic, partitions,
                 producer_config=None, consumer_config=None):
        """
        KafkaQueue a Queue-like object backed by a Kafka producer and some
        number of consumers

        Messages are eagerly loaded by the consumer in batches of size
        consumer_fetch_size.
        Messages are buffered in the producer thread until
        producer_flush_timeout or producer_flush_buffer is reached.

        Arguments:
            client: KafkaClient object
            topic: str, the topic name
            partitions: list of ints, the partitions to consume from
            producer_config: dict, see below
            consumer_config: dict, see below

        Consumer Config
        ===============
        consumer_fetch_size: int, number of bytes to fetch in one call
                             to Kafka. Default is 1024
        consumer_sleep: int, time in milliseconds a consumer should sleep
                        when it reaches the end of a partition. Default is 200

        Producer Config
        ===============
        producer_timeout: int, time in milliseconds a producer should
                          wait for messages to enqueue for producing.
                          Default is 100
        producer_flush_timeout: int, time in milliseconds a producer
                                should allow messages to accumulate before
                                sending to Kafka. Default is 2000
        producer_flush_buffer: int, number of messages a producer should
                               allow to accumulate. Default is 500

        """
        producer_config = {} if producer_config is None else producer_config
        consumer_config = {} if consumer_config is None else consumer_config

        self.in_queue = Queue()
        self.out_queue = Queue()
        self.consumers = []
        self.barrier = Event()

        # Initialize and start consumer processes
        for partition in partitions:
            consumer = KafkaConsumerProcess(client, topic, partition,
                                            self.in_queue, self.barrier,
                                            **consumer_config)
            consumer.start()
            self.consumers.append(consumer)

        # Initialize and start the producer process
        self.producer = KafkaProducerProcess(client, topic, self.out_queue,
                                             self.barrier, **producer_config)
        self.producer.start()

        # Trigger everything to start
        self.barrier.set()

    def get(self, block=True, timeout=None):
        """
        Consume a message from Kafka

        Arguments:
            block: boolean, default True
            timeout: int, number of seconds to wait when blocking, default None

        Returns:
            msg: str, the payload from Kafka
        """
        return self.in_queue.get(block, timeout).payload

    def put(self, msg, block=True, timeout=None):
        """
        Send a message to Kafka

        Arguments:
            msg: str, the message to send
            block: boolean, default True
            timeout: int, number of seconds to wait when blocking, default None
        """
        self.out_queue.put(msg, block, timeout)

    def close(self):
        """
        Close the internal queues and Kafka consumers/producer
        """
        self.in_queue.close()
        self.out_queue.close()
        self.barrier.clear()
        self.producer.join()
        for consumer in self.consumers:
            consumer.join()
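
For reference, a minimal usage sketch of the KafkaQueue API described in the docstrings above. Note that the module raises NotImplementedError at import time, so this illustrates the intended interface rather than working code; the broker address, topic name, partition list and config values here are assumptions, not part of the file.

# Illustrative only: importing kafka.queue currently raises NotImplementedError,
# so this sketch shows the intended API rather than something runnable as-is.
from kafka.client import KafkaClient
from kafka.queue import KafkaQueue

client = KafkaClient("localhost:9092")            # assumed broker address
queue = KafkaQueue(client, "my-topic", [0, 1],    # consume partitions 0 and 1
                   producer_config={"producer_flush_buffer": 100},
                   consumer_config={"consumer_sleep": 500})

queue.put("hello kafka")      # buffered in the producer process until flushed
msg = queue.get(timeout=5)    # payload consumed from one of the partitions
queue.close()                 # clear the barrier and join the worker processes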