/usr/share/pyshared/melange/openstack/common/utils.py is in python-melange 1:2012.1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 | # vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
System-level utilities and helper functions.
"""
import datetime
import logging
import os
import random
import shlex
import sys
from eventlet import greenthread
from eventlet.green import subprocess
import iso8601
from melange.openstack.common import exception
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
LOG = logging.getLogger(__name__)
def int_from_bool_as_string(subject):
"""
Interpret a string as a boolean and return either 1 or 0.
Any string value in:
('True', 'true', 'On', 'on', '1')
is interpreted as a boolean True.
Useful for JSON-decoded stuff and config file parsing
"""
return bool_from_string(subject) and 1 or 0
def bool_from_string(subject):
"""
Interpret a string as a boolean.
Any string value in:
('True', 'true', 'On', 'on', '1')
is interpreted as a boolean True.
Useful for JSON-decoded stuff and config file parsing
"""
if isinstance(subject, bool):
return subject
if isinstance(subject, basestring):
if subject.strip().lower() in ('true', 'on', '1'):
return True
return False
def execute(*cmd, **kwargs):
"""
Helper method to execute command with optional retry.
:cmd Passed to subprocess.Popen.
:process_input Send to opened process.
:check_exit_code Defaults to 0. Raise exception.ProcessExecutionError
unless program exits with this code.
:delay_on_retry True | False. Defaults to True. If set to True, wait a
short amount of time before retrying.
:attempts How many times to retry cmd.
:run_as_root True | False. Defaults to False. If set to True,
the command is prefixed by the command specified
in the root_helper kwarg.
:root_helper command to prefix all cmd's with
:raises exception.Error on receiving unknown arguments
:raises exception.ProcessExecutionError
"""
process_input = kwargs.pop('process_input', None)
check_exit_code = kwargs.pop('check_exit_code', 0)
delay_on_retry = kwargs.pop('delay_on_retry', True)
attempts = kwargs.pop('attempts', 1)
run_as_root = kwargs.pop('run_as_root', False)
root_helper = kwargs.pop('root_helper', '')
if len(kwargs):
raise exception.Error(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
if run_as_root:
cmd = shlex.split(root_helper) + list(cmd)
cmd = map(str, cmd)
while attempts > 0:
attempts -= 1
try:
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
_PIPE = subprocess.PIPE # pylint: disable=E1101
obj = subprocess.Popen(cmd,
stdin=_PIPE,
stdout=_PIPE,
stderr=_PIPE,
close_fds=True)
result = None
if process_input is not None:
result = obj.communicate(process_input)
else:
result = obj.communicate()
obj.stdin.close() # pylint: disable=E1101
_returncode = obj.returncode # pylint: disable=E1101
if _returncode:
LOG.debug(_('Result was %s') % _returncode)
if (isinstance(check_exit_code, int) and
not isinstance(check_exit_code, bool) and
_returncode != check_exit_code):
(stdout, stderr) = result
raise exception.ProcessExecutionError(
exit_code=_returncode,
stdout=stdout,
stderr=stderr,
cmd=' '.join(cmd))
return result
except exception.ProcessExecutionError:
if not attempts:
raise
else:
LOG.debug(_('%r failed. Retrying.'), cmd)
if delay_on_retry:
greenthread.sleep(random.randint(20, 200) / 100.0)
finally:
# NOTE(termie): this appears to be necessary to let the subprocess
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one
greenthread.sleep(0)
def import_class(import_str):
"""Returns a class from a string including module and class"""
mod_str, _sep, class_str = import_str.rpartition('.')
try:
__import__(mod_str)
return getattr(sys.modules[mod_str], class_str)
except (ImportError, ValueError, AttributeError):
raise exception.NotFound('Class %s cannot be found' % class_str)
def import_object(import_str):
"""Returns an object including a module or module and class"""
try:
__import__(import_str)
return sys.modules[import_str]
except ImportError:
return import_class(import_str)
def isotime(at=None):
"""Stringify time in ISO 8601 format"""
if not at:
at = datetime.datetime.utcnow()
str = at.strftime(TIME_FORMAT)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
str += ('Z' if tz == 'UTC' else tz)
return str
def parse_isotime(timestr):
"""Parse time from ISO 8601 format"""
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(e.message)
except TypeError as e:
raise ValueError(e.message)
def normalize_time(timestamp):
"""Normalize time in arbitrary timezone to UTC"""
offset = timestamp.utcoffset()
return timestamp.replace(tzinfo=None) - offset if offset else timestamp
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
return utcnow.override_time
return datetime.datetime.utcnow()
utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
"""Override utils.utcnow to return a constant time."""
utcnow.override_time = override_time
def clear_time_override():
"""Remove the overridden time."""
utcnow.override_time = None
|