/usr/share/pyshared/celery/backends/database.py is in python-celery 2.4.6-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 | # -*- coding: utf-8 -*-
from __future__ import absolute_import
from datetime import datetime
from .. import states
from ..db.models import Task, TaskSet
from ..db.session import ResultSession
from ..exceptions import ImproperlyConfigured
from ..utils.timeutils import maybe_timedelta
from .base import BaseDictBackend
def _sqlalchemy_installed():
try:
import sqlalchemy
except ImportError:
raise ImproperlyConfigured(
"The database result backend requires SQLAlchemy to be installed."
"See http://pypi.python.org/pypi/SQLAlchemy")
return sqlalchemy
_sqlalchemy_installed()
class DatabaseBackend(BaseDictBackend):
"""The database result backend."""
# ResultSet.iterate should sleep this much between each pool,
# to not bombard the database with queries.
subpolling_interval = 0.5
def __init__(self, dburi=None, expires=None,
engine_options=None, **kwargs):
super(DatabaseBackend, self).__init__(**kwargs)
conf = self.app.conf
self.expires = maybe_timedelta(self.prepare_expires(expires))
self.dburi = dburi or conf.CELERY_RESULT_DBURI
self.engine_options = dict(engine_options or {},
**conf.CELERY_RESULT_ENGINE_OPTIONS or {})
self.short_lived_sessions = kwargs.get("short_lived_sessions",
conf.CELERY_RESULT_DB_SHORT_LIVED_SESSIONS)
if not self.dburi:
raise ImproperlyConfigured(
"Missing connection string! Do you have "
"CELERY_RESULT_DBURI set to a real value?")
def ResultSession(self):
return ResultSession(
dburi=self.dburi,
short_lived_sessions=self.short_lived_sessions,
**self.engine_options)
def _store_result(self, task_id, result, status, traceback=None):
"""Store return value and status of an executed task."""
session = self.ResultSession()
try:
task = session.query(Task).filter(Task.task_id == task_id).first()
if not task:
task = Task(task_id)
session.add(task)
session.flush()
task.result = result
task.status = status
task.traceback = traceback
session.commit()
finally:
session.close()
return result
def _get_task_meta_for(self, task_id):
"""Get task metadata for a task by id."""
session = self.ResultSession()
try:
task = session.query(Task).filter(Task.task_id == task_id).first()
if task is None:
task = Task(task_id)
task.status = states.PENDING
task.result = None
return task.to_dict()
finally:
session.close()
def _save_taskset(self, taskset_id, result):
"""Store the result of an executed taskset."""
session = self.ResultSession()
try:
taskset = TaskSet(taskset_id, result)
session.add(taskset)
session.flush()
session.commit()
return result
finally:
session.close()
def _restore_taskset(self, taskset_id):
"""Get metadata for taskset by id."""
session = self.ResultSession()
try:
taskset = session.query(TaskSet).filter(
TaskSet.taskset_id == taskset_id).first()
if taskset:
return taskset.to_dict()
finally:
session.close()
def _delete_taskset(self, taskset_id):
"""Delete metadata for taskset by id."""
session = self.ResultSession()
try:
session.query(TaskSet).filter(
TaskSet.taskset_id == taskset_id).delete()
session.flush()
session.commit()
finally:
session.close()
def _forget(self, task_id):
"""Forget about result."""
session = self.ResultSession()
try:
session.query(Task).filter(Task.task_id == task_id).delete()
session.commit()
finally:
session.close()
def cleanup(self):
"""Delete expired metadata."""
session = self.ResultSession()
expires = self.expires
try:
session.query(Task).filter(
Task.date_done < (datetime.now() - expires)).delete()
session.query(TaskSet).filter(
TaskSet.date_done < (datetime.now() - expires)).delete()
session.commit()
finally:
session.close()
def __reduce__(self, args=(), kwargs={}):
kwargs.update(
dict(dburi=self.dburi,
expires=self.expires,
engine_options=self.engine_options))
return super(DatabaseBackend, self).__reduce__(args, kwargs)
|