/usr/lib/python2.7/dist-packages/djcelery/schedulers.py is in python-django-celery 3.1.17-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import
import logging
from multiprocessing.util import Finalize
from anyjson import loads, dumps
from celery import current_app
from celery import schedules
from celery.beat import Scheduler, ScheduleEntry
from celery.utils.encoding import safe_str, safe_repr
from celery.utils.log import get_logger
from celery.utils.timeutils import is_naive
from django.db import transaction
from django.core.exceptions import ObjectDoesNotExist
from .db import commit_on_success
from .models import (PeriodicTask, PeriodicTasks,
CrontabSchedule, IntervalSchedule)
from .utils import DATABASE_ERRORS, make_aware
from .compat import itervalues
# The database-backed scheduler has to wake up far more often than the
# regular interval of 5 minutes, since the schedule can be modified
# externally and those changes must be noticed promptly.
DEFAULT_MAX_INTERVAL = 5  # seconds

# Log template used when an entry cannot be stored; the placeholders are
# filled with the entry name, the exception and the raw entry contents.
ADD_ENTRY_ERROR = """\
Couldn't add entry %r to database schedule: %r. Contents: %r
"""

logger = get_logger(__name__)

# Module-level shortcuts to the logger's bound methods.
debug = logger.debug
info = logger.info
error = logger.error
class ModelEntry(ScheduleEntry):
    """Schedule entry that wraps a periodic-task model instance.

    The entry copies name, task, schedule, arguments and routing options
    from the model, and writes run bookkeeping (``save_fields``) back to
    the database through :meth:`save`.
    """

    # (celery schedule class, model class, model field name) triples that
    # ``to_model_schedule`` walks in order to pick a matching model.
    model_schedules = ((schedules.crontab, CrontabSchedule, 'crontab'),
                       (schedules.schedule, IntervalSchedule, 'interval'))
    # Only these fields are copied onto the freshly fetched row in ``save``.
    save_fields = ['last_run_at', 'total_run_count', 'no_changes']

    def __init__(self, model):
        """Build the entry from ``model``.

        If the model's schedule no longer exists, or its stored
        ``args``/``kwargs`` cannot be decoded, the problem is logged and
        the model is disabled and saved (see :meth:`_disable`).
        """
        self.app = current_app._get_current_object()
        self.name = model.name
        self.task = model.task

        # Give the attributes below a value even when loading fails, so
        # that a disabled entry can still be passed to ``repr()`` without
        # raising AttributeError.
        self.schedule = None
        self.args = []
        self.kwargs = {}

        try:
            self.schedule = model.schedule
        except model.DoesNotExist:
            logger.error('Schedule was removed from database')
            logger.warning('Disabling %s', self.name)
            self._disable(model)
        try:
            self.args = loads(model.args or '[]')
            self.kwargs = loads(model.kwargs or '{}')
        except ValueError:
            # Use the module logger (not the root ``logging`` module) so
            # these records follow the same configuration as the others.
            logger.error('Failed to serialize arguments for %s.', self.name,
                         exc_info=1)
            logger.warning('Disabling %s', self.name)
            self._disable(model)

        self.options = {'queue': model.queue,
                        'exchange': model.exchange,
                        'routing_key': model.routing_key,
                        'expires': model.expires}
        self.total_run_count = model.total_run_count
        self.model = model

        if not model.last_run_at:
            model.last_run_at = self._default_now()
        orig = self.last_run_at = model.last_run_at
        if not is_naive(self.last_run_at):
            # Keep a naive datetime on the entry itself.
            self.last_run_at = self.last_run_at.replace(tzinfo=None)
        assert orig.hour == self.last_run_at.hour  # timezone sanity

    def _disable(self, model):
        """Mark ``model`` as disabled (and ``no_changes``) and save it."""
        model.no_changes = True
        model.enabled = False
        model.save()

    def is_due(self):
        """Return the schedule's ``is_due`` result for ``last_run_at``.

        A disabled model is never due; ``(False, 5.0)`` is returned so the
        entry is looked at again after five seconds.
        """
        if not self.model.enabled:
            return False, 5.0   # 5 second delay for re-enable.
        return self.schedule.is_due(self.last_run_at)

    def _default_now(self):
        """Return the app's current time (used when no last run exists)."""
        return self.app.now()

    def __next__(self):
        """Record a run on the model and return a new entry built from it."""
        self.model.last_run_at = self.app.now()
        self.model.total_run_count += 1
        self.model.no_changes = True
        return self.__class__(self.model)
    next = __next__  # for 2to3

    def save(self):
        """Write the ``save_fields`` of this entry's model to the database."""
        # Object may not be synchronized, so only
        # change the fields we care about.
        obj = self.model._default_manager.get(pk=self.model.pk)
        for field in self.save_fields:
            setattr(obj, field, getattr(self.model, field))
        obj.last_run_at = make_aware(obj.last_run_at)
        obj.save()

    @classmethod
    def to_model_schedule(cls, schedule):
        """Convert ``schedule`` into a saved model schedule.

        Returns a ``(model_schedule, model_field)`` tuple for the first
        type in ``model_schedules`` that ``schedule`` is an instance of.

        Raises ``ValueError`` when no listed schedule type matches.
        """
        # The conversion does not depend on the loop variable, so do it once.
        schedule = schedules.maybe_schedule(schedule)
        for schedule_type, model_type, model_field in cls.model_schedules:
            if isinstance(schedule, schedule_type):
                model_schedule = model_type.from_schedule(schedule)
                model_schedule.save()
                return model_schedule, model_field
        raise ValueError(
            'Cannot convert schedule type {0!r} to model'.format(schedule))

    @classmethod
    def from_entry(cls, name, skip_fields=('relative', 'options'), **entry):
        """Create or update the model row named ``name`` and wrap it.

        ``entry`` holds the schedule-entry settings; the keys listed in
        ``skip_fields`` are not stored as model fields, and ``queue``,
        ``exchange`` and ``routing_key`` are taken from ``entry['options']``.
        """
        options = entry.get('options') or {}
        fields = dict(entry)
        for skip_field in skip_fields:
            fields.pop(skip_field, None)
        schedule = fields.pop('schedule')
        model_schedule, model_field = cls.to_model_schedule(schedule)
        fields[model_field] = model_schedule
        fields['args'] = dumps(fields.get('args') or [])
        fields['kwargs'] = dumps(fields.get('kwargs') or {})
        fields['queue'] = options.get('queue')
        fields['exchange'] = options.get('exchange')
        fields['routing_key'] = options.get('routing_key')
        # NOTE(review): this passes the result of update_or_create straight
        # to the constructor, so it presumably returns the model instance
        # rather than an (obj, created) tuple -- confirm against the manager.
        return cls(PeriodicTask._default_manager.update_or_create(
            name=name, defaults=fields,
        ))

    def __repr__(self):
        # ``{4}`` (not ``{{4}}``, which is an escaped literal brace) so the
        # schedule actually appears in the output.
        return '<ModelEntry: {0} {1}(*{2}, **{3}) {4}>'.format(
            safe_str(self.name), self.task, safe_repr(self.args),
            safe_repr(self.kwargs), self.schedule,
        )
class DatabaseScheduler(Scheduler):
    """Scheduler whose entries are loaded from, and saved to, the database."""

    Entry = ModelEntry
    Model = PeriodicTask
    Changes = PeriodicTasks

    _schedule = None          # cached mapping of entry name -> Entry
    _last_timestamp = None    # last change timestamp that was observed
    _initial_read = False     # set once the schedule has been loaded

    def __init__(self, *args, **kwargs):
        """Set up dirty tracking and a finalizer, then defer to ``Scheduler``."""
        self._dirty = set()
        # Registers ``sync`` as a finalizer callback for this scheduler.
        self._finalize = Finalize(self, self.sync, exitpriority=5)
        Scheduler.__init__(self, *args, **kwargs)

        # First truthy value wins: explicit argument, app setting, default.
        interval = kwargs.get('max_interval')
        if not interval:
            interval = self.app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
        if not interval:
            interval = DEFAULT_MAX_INTERVAL
        self.max_interval = interval

    def setup_schedule(self):
        """Install the default entries and those from ``CELERYBEAT_SCHEDULE``."""
        self.install_default_entries(self.schedule)
        self.update_from_dict(self.app.conf.CELERYBEAT_SCHEDULE)

    def all_as_schedule(self):
        """Return a ``{name: Entry}`` dict built from all enabled models.

        Models for which building the entry raises ``ValueError`` are
        left out.
        """
        debug('DatabaseScheduler: Fetching database schedule')
        entries = {}
        for model in self.Model.objects.enabled():
            try:
                entries[model.name] = self.Entry(model)
            except ValueError:
                pass
        return entries

    def schedule_changed(self):
        """Tell whether the stored change timestamp moved forward.

        Returns ``False`` (after logging) when the database raises one of
        ``DATABASE_ERRORS``. Otherwise the newly read timestamp is always
        remembered for the next call.
        """
        try:
            # If MySQL is running with transaction isolation level
            # REPEATABLE-READ (default), then we won't see changes done by
            # other transactions until the current transaction is
            # committed (Issue #41).
            try:
                transaction.commit()
            except transaction.TransactionManagementError:
                pass  # not in transaction management.
            previous = self._last_timestamp
            current = self.Changes.last_change()
        except DATABASE_ERRORS as exc:
            error('Database gave error: %r', exc, exc_info=1)
            return False

        try:
            # With no previous timestamp the comparison is against
            # ``current`` itself and therefore false.
            return bool(current and current > (previous or current))
        finally:
            self._last_timestamp = current

    def reserve(self, entry):
        """Reserve ``entry`` via the base class and mark it as dirty."""
        reserved = Scheduler.reserve(self, entry)
        # Need to store entry by name, because the entry may change
        # in the mean time.
        self._dirty.add(reserved.name)
        return reserved

    def sync(self):
        """Save every dirty entry, re-queueing them all on a database error."""
        info('Writing entries...')
        attempted = set()
        try:
            with commit_on_success():
                while self._dirty:
                    try:
                        name = self._dirty.pop()
                        attempted.add(name)
                        self.schedule[name].save()
                    except (KeyError, ObjectDoesNotExist):
                        # Entry is unknown or its row is gone; skip it.
                        pass
        except DATABASE_ERRORS as exc:
            # retry later
            self._dirty.update(attempted)
            error('Database error while sync: %r', exc, exc_info=1)

    def update_from_dict(self, dict_):
        """Store each entry of ``dict_`` and merge the results into the schedule.

        An entry that fails to convert is logged and skipped.
        """
        added = {}
        for name, entry in dict_.items():
            try:
                added[name] = self.Entry.from_entry(name, **entry)
            except Exception as exc:
                error(ADD_ENTRY_ERROR, name, exc, entry)
        self.schedule.update(added)

    def install_default_entries(self, data):
        """Add the ``celery.backend_cleanup`` entry when results expire.

        ``data`` is accepted but not used here.
        """
        entries = {}
        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
            entries['celery.backend_cleanup'] = {
                'task': 'celery.backend_cleanup',
                'schedule': schedules.crontab('0', '4', '*'),
                'options': {'expires': 12 * 3600},
            }
        self.update_from_dict(entries)

    @property
    def schedule(self):
        """Return the cached schedule, reloading it on first use or change."""
        if not self._initial_read:
            debug('DatabaseScheduler: intial read')
            self._initial_read = True
            reload_needed = True
        elif self.schedule_changed():
            info('DatabaseScheduler: Schedule changed.')
            reload_needed = True
        else:
            reload_needed = False

        if reload_needed:
            # Flush pending changes before replacing the cached entries.
            self.sync()
            self._schedule = self.all_as_schedule()
            if logger.isEnabledFor(logging.DEBUG):
                listing = '\n'.join(
                    repr(entry) for entry in itervalues(self._schedule))
                debug('Current schedule:\n%s', listing)
        return self._schedule
|