/usr/share/pyshared/celery/utils/encoding.py is in python-celery 2.4.6-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 | # -*- coding: utf-8 -*-
"""
celery.utils.encoding
~~~~~~~~~~~~~~~~~~~~~
Utilities to encode text, and to safely emit text from running
applications without crashing with the infamous :exc:`UnicodeDecodeError`
exception.
:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
import sys
import traceback
is_py3k = sys.version_info >= (3, 0)
if is_py3k:
def str_to_bytes(s):
if isinstance(s, str):
return s.encode()
return s
def bytes_to_str(s):
if isinstance(s, bytes):
return s.decode()
return s
def from_utf8(s, *args, **kwargs):
return s
def ensure_bytes(s):
if not isinstance(s, bytes):
return str_to_bytes(s)
return s
str_t = str
bytes_t = bytes
else:
def str_to_bytes(s): # noqa
if isinstance(s, unicode):
return s.encode()
return s
def bytes_to_str(s): # noqa
return s
def from_utf8(s, *args, **kwargs): # noqa
return s.encode("utf-8", *args, **kwargs)
str_t = unicode
bytes_t = str
ensure_bytes = str_to_bytes
if sys.platform.startswith("java"):
def default_encoding():
return "utf-8"
else:
def default_encoding(): # noqa
return sys.getfilesystemencoding()
def safe_str(s, errors="replace"):
s = bytes_to_str(s)
if not isinstance(s, basestring):
return safe_repr(s, errors)
return _safe_str(s, errors)
def _safe_str(s, errors="replace"):
if is_py3k:
return s
encoding = default_encoding()
try:
if isinstance(s, unicode):
return s.encode(encoding, errors)
return unicode(s, encoding, errors)
except Exception, exc:
return "<Unrepresentable %r: %r %r>" % (
type(s), exc, "\n".join(traceback.format_stack()))
def safe_repr(o, errors="replace"):
try:
return repr(o)
except Exception:
return _safe_str(o, errors)
|