This file is indexed.

/usr/lib/python2.7/dist-packages/celery/contrib/testing/app.py is in python-celery 4.1.0-2ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
"""Create Celery app instances used for testing."""
from __future__ import absolute_import, unicode_literals
import weakref
from contextlib import contextmanager
from copy import deepcopy
from kombu.utils.imports import symbol_by_name
from celery import Celery
from celery import _state

#: Contains the default configuration values for the test app.
#:
#: ``TestApp`` deep-copies this mapping on every call, overlays the
#: caller-supplied ``config`` on top of it, and registers the result with
#: ``add_defaults``; the dict itself is therefore never mutated by
#: ``TestApp``.  The ``broker_url`` / ``result_backend`` entries are dropped
#: from the copy when ``TestApp`` is given an explicit ``broker`` /
#: ``backend`` argument.
#:
#: NOTE(review): the ``memory://`` and ``cache+memory://`` URLs presumably
#: select in-process transports so tests need no external services, and
#: ``broker_heartbeat: 0`` presumably disables heartbeats -- confirm against
#: the Celery configuration reference.
DEFAULT_TEST_CONFIG = {
    'worker_hijack_root_logger': False,
    'worker_log_color': False,
    'accept_content': {'json'},
    'enable_utc': True,
    'timezone': 'UTC',
    'broker_url': 'memory://',
    'result_backend': 'cache+memory://',
    'broker_heartbeat': 0,
}


class Trap(object):
    """Stand-in for an app object that fails loudly when it is used.

    Installed in place of the current/default app so that code which
    forgets to pass an app instance explicitly, and silently falls back
    to ``current_app``, is detected instead of going unnoticed.

    Any attribute that is not found through normal lookup is routed to
    :meth:`__getattr__`, which always raises :exc:`RuntimeError`.
    """

    def __getattr__(self, name):
        # Only reached for names normal attribute lookup could not resolve;
        # the requested name is deliberately not part of the message.
        message = 'Test depends on current_app'
        raise RuntimeError(message)


class UnitLogging(symbol_by_name(Celery.log_cls)):
    """Sets up logging for the test application.

    The base class is not named statically: it is whatever
    ``symbol_by_name(Celery.log_cls)`` resolves to when this module is
    imported.  ``TestApp`` uses this class as its default ``log`` argument.
    """

    def __init__(self, *args, **kwargs):
        # Forward everything unchanged to the dynamically resolved base class.
        super(UnitLogging, self).__init__(*args, **kwargs)
        # Assigned only after the base initializer has run.
        # NOTE(review): presumably the base class checks ``already_setup``
        # and skips configuring logging when it is true -- confirm against
        # the implementation behind ``Celery.log_cls``.
        self.already_setup = True


def TestApp(name=None, config=None, enable_logging=False, set_as_current=False,
            log=UnitLogging, backend=None, broker=None, **kwargs):
    """Build and return a Celery app configured for use in tests.

    Arguments:
        name: App name; falls back to ``'celery.tests'`` when falsy.
        config: Optional mapping of settings laid over a deep copy of
            ``DEFAULT_TEST_CONFIG``.
        enable_logging: When true, ``log=None`` is passed to ``Celery``
            instead of the ``log`` argument.
        set_as_current: Forwarded to ``Celery``.
        log: Logging class handed to ``Celery`` unless ``enable_logging``
            is set.
        backend: Forwarded to ``Celery``; when not ``None`` the default
            ``result_backend`` setting is dropped.
        broker: Forwarded to ``Celery``; when not ``None`` the default
            ``broker_url`` setting is dropped.
        **kwargs: Extra keyword arguments forwarded to ``Celery``.

    Returns:
        The new app, with the merged settings registered via
        ``add_defaults``.
    """
    # Imported for its side effects only (the name itself is unused).
    from . import tasks  # noqa

    defaults = deepcopy(DEFAULT_TEST_CONFIG)
    overrides = config or {}
    settings = dict(defaults, **overrides)

    # An explicit broker/backend argument replaces the matching default.
    explicit = ((broker, 'broker_url'), (backend, 'result_backend'))
    for given, setting_key in explicit:
        if given is not None:
            settings.pop(setting_key, None)

    if enable_logging:
        log_cls = None
    else:
        log_cls = log

    app_name = name or 'celery.tests'
    app = Celery(
        app_name,
        set_as_current=set_as_current,
        log=log_cls,
        broker=broker,
        backend=backend,
        **kwargs
    )
    app.add_defaults(settings)
    return app


@contextmanager
def set_trap(app):
    """Contextmanager that installs the trap app.

    The trap means that anything trying to use the current or default app
    will raise an exception.

    While the context is active the default app is a :class:`Trap` and
    ``_state._tls`` is swapped for a plain object whose ``current_app`` is
    that same trap.  On exit the previous ``_state._tls`` object is put
    back, even when the body of the ``with`` block raises.

    Arguments:
        app: Accepted for interface compatibility; not used here.

    Note:
        Only ``_state._tls`` is restored by this function.  The default app
        set through ``_state.set_default_app`` is left as the trap; in this
        module ``setup_default_app`` restores it.
    """
    trap = Trap()
    prev_tls = _state._tls
    _state.set_default_app(trap)

    # Class attribute (not thread-local storage): every lookup of
    # ``current_app`` on the replacement object yields the trap.
    class NonTLS(object):
        current_app = trap
    _state._tls = NonTLS()

    try:
        yield
    finally:
        # Without the finally clause a failing test body would leave the
        # trap installed and break every test that runs afterwards.
        _state._tls = prev_tls


@contextmanager
def setup_default_app(app, use_trap=False):
    """Setup default app for testing.

    Ensures state is clean after the test returns, whether the body of the
    ``with`` block completes normally or raises.

    Before yielding, the current app, the default app, the registered
    app finalizers and the set of known apps are captured from
    ``celery._state``.  On exit they are all written back, and ``app`` is
    closed unless it is the app that was current beforehand.

    Arguments:
        app: The app under test; closed on exit when it is not the
            previously current app.
        use_trap: When true, the body runs inside :func:`set_trap`.
    """
    prev_current_app = _state.get_current_app()
    prev_default_app = _state.default_app
    # Copies, so registrations made during the test do not leak into the
    # snapshots that are restored afterwards.
    prev_finalizers = set(_state._on_app_finalizers)
    prev_apps = weakref.WeakSet(_state._apps)

    try:
        if use_trap:
            with set_trap(app):
                yield
        else:
            yield
    finally:
        # Runs on success and on failure alike; previously an exception in
        # the test body skipped all of this and polluted later tests.
        _state.set_default_app(prev_default_app)
        _state._tls.current_app = prev_current_app
        if app is not prev_current_app:
            app.close()
        _state._on_app_finalizers = prev_finalizers
        _state._apps = prev_apps