This file is indexed.

/usr/lib/python2.7/dist-packages/pika/callback.py is in python-pika 0.10.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
"""Callback management class, common area for keeping track of all callbacks in
the Pika stack.

"""
import functools
import logging

from pika import frame
from pika import amqp_object
from pika.compat import xrange, canonical_str

LOGGER = logging.getLogger(__name__)


def name_or_value(value):
    """Return a string identifier for the value passed in.

    AMQPObject subclasses and instances, and Method frames, are identified
    by their ``NAME`` attribute; anything else is converted with
    ``canonical_str``.

    :param value: The value to sanitize
    :type value:  pika.amqp_object.AMQPObject|pika.frame.Frame|int|unicode|str
    :rtype: str

    """
    # A class derived from AMQPObject identifies itself via NAME. issubclass
    # raises TypeError when value is not a class at all, which simply means
    # this case does not apply.
    try:
        if issubclass(value, amqp_object.AMQPObject):
            return value.NAME
    except TypeError:
        pass

    # Method frames are tested before the generic AMQPObject check and are
    # named after the method they carry rather than after the frame itself.
    if isinstance(value, frame.Method):
        named = value.method
    elif isinstance(value, amqp_object.AMQPObject):
        named = value
    else:
        # Anything else: cast to str (python 2 and python 3); encoding as
        # UTF-8 on Python 2
        return canonical_str(value)

    return named.NAME


def sanitize_prefix(function):
    """Decorator that runs name_or_value over the prefix and key arguments.

    The prefix and key are looked up by keyword first; otherwise they are
    taken from the positional arguments following the first one (``self``
    when decorating a method). Arguments that were not supplied are left
    alone.

    """

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        positional = list(args)
        # Index 0 is skipped; the next unclaimed positional slot starts at 1.
        index = 1
        for name in ('prefix', 'key'):
            if name in kwargs:
                kwargs[name] = name_or_value(kwargs[name])
            elif index < len(positional):
                positional[index] = name_or_value(positional[index])
                index += 1

        return function(*positional, **kwargs)

    return wrapper


def check_for_prefix_and_key(function):
    """Decorator that short-circuits with False when the prefix or key is
    not registered in the instance's callback stack.

    The instance is taken from the first positional argument and must expose
    a ``_stack`` mapping of ``{prefix: {key: ...}}``.

    """

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        # Positional slot 0 is the instance; prefix and key follow unless
        # they were given by keyword.
        position = 1
        if 'prefix' in kwargs:
            raw_prefix = kwargs['prefix']
        else:
            raw_prefix = args[position]
            position += 1
        prefix = name_or_value(raw_prefix)

        # The key is sanitized the same way as the prefix
        raw_key = kwargs['key'] if 'key' in kwargs else args[position]
        key = name_or_value(raw_key)

        # Only run the wrapped method when both levels exist in the stack
        registry = args[0]._stack
        if prefix in registry and key in registry[prefix]:
            return function(*args, **kwargs)
        return False

    return wrapper


class CallbackManager(object):
    """CallbackManager is a callback registry designed to be a single place
    where Pika can manage callbacks and process them.

    Callbacks are kept in ``self._stack`` as
    ``{prefix: {key: [callback_dict, ...]}}`` where every ``callback_dict``
    is built by :meth:`_callback_dict`. The public methods that take a
    prefix and key pass both through ``name_or_value`` (via the
    ``sanitize_prefix`` decorator) before using them.

    """
    CALLS = 'calls'
    ARGUMENTS = 'arguments'
    DUPLICATE_WARNING = 'Duplicate callback found for "%s:%s"'
    CALLBACK = 'callback'
    ONE_SHOT = 'one_shot'
    ONLY_CALLER = 'only'

    def __init__(self):
        """Create an instance of the CallbackManager"""
        self._stack = dict()

    @sanitize_prefix
    def add(self, prefix, key, callback,
            one_shot=True,
            only_caller=None,
            arguments=None):
        """Add a callback to the stack for the specified key. If the call is
        specified as one_shot, it will be removed after being fired

        The prefix is usually the channel number but the class is generic
        and prefix and key may be any value. If you pass in only_caller
        CallbackManager will restrict processing of the callback to only
        the calling function/object that you specify.

        Adding the same callback again with equal ``arguments`` and
        ``only_caller`` does not create a second entry: an existing one-shot
        entry has its use counter incremented, an existing persistent entry
        triggers a duplicate warning.

        :param prefix: Categorize the callback
        :type prefix: str or int
        :param key: The key for the callback
        :type key: object or str or dict
        :param method callback: The callback to call
        :param bool one_shot: Remove this callback after it is called
        :param object only_caller: Only allow one_caller value to call the
                                   event that fires the callback.
        :param dict arguments: Arguments to validate when processing
        :rtype: tuple(prefix, key)

        """
        # Prep the stack
        callbacks = self._stack.setdefault(prefix, dict()).setdefault(
            key, list())

        # Check for a duplicate
        for callback_dict in callbacks:
            if (callback_dict[self.CALLBACK] == callback and
                    callback_dict[self.ARGUMENTS] == arguments and
                    callback_dict[self.ONLY_CALLER] == only_caller):
                # Truthiness (not ``is True``) mirrors _callback_dict, which
                # creates the CALLS counter for any truthy one_shot value.
                if callback_dict[self.ONE_SHOT]:
                    callback_dict[self.CALLS] += 1
                    LOGGER.debug('Incremented callback reference counter: %r',
                                 callback_dict)
                else:
                    LOGGER.warning(self.DUPLICATE_WARNING, prefix, key)
                return prefix, key

        # Create the callback dictionary
        callback_dict = self._callback_dict(callback, one_shot, only_caller,
                                            arguments)
        callbacks.append(callback_dict)
        LOGGER.debug('Added: %r', callback_dict)
        return prefix, key

    def clear(self):
        """Clear all the callbacks if there are any defined."""
        self._stack = dict()
        LOGGER.debug('Callbacks cleared')

    @sanitize_prefix
    def cleanup(self, prefix):
        """Remove all callbacks from the stack by a prefix. Returns True
        if keys were there to be removed

        :param str or int prefix: The prefix for keeping track of callbacks with
        :rtype: bool

        """
        LOGGER.debug('Clearing out %r from the stack', prefix)
        if prefix not in self._stack or not self._stack[prefix]:
            return False
        del self._stack[prefix]
        return True

    @sanitize_prefix
    def pending(self, prefix, key):
        """Return count of callbacks for a given prefix or key or None

        :param prefix: Categorize the callback
        :type prefix: str or int
        :param key: The key for the callback
        :type key: object or str or dict
        :rtype: None or int

        """
        if prefix not in self._stack or key not in self._stack[prefix]:
            return None
        return len(self._stack[prefix][key])

    @sanitize_prefix
    @check_for_prefix_and_key
    def process(self, prefix, key, caller, *args, **keywords):
        """Run through and process all the callbacks for the specified keys.
        Caller should be specified at all times so that callbacks which
        require a specific function to call CallbackManager.process will
        not be processed.

        Returns False when nothing is registered for the prefix and key,
        True otherwise. An exception raised by a callback is logged and
        re-raised, so later callbacks in the same batch are not called.

        :param prefix: Categorize the callback
        :type prefix: str or int
        :param key: The key for the callback
        :type key: object or str or dict
        :param object caller: Who is firing the event
        :param list args: Any optional arguments
        :param dict keywords: Optional keyword arguments
        :rtype: bool

        """
        LOGGER.debug('Processing %s:%s', prefix, key)
        if prefix not in self._stack or key not in self._stack[prefix]:
            return False

        arguments = list(args)
        callbacks = list()
        # Check each callback, append it to the list if it should be called.
        # Iterate over a copy: using up a one-shot callback removes it from
        # the live list.
        for callback_dict in list(self._stack[prefix][key]):
            if self._should_process_callback(callback_dict, caller, arguments):
                callbacks.append(callback_dict[self.CALLBACK])
                if callback_dict[self.ONE_SHOT]:
                    self._use_one_shot_callback(prefix, key, callback_dict)

        # Call each callback
        for callback in callbacks:
            LOGGER.debug('Calling %s for "%s:%s"', callback, prefix, key)
            try:
                callback(*args, **keywords)
            except Exception:
                LOGGER.exception('Calling %s for "%s:%s" failed', callback,
                                 prefix, key)
                raise
        return True

    @sanitize_prefix
    @check_for_prefix_and_key
    def remove(self, prefix, key, callback_value=None, arguments=None):
        """Remove a callback from the stack by prefix, key and the callback
        itself.

        Every entry whose callback equals ``callback_value`` and whose
        registered arguments match ``arguments`` is removed. When
        ``callback_value`` is omitted no callbacks are removed; only empty
        nodes for the prefix and key are pruned (use :meth:`remove_all` to
        drop every callback for a prefix and key).

        :param str or int prefix: The prefix for keeping track of callbacks with
        :param str key: The callback key
        :param method callback_value: The method defined to call on callback
        :param dict arguments: Optional arguments to check
        :rtype: bool
        :returns: True, or False when the prefix or key is not registered

        """
        if callback_value:
            callbacks = self._stack[prefix][key]
            # Walk backwards so a deletion never shifts the index of an
            # entry that is still to be visited.
            for offset in xrange(len(callbacks) - 1, -1, -1):
                callback_dict = callbacks[offset]
                if (callback_dict[self.CALLBACK] == callback_value and
                        self._arguments_match(callback_dict, [arguments])):
                    LOGGER.debug('Removing callback #%i: %r', offset,
                                 callback_dict)
                    del callbacks[offset]

        self._cleanup_callback_dict(prefix, key)
        return True

    @sanitize_prefix
    @check_for_prefix_and_key
    def remove_all(self, prefix, key):
        """Remove all callbacks for the specified prefix and key.

        Returns None after removing; the ``check_for_prefix_and_key``
        decorator returns False instead when the prefix or key is not
        registered.

        :param str prefix: The prefix for keeping track of callbacks with
        :param str key: The callback key

        """
        del self._stack[prefix][key]
        self._cleanup_callback_dict(prefix, key)

    def _arguments_match(self, callback_dict, args):
        """Validate if the arguments passed in match the expected arguments in
        the callback_dict. We expect this to be a frame passed in to *args for
        process or passed in as a list from remove.

        Only the first element of ``args`` is inspected. A dict is compared
        key by key; any other object is compared by attribute, using its
        ``method`` attribute instead when it has one.

        :param dict callback_dict: The callback dictionary to evaluate against
        :param list args: The arguments passed in as a list
        :rtype: bool

        """
        expectation = callback_dict[self.ARGUMENTS]
        if expectation is None:
            # Nothing was registered to validate against
            return True
        if not args:
            return False

        candidate = args[0]
        if isinstance(candidate, dict):
            return self._dict_arguments_match(candidate, expectation)
        if hasattr(candidate, 'method'):
            candidate = candidate.method
        return self._obj_arguments_match(candidate, expectation)

    def _callback_dict(self, callback, one_shot, only_caller, arguments):
        """Return the callback dictionary.

        One-shot entries additionally get a ``CALLS`` use counter starting
        at 1.

        :param method callback: The callback to call
        :param bool one_shot: Remove this callback after it is called
        :param object only_caller: Only allow one_caller value to call the
                                   event that fires the callback.
        :param dict arguments: Arguments to validate when processing
        :rtype: dict

        """
        value = {
            self.CALLBACK: callback,
            self.ONE_SHOT: one_shot,
            self.ONLY_CALLER: only_caller,
            self.ARGUMENTS: arguments
        }
        if one_shot:
            value[self.CALLS] = 1
        return value

    def _cleanup_callback_dict(self, prefix, key=None):
        """Remove empty dict nodes in the callback stack.

        Safe to call when the prefix has already been removed.

        :param str or int prefix: The prefix for keeping track of callbacks with
        :param str key: The callback key

        """
        if prefix not in self._stack:
            return
        keys = self._stack[prefix]
        # ``is not None`` rather than truthiness so that an empty-string key
        # is pruned like any other.
        if key is not None and key in keys and not keys[key]:
            del keys[key]
        if not keys:
            del self._stack[prefix]

    @staticmethod
    def _dict_arguments_match(value, expectation):
        """Checks an dict to see if it has attributes that meet the expectation.

        :param dict value: The dict to evaluate
        :param dict expectation: The values to check against
        :rtype: bool

        """
        LOGGER.debug('Comparing %r to %r', value, expectation)
        for key in expectation:
            if value.get(key) != expectation[key]:
                LOGGER.debug('Values in dict do not match for %s', key)
                return False
        return True

    @staticmethod
    def _obj_arguments_match(value, expectation):
        """Checks an object to see if it has attributes that meet the
        expectation.

        :param object value: The object to evaluate
        :param dict expectation: The values to check against
        :rtype: bool

        """
        for key in expectation:
            if not hasattr(value, key):
                LOGGER.debug('%r does not have required attribute: %s',
                             type(value), key)
                return False
            if getattr(value, key) != expectation[key]:
                LOGGER.debug('Values in %s do not match for %s', type(value),
                             key)
                return False
        return True

    def _should_process_callback(self, callback_dict, caller, args):
        """Returns a truthy value if the callback should be processed.

        The arguments must match, and the entry must either have no
        ``only_caller`` restriction or one that equals ``caller``.

        :param dict callback_dict: The callback configuration
        :param object caller: Who is firing the event
        :param list args: Any optional arguments
        :rtype: bool

        """
        if not self._arguments_match(callback_dict, args):
            LOGGER.debug('Arguments do not match for %r, %r', callback_dict,
                         args)
            return False
        return (callback_dict[self.ONLY_CALLER] is None or
                (callback_dict[self.ONLY_CALLER] and
                 callback_dict[self.ONLY_CALLER] == caller))

    def _use_one_shot_callback(self, prefix, key, callback_dict):
        """Process the one-shot callback, decrementing the use counter and
        removing it from the stack if it's now been fully used.

        Only the exhausted entry itself is removed. Other registrations of
        the same callback (for example with a different ``only_caller`` or
        different ``arguments``) stay in place.

        :param str or int prefix: The prefix for keeping track of callbacks with
        :param str key: The callback key
        :param dict callback_dict: The callback dict to process

        """
        LOGGER.debug('Processing use of oneshot callback')
        callback_dict[self.CALLS] -= 1
        LOGGER.debug('%i registered uses left', callback_dict[self.CALLS])

        if callback_dict[self.CALLS] > 0:
            return

        callbacks = self._stack.get(prefix, {}).get(key)
        if callbacks is None:
            # The prefix/key node was already pruned; nothing left to remove
            return

        # Match by identity so sibling registrations that merely share the
        # callback are not dropped along with this one.
        for offset in xrange(len(callbacks) - 1, -1, -1):
            if callbacks[offset] is callback_dict:
                LOGGER.debug('Removing callback #%i: %r', offset,
                             callback_dict)
                del callbacks[offset]
        self._cleanup_callback_dict(prefix, key)