This file is indexed.

/usr/lib/python2.7/dist-packages/celery/fixups/django.py is in python-celery 4.1.0-2ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""Django-specific customization."""
from __future__ import absolute_import, unicode_literals

import os
import sys
import warnings

from kombu.utils.imports import symbol_by_name
from kombu.utils.objects import cached_property

from datetime import datetime
from importlib import import_module

from celery import _state
from celery import signals
from celery.exceptions import FixupWarning, ImproperlyConfigured

__all__ = ['DjangoFixup', 'fixup']

ERR_NOT_INSTALLED = """\
Environment variable DJANGO_SETTINGS_MODULE is defined
but Django isn't installed.  Won't apply Django fix-ups!
"""


def _maybe_close_fd(fh):
    try:
        os.close(fh.fileno())
    except (AttributeError, OSError, TypeError):
        # TypeError added for celery#962
        pass


def _verify_django_version(django):
    if django.VERSION < (1, 8):
        raise ImproperlyConfigured('Celery 4.x requires Django 1.8 or later.')


def fixup(app, env='DJANGO_SETTINGS_MODULE'):
    """Install Django fixup if settings module environment is set."""
    SETTINGS_MODULE = os.environ.get(env)
    if SETTINGS_MODULE and 'django' not in app.loader_cls.lower():
        try:
            import django  # noqa
        except ImportError:
            warnings.warn(FixupWarning(ERR_NOT_INSTALLED))
        else:
            _verify_django_version(django)
            return DjangoFixup(app).install()


class DjangoFixup(object):
    """Fixup installed when using Django."""

    def __init__(self, app):
        self.app = app
        if _state.default_app is None:
            self.app.set_default()
        self._worker_fixup = None

    def install(self):
        # Need to add project directory to path
        sys.path.append(os.getcwd())

        self._settings = symbol_by_name('django.conf:settings')
        self.app.loader.now = self.now

        signals.import_modules.connect(self.on_import_modules)
        signals.worker_init.connect(self.on_worker_init)
        return self

    @property
    def worker_fixup(self):
        if self._worker_fixup is None:
            self._worker_fixup = DjangoWorkerFixup(self.app)
        return self._worker_fixup

    @worker_fixup.setter
    def worker_fixup(self, value):
        self._worker_fixup = value

    def on_import_modules(self, **kwargs):
        # call django.setup() before task modules are imported
        self.worker_fixup.validate_models()

    def on_worker_init(self, **kwargs):
        self.worker_fixup.install()

    def now(self, utc=False):
        return datetime.utcnow() if utc else self._now()

    def autodiscover_tasks(self):
        from django.apps import apps
        return [config.name for config in apps.get_app_configs()]

    @cached_property
    def _now(self):
        return symbol_by_name('django.utils.timezone:now')


class DjangoWorkerFixup(object):
    _db_recycles = 0

    def __init__(self, app):
        self.app = app
        self.db_reuse_max = self.app.conf.get('CELERY_DB_REUSE_MAX', None)
        self._db = import_module('django.db')
        self._cache = import_module('django.core.cache')
        self._settings = symbol_by_name('django.conf:settings')

        self.interface_errors = (
            symbol_by_name('django.db.utils.InterfaceError'),
        )
        self.DatabaseError = symbol_by_name('django.db:DatabaseError')

    def django_setup(self):
        import django
        django.setup()

    def validate_models(self):
        from django.core.checks import run_checks
        self.django_setup()
        run_checks()

    def install(self):
        signals.beat_embedded_init.connect(self.close_database)
        signals.worker_ready.connect(self.on_worker_ready)
        signals.task_prerun.connect(self.on_task_prerun)
        signals.task_postrun.connect(self.on_task_postrun)
        signals.worker_process_init.connect(self.on_worker_process_init)
        self.close_database()
        self.close_cache()
        return self

    def on_worker_process_init(self, **kwargs):
        # Child process must validate models again if on Windows,
        # or if they were started using execv.
        if os.environ.get('FORKED_BY_MULTIPROCESSING'):
            self.validate_models()

        # close connections:
        # the parent process may have established these,
        # so need to close them.

        # calling db.close() on some DB connections will cause
        # the inherited DB conn to also get broken in the parent
        # process so we need to remove it without triggering any
        # network IO that close() might cause.
        for c in self._db.connections.all():
            if c and c.connection:
                self._maybe_close_db_fd(c.connection)

        # use the _ version to avoid DB_REUSE preventing the conn.close() call
        self._close_database()
        self.close_cache()

    def _maybe_close_db_fd(self, fd):
        try:
            _maybe_close_fd(fd)
        except self.interface_errors:
            pass

    def on_task_prerun(self, sender, **kwargs):
        """Called before every task."""
        if not getattr(sender.request, 'is_eager', False):
            self.close_database()

    def on_task_postrun(self, sender, **kwargs):
        # See https://groups.google.com/group/django-users/
        #            browse_thread/thread/78200863d0c07c6d/
        if not getattr(sender.request, 'is_eager', False):
            self.close_database()
            self.close_cache()

    def close_database(self, **kwargs):
        if not self.db_reuse_max:
            return self._close_database()
        if self._db_recycles >= self.db_reuse_max * 2:
            self._db_recycles = 0
            self._close_database()
        self._db_recycles += 1

    def _close_database(self):
        for conn in self._db.connections.all():
            try:
                conn.close()
            except self.interface_errors:
                pass
            except self.DatabaseError as exc:
                str_exc = str(exc)
                if 'closed' not in str_exc and 'not connected' not in str_exc:
                    raise

    def close_cache(self):
        try:
            self._cache.cache.close()
        except (TypeError, AttributeError):
            pass

    def on_worker_ready(self, **kwargs):
        if self._settings.DEBUG:
            warnings.warn('Using settings.DEBUG leads to a memory leak, never '
                          'use this setting in production environments!')