/usr/lib/python2.7/dist-packages/djcelery/utils.py is in python-django-celery 3.1.17-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -- XXX This module must not use translation as that causes
# -- a recursive loader import!
from __future__ import absolute_import, unicode_literals
from datetime import datetime
from django.conf import settings
# Database-related exceptions.
from django.db import DatabaseError
# Gather the exception classes exposed by each optional database driver.
# A driver that cannot be imported simply contributes an empty tuple.
try:
    import MySQLdb as mysql
except ImportError:
    _my_database_errors = ()  # noqa
else:
    _my_database_errors = (
        mysql.DatabaseError,
        mysql.InterfaceError,
        mysql.OperationalError,
    )

try:
    import psycopg2 as pg
except ImportError:
    _pg_database_errors = ()  # noqa
else:
    _pg_database_errors = (
        pg.DatabaseError,
        pg.InterfaceError,
        pg.OperationalError,
    )

try:
    import sqlite3
except ImportError:
    _lite_database_errors = ()  # noqa
else:
    _lite_database_errors = (
        sqlite3.DatabaseError,
        sqlite3.InterfaceError,
        sqlite3.OperationalError,
    )

try:
    import cx_Oracle as oracle
except ImportError:
    _oracle_database_errors = ()  # noqa
else:
    _oracle_database_errors = (
        oracle.DatabaseError,
        oracle.InterfaceError,
        oracle.OperationalError,
    )

# Django's own DatabaseError comes first, followed by whatever the
# importable drivers provided (MySQL, PostgreSQL, SQLite, Oracle order).
DATABASE_ERRORS = (DatabaseError, )
DATABASE_ERRORS += _my_database_errors
DATABASE_ERRORS += _pg_database_errors
DATABASE_ERRORS += _lite_database_errors
DATABASE_ERRORS += _oracle_database_errors
try:
    from django.utils import timezone
except ImportError:
    # django.utils.timezone is unavailable: every helper degrades to a
    # no-op that works with plain naive datetimes.
    now = datetime.now

    def _pass(x):
        """Return *x* unchanged."""
        return x

    make_aware = _pass
    make_naive = _pass

    def is_aware(x):
        """Always report False; no timezone support is available."""
        return False
else:
    is_aware = timezone.is_aware

    # see Issue #222
    now_localtime = getattr(
        timezone, 'template_localtime', timezone.localtime,
    )

    def make_aware(value):
        """Return *value* as an aware datetime in Django's default timezone.

        When ``settings.USE_TZ`` is missing or false, *value* is returned
        untouched.
        """
        if not getattr(settings, 'USE_TZ', False):
            return value
        # naive datetimes are assumed to be in UTC.
        if timezone.is_naive(value):
            value = timezone.make_aware(value, timezone.utc)
        # then convert to the Django configured timezone.
        return timezone.localtime(value, timezone.get_default_timezone())

    def make_naive(value):
        """Return *value* made naive relative to Django's default timezone.

        When ``settings.USE_TZ`` is missing or false, *value* is returned
        untouched.
        """
        if not getattr(settings, 'USE_TZ', False):
            return value
        return timezone.make_naive(value, timezone.get_default_timezone())

    def now():
        """Return the current time.

        With ``settings.USE_TZ`` enabled the result of ``timezone.now()``
        is passed through ``now_localtime``; otherwise it is returned
        directly.
        """
        if not getattr(settings, 'USE_TZ', False):
            return timezone.now()
        return now_localtime(timezone.now())
def maybe_make_aware(value):
    """Pass *value* through ``make_aware`` unless there is nothing to do.

    Datetimes that ``is_aware`` already reports as aware, and any falsy
    value (e.g. ``None``), are returned unchanged.
    """
    already_aware = isinstance(value, datetime) and is_aware(value)
    if already_aware or not value:
        return value
    return make_aware(value)
def is_database_scheduler(scheduler):
    """Tell whether *scheduler* resolves to a ``DatabaseScheduler`` subclass.

    A falsy *scheduler* yields ``False`` without importing anything.
    Otherwise *scheduler* is resolved with kombu's ``symbol_by_name`` and
    checked with ``issubclass``.
    """
    if scheduler:
        # Imported lazily so the falsy case never touches kombu or the
        # schedulers module.
        from kombu.utils import symbol_by_name
        from .schedulers import DatabaseScheduler
        scheduler_cls = symbol_by_name(scheduler)
        return issubclass(scheduler_cls, DatabaseScheduler)
    return False
def fromtimestamp(value):
    """Convert the POSIX timestamp *value* into a ``datetime``.

    Uses ``datetime.utcfromtimestamp`` when ``settings.CELERY_ENABLE_UTC``
    is set to a true value, and ``datetime.fromtimestamp`` otherwise.
    """
    use_utc = getattr(settings, 'CELERY_ENABLE_UTC', False)
    convert = datetime.utcfromtimestamp if use_utc else datetime.fromtimestamp
    return convert(value)
|