/usr/share/pyshared/txzookeeper/retry.py is in python-txzookeeper 0.9.8-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#
# Copyright (C) 2011 Canonical Ltd. All Rights Reserved
#
# This file is part of txzookeeper.
#
# Authors:
# Kapil Thangavelu
#
# txzookeeper is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# txzookeeper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
#
"""
A retry client facade that transparently handles transient connection
errors.
"""
import time
import logging
import zookeeper
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
from txzookeeper.client import NotConnectedException
from txzookeeper.utils import sleep
# Public names exported by a star-import of this module.
__all__ = ["retry", "RetryClient"]
# Module-level logger; retry attempts are reported through it at debug level.
log = logging.getLogger("txzk.retry")
class RetryException(Exception):
    """Explicit retry exception.

    ``is_retryable`` recognises this type, so an operation that raises it
    is treated like one that hit a recoverable connection error.
    """
# Percentage of the session timeout to wait before retrying an operation
# (the default ``session_fraction`` of ``get_delay``).
RETRY_FRACTION = 30
def is_retryable(e):
    """Tell whether an exception denotes a recoverable connection error.

    :param e: The exception raised by a client operation.
    :return: True when `e` is a zookeeper closing, connection loss or
             operation timeout error, or an explicit `RetryException`.
    """
    recoverable_types = (
        zookeeper.ClosingException,
        zookeeper.ConnectionLossException,
        zookeeper.OperationTimeoutException,
        RetryException,
    )
    return isinstance(e, recoverable_types)
def is_session_error(e):
    """Tell whether an exception is a session or connection level error.

    :param e: The exception raised by a client operation.
    :return: True when `e` is a zookeeper session expired, connection
             loss or closing error, or a `NotConnectedException`.
    """
    session_error_types = (
        zookeeper.SessionExpiredException,
        zookeeper.ConnectionLossException,
        zookeeper.ClosingException,
        NotConnectedException,
    )
    return isinstance(e, session_error_types)
def _args(args):
return args and args[0] or "NA"
def get_delay(session_timeout, max_delay=5, session_fraction=RETRY_FRACTION):
    """Compute how long to wait before retrying an operation.

    The delay is the given percentage of the session timeout, capped at
    `max_delay`. Waiting gives the connection time to auto-heal before
    the operation is attempted again.

    :param session_timeout: The timeout for the session, in milliseconds.
    :param max_delay: The max delay for a retry, in seconds.
    :param session_fraction: The percentage of the timeout to wait.
    :return: The delay in seconds.
    """
    fraction = float(session_fraction) / 100
    # Milliseconds scaled by the fraction, then converted to seconds.
    delay_seconds = (session_timeout * fraction) / 1000
    if max_delay < delay_seconds:
        return max_delay
    return delay_seconds
def check_error(e):
    """Distinguish a zookeeper connection error from an application error.

    :param e: The exception raised by a client operation.
    :return: True when `e` is either a retryable error or a session error.
    """
    if is_retryable(e):
        return True
    return is_session_error(e)
def check_retryable(retry_client, max_time, error):
    """Decide whether a failed operation may be retried on a client.

    An operation is retryable only when all of the following hold, checked
    in this order: the error is a known retryable error, the max allotted
    time has not been exceeded, the client is still connected (it has not
    been explicitly closed), and the client is in a recoverable state.

    :param retry_client: A txzookeeper client
    :param max_time: The max time (epoch tick) that the op is retryable till.
    :param error: The client operation exception.
    :return: True if the operation should be retried, False otherwise.
    """
    now = time.time()
    may_retry = (
        is_retryable(error)
        and not (max_time <= now)
        and retry_client.connected
        and not retry_client.unrecoverable)
    return bool(may_retry)
@inlineCallbacks
def retry(client, func, *args, **kw):
    """Invoke a function, retrying it on transient connection errors.

    If the function execution results in an exception due to a transient
    connection error, the operation is reinvoked after a suitable delay
    (fractional value of the session timeout). Retries continue until
    1.2 times the session timeout has elapsed since the retry window
    started.

    When an error is no longer retryable but is still a connection or
    session error, the time window has passed, and the client carries a
    callable `cb_retry_error` attribute, that callback is invoked once
    with the error; the retry window is then restarted and the operation
    is attempted again. Any other error is raised to the caller.

    :param client: A ZookeeperClient instance.
    :param func: A callable python object that interacts with
                 zookeeper, the callable must utilize the same zookeeper
                 connection as passed in the `client` param. The function
                 must return a single value (either a deferred or result
                 value).
    :return: A deferred firing with the result of `func`.
    """
    retry_started = time.time()
    # The error callback is used at most once per call.
    retry_error = False
    # Not every callable has a __name__ (e.g. functools.partial objects).
    func_name = getattr(func, "__name__", repr(func))

    while True:
        try:
            value = yield func(*args, **kw)
        except Exception as e:
            # For clients which aren't connected (session timeout == None)
            # we raise the errors to the callers.
            session_timeout = client.session_timeout or 0

            # The longest we keep retrying is 1.2 * session timeout
            # (the timeout is in milliseconds, max_time in epoch seconds).
            max_time = (session_timeout / 1000.0) * 1.2 + retry_started

            if not check_retryable(client, max_time, e):
                # The callback attribute is only guaranteed on clients
                # wrapped by RetryClient; a missing attribute must not
                # mask the original error.
                cb_retry_error = getattr(client, "cb_retry_error", None)

                # Check if its a persistent client error, and if so use the
                # cb if present to try and reconnect for client errors.
                if (check_error(e)
                        and time.time() > max_time
                        and callable(cb_retry_error)
                        and not retry_error):
                    log.debug("Retry error %r on %s @ %s",
                              e, func_name, _args(args))
                    retry_error = True
                    yield cb_retry_error(e)
                    retry_started = time.time()
                    continue
                raise

            # Give the connection a chance to auto-heal.
            yield sleep(get_delay(session_timeout))
            log.debug("Retry on %s @ %s", func_name, _args(args))
            continue

        returnValue(value)
def retry_watch(client, func, *args, **kw):
    """Invoke a watch callable, retrying it on transient connection errors.

    If the callable execution results in an exception due to a transient
    connection error, the operation is reinvoked after a suitable delay
    (fractional value of the session timeout).

    A watch function must return back a tuple of deferreds
    (value_deferred, watch_deferred). No inline callbacks are
    performed in here to ensure that callers continue to see a
    tuple of results.

    The client passed to this retry function must be the same as
    the one utilized by the python callable.

    :param client: A ZookeeperClient instance.
    :param func: A python callable that interacts with zookeeper. If a
                 function is passed, a txzookeeper client must be the
                 first parameter of this function. The function must
                 return a tuple of (value_deferred, watch_deferred)
    :return: The (value_deferred, watch_deferred) tuple of the first
             invocation; retries are chained into these deferreds.
    """
    # For clients which aren't connected (session timeout == None)
    # we raise the usage errors to the callers
    session_timeout = client.session_timeout or 0

    # If we keep retrying past the 1.2 * session timeout without
    # success just die, the session expiry is fatal. The session timeout
    # is in milliseconds while time.time() is in seconds, so convert
    # before adding.
    max_time = (session_timeout / 1000.0) * 1.2 + time.time()

    value_d, watch_d = func(*args, **kw)

    def retry_delay(f):
        """Errback, verifies an op is retryable, and delays the next retry.
        """
        # Check that operation is retryable.
        if not check_retryable(client, max_time, f.value):
            return f

        # Give the connection a chance to auto-heal
        d = sleep(get_delay(session_timeout))
        d.addCallback(retry_inner)
        return d

    def retry_inner(value):
        """Retry operation invoker.
        """
        # Invoke the function
        retry_value_d, retry_watch_d = func(*args, **kw)

        # If we need to retry again.
        retry_value_d.addErrback(retry_delay)

        # Chain the new watch deferred to the old, presuming its doa
        # if the value deferred errored on a connection error.
        retry_watch_d.chainDeferred(watch_d)

        # Insert back into the callback chain.
        return retry_value_d

    # Attach the retry
    value_d.addErrback(retry_delay)
    return value_d, watch_d
def _passproperty(name):
"""Returns a method wrapper that delegates to a client's property.
"""
def wrapper(retry_client):
return getattr(retry_client.client, name)
return property(wrapper)
class RetryClient(object):
    """A ZookeeperClient wrapper that transparently performs retries.

    Any operation on a zookeeper connection can hit a transient
    connection failure. As long as the session associated to the
    connection is still active on the zookeeper cluster, libzookeeper can
    reconnect automatically to the cluster and session, and the client is
    able to retry.

    Whether a given operation is safe to retry depends on the application
    in question and how it interacts with zookeeper. Coordination around
    sequence nodes in particular can be problematic, as the client has no
    way of knowing whether the operation succeeded without additional
    application specific context. Idempotent operations against the
    zookeeper tree are generally safe to retry.

    This class wraps a zookeeper client and automatically retries the
    operations that interact with the zookeeper tree, in the face of
    transient errors, till the session timeout has been reached. The
    remaining client methods and properties are passed straight through.
    """

    def __init__(self, client):
        """Wrap `client` and reset its retry error callback to None."""
        self.client = client
        client.cb_retry_error = None

    def set_retry_error_callback(self, callback):
        """Store `callback` on the wrapped client as `cb_retry_error`."""
        self.client.cb_retry_error = callback

    def _with_retry(self, operation, args, kw):
        """Run a client operation through `retry`."""
        return retry(self.client, operation, *args, **kw)

    def _with_watch_retry(self, operation, args, kw):
        """Run a client watch operation through `retry_watch`."""
        return retry_watch(self.client, operation, *args, **kw)

    # Retry enabled operations

    def add_auth(self, *args, **kw):
        """Retry enabled version of the client's `add_auth`."""
        return self._with_retry(self.client.add_auth, args, kw)

    def create(self, *args, **kw):
        """Retry enabled version of the client's `create`."""
        return self._with_retry(self.client.create, args, kw)

    def delete(self, *args, **kw):
        """Retry enabled version of the client's `delete`."""
        return self._with_retry(self.client.delete, args, kw)

    def exists(self, *args, **kw):
        """Retry enabled version of the client's `exists`."""
        return self._with_retry(self.client.exists, args, kw)

    def get(self, *args, **kw):
        """Retry enabled version of the client's `get`."""
        return self._with_retry(self.client.get, args, kw)

    def get_acl(self, *args, **kw):
        """Retry enabled version of the client's `get_acl`."""
        return self._with_retry(self.client.get_acl, args, kw)

    def get_children(self, *args, **kw):
        """Retry enabled version of the client's `get_children`."""
        return self._with_retry(self.client.get_children, args, kw)

    def set_acl(self, *args, **kw):
        """Retry enabled version of the client's `set_acl`."""
        return self._with_retry(self.client.set_acl, args, kw)

    def set(self, *args, **kw):
        """Retry enabled version of the client's `set`."""
        return self._with_retry(self.client.set, args, kw)

    def sync(self, *args, **kw):
        """Retry enabled version of the client's `sync`."""
        return self._with_retry(self.client.sync, args, kw)

    # Watch retries

    def exists_and_watch(self, *args, **kw):
        """Retry enabled version of the client's `exists_and_watch`."""
        return self._with_watch_retry(
            self.client.exists_and_watch, args, kw)

    def get_and_watch(self, *args, **kw):
        """Retry enabled version of the client's `get_and_watch`."""
        return self._with_watch_retry(
            self.client.get_and_watch, args, kw)

    def get_children_and_watch(self, *args, **kw):
        """Retry enabled version of the client's `get_children_and_watch`."""
        return self._with_watch_retry(
            self.client.get_children_and_watch, args, kw)

    # Passthrough methods

    def set_connection_watcher(self, *args, **kw):
        """Delegate to the client's `set_connection_watcher`."""
        wrapped = self.client
        return wrapped.set_connection_watcher(*args, **kw)

    def set_connection_error_callback(self, *args, **kw):
        """Delegate to the client's `set_connection_error_callback`."""
        wrapped = self.client
        return wrapped.set_connection_error_callback(*args, **kw)

    def set_session_callback(self, *args, **kw):
        """Delegate to the client's `set_session_callback`."""
        wrapped = self.client
        return wrapped.set_session_callback(*args, **kw)

    def set_determinstic_order(self, *args, **kw):
        """Delegate to the client's `set_determinstic_order`."""
        wrapped = self.client
        return wrapped.set_determinstic_order(*args, **kw)

    def close(self):
        """Delegate to the client's `close` and return its result."""
        wrapped = self.client
        return wrapped.close()

    @inlineCallbacks
    def connect(self, *args, **kw):
        """Connect the wrapped client; the deferred fires with this wrapper.
        """
        yield self.client.connect(*args, **kw)
        returnValue(self)

    # passthrough properties
    state = _passproperty("state")
    client_id = _passproperty("client_id")
    session_timeout = _passproperty("session_timeout")
    servers = _passproperty("servers")
    handle = _passproperty("handle")
    connected = _passproperty("connected")
    unrecoverable = _passproperty("unrecoverable")
|