/usr/lib/python2.7/dist-packages/logbook/helpers.py is in python-logbook 0.12.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 | # -*- coding: utf-8 -*-
"""
logbook.helpers
~~~~~~~~~~~~~~~
Various helper functions
:copyright: (c) 2010 by Armin Ronacher, Georg Brandl.
:license: BSD, see LICENSE for more details.
"""
import os
import re
import sys
import errno
import time
import random
from datetime import datetime, timedelta
PY2 = sys.version_info[0] == 2
if PY2:
import __builtin__ as _builtins
else:
import builtins as _builtins
try:
import json
except ImportError:
import simplejson as json
if PY2:
from cStringIO import StringIO
iteritems = dict.iteritems
from itertools import izip as zip
xrange = _builtins.xrange
else:
from io import StringIO
zip = _builtins.zip
xrange = range
iteritems = dict.items
_IDENTITY = lambda obj: obj
if PY2:
def u(s):
return unicode(s, "unicode_escape")
else:
u = _IDENTITY
if PY2:
integer_types = (int, long)
string_types = (basestring,)
else:
integer_types = (int,)
string_types = (str,)
if PY2:
import httplib as http_client
else:
from http import client as http_client
if PY2:
# Yucky, but apparently that's the only way to do this
exec("""
def reraise(tp, value, tb=None):
raise tp, value, tb
""", locals(), globals())
else:
def reraise(tp, value, tb=None):
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
raise value
# this regexp also matches incompatible dates like 20070101 because
# some libraries (like the python xmlrpclib modules) use this
_iso8601_re = re.compile(
# date
r'(\d{4})(?:-?(\d{2})(?:-?(\d{2}))?)?'
# time
r'(?:T(\d{2}):(\d{2})(?::(\d{2}(?:\.\d+)?))?(Z|[+-]\d{2}:\d{2})?)?$'
)
_missing = object()
if PY2:
def b(x):
return x
def _is_text_stream(x):
return True
else:
import io
def b(x):
return x.encode('ascii')
def _is_text_stream(stream):
return isinstance(stream, io.TextIOBase)
can_rename_open_file = False
if os.name == 'nt':
try:
import ctypes
_MOVEFILE_REPLACE_EXISTING = 0x1
_MOVEFILE_WRITE_THROUGH = 0x8
_MoveFileEx = ctypes.windll.kernel32.MoveFileExW
def _rename(src, dst):
if PY2:
if not isinstance(src, unicode):
src = unicode(src, sys.getfilesystemencoding())
if not isinstance(dst, unicode):
dst = unicode(dst, sys.getfilesystemencoding())
if _rename_atomic(src, dst):
return True
retry = 0
rv = False
while not rv and retry < 100:
rv = _MoveFileEx(src, dst, _MOVEFILE_REPLACE_EXISTING |
_MOVEFILE_WRITE_THROUGH)
if not rv:
time.sleep(0.001)
retry += 1
return rv
# new in Vista and Windows Server 2008
_CreateTransaction = ctypes.windll.ktmw32.CreateTransaction
_CommitTransaction = ctypes.windll.ktmw32.CommitTransaction
_MoveFileTransacted = ctypes.windll.kernel32.MoveFileTransactedW
_CloseHandle = ctypes.windll.kernel32.CloseHandle
can_rename_open_file = True
def _rename_atomic(src, dst):
ta = _CreateTransaction(None, 0, 0, 0, 0, 1000, 'Logbook rename')
if ta == -1:
return False
try:
retry = 0
rv = False
while not rv and retry < 100:
rv = _MoveFileTransacted(src, dst, None, None,
_MOVEFILE_REPLACE_EXISTING |
_MOVEFILE_WRITE_THROUGH, ta)
if rv:
rv = _CommitTransaction(ta)
break
else:
time.sleep(0.001)
retry += 1
return rv
finally:
_CloseHandle(ta)
except Exception:
def _rename(src, dst):
return False
def _rename_atomic(src, dst):
return False
def rename(src, dst):
# Try atomic or pseudo-atomic rename
if _rename(src, dst):
return
# Fall back to "move away and replace"
try:
os.rename(src, dst)
except OSError:
e = sys.exc_info()[1]
if e.errno != errno.EEXIST:
raise
old = "%s-%08x" % (dst, random.randint(0, sys.maxint))
os.rename(dst, old)
os.rename(src, dst)
try:
os.unlink(old)
except Exception:
pass
else:
rename = os.rename
can_rename_open_file = True
_JSON_SIMPLE_TYPES = (bool, float) + integer_types + string_types
def to_safe_json(data):
"""Makes a data structure safe for JSON silently discarding invalid
objects from nested structures. This also converts dates.
"""
def _convert(obj):
if obj is None:
return None
elif PY2 and isinstance(obj, str):
return obj.decode('utf-8', 'replace')
elif isinstance(obj, _JSON_SIMPLE_TYPES):
return obj
elif isinstance(obj, datetime):
return format_iso8601(obj)
elif isinstance(obj, list):
return [_convert(x) for x in obj]
elif isinstance(obj, tuple):
return tuple(_convert(x) for x in obj)
elif isinstance(obj, dict):
rv = {}
for key, value in iteritems(obj):
if not isinstance(key, string_types):
key = str(key)
if not is_unicode(key):
key = u(key)
rv[key] = _convert(value)
return rv
return _convert(data)
def format_iso8601(d=None):
"""Returns a date in iso8601 format."""
if d is None:
d = datetime.utcnow()
rv = d.strftime('%Y-%m-%dT%H:%M:%S')
if d.microsecond:
rv += '.' + str(d.microsecond)
return rv + 'Z'
def parse_iso8601(value):
"""Parse an iso8601 date into a datetime object. The timezone is
normalized to UTC.
"""
m = _iso8601_re.match(value)
if m is None:
raise ValueError('not a valid iso8601 date value')
groups = m.groups()
args = []
for group in groups[:-2]:
if group is not None:
group = int(group)
args.append(group)
seconds = groups[-2]
if seconds is not None:
if '.' in seconds:
sec, usec = seconds.split('.')
args.append(int(sec))
args.append(int(usec.ljust(6, '0')))
else:
args.append(int(seconds))
rv = datetime(*args)
tz = groups[-1]
if tz and tz != 'Z':
args = [int(x) for x in tz[1:].split(':')]
delta = timedelta(hours=args[0], minutes=args[1])
if tz[0] == '+':
rv -= delta
else:
rv += delta
return rv
def get_application_name():
if not sys.argv or not sys.argv[0]:
return 'Python'
return os.path.basename(sys.argv[0]).title()
class cached_property(object):
"""A property that is lazily calculated and then cached."""
def __init__(self, func, name=None, doc=None):
self.__name__ = name or func.__name__
self.__module__ = func.__module__
self.__doc__ = doc or func.__doc__
self.func = func
def __get__(self, obj, type=None):
if obj is None:
return self
value = obj.__dict__.get(self.__name__, _missing)
if value is _missing:
value = self.func(obj)
obj.__dict__[self.__name__] = value
return value
def get_iterator_next_method(it):
return lambda: next(it)
# python 2 support functions and aliases
def is_unicode(x):
if PY2:
return isinstance(x, unicode)
return isinstance(x, str)
if PY2:
exec("""def with_metaclass(meta):
class _WithMetaclassBase(object):
__metaclass__ = meta
return _WithMetaclassBase
""")
else:
exec("""def with_metaclass(meta):
class _WithMetaclassBase(object, metaclass=meta):
pass
return _WithMetaclassBase
""")
|