This file is indexed.

/usr/share/pyshared/txzookeeper/retry.py is in python-txzookeeper 0.9.8-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
#
#  Copyright (C) 2011 Canonical Ltd. All Rights Reserved
#
#  This file is part of txzookeeper.
#
#  Authors:
#   Kapil Thangavelu
#
#  txzookeeper is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  txzookeeper is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with txzookeeper.  If not, see <http://www.gnu.org/licenses/>.
#

"""
A retry client facade that transparently handles transient connection
errors.
"""

import time
import logging
import zookeeper

from twisted.internet.defer import inlineCallbacks, returnValue, Deferred

from txzookeeper.client import NotConnectedException
from txzookeeper.utils import sleep

__all__ = ["retry", "RetryClient"]

log = logging.getLogger("txzk.retry")


class RetryException(Exception):
    """Exception used to explicitly request that an operation be retried.

    `is_retryable` lists this type alongside the transient zookeeper
    connection errors, so raising it from a retry-wrapped callable is
    treated as a retryable failure.
    """

# Percentage of the session timeout to wait before retrying an operation.
# Used as the default ``session_fraction`` of get_delay(), which divides
# it by 100 to obtain the actual fraction.
RETRY_FRACTION = 30


def is_retryable(e):
    """Tell whether an exception signifies a recoverable connection error.

    :param e: The exception instance to classify.
    :return: True when ``e`` is a zookeeper ClosingException,
        ConnectionLossException or OperationTimeoutException, or a
        RetryException; False otherwise.
    """
    retryable_types = (
        zookeeper.ClosingException,
        zookeeper.ConnectionLossException,
        zookeeper.OperationTimeoutException,
        RetryException,
    )
    return isinstance(e, retryable_types)


def is_session_error(e):
    """Tell whether an exception is one of the session-level error types.

    :param e: The exception instance to classify.
    :return: True when ``e`` is a zookeeper SessionExpiredException,
        ConnectionLossException or ClosingException, or a txzookeeper
        NotConnectedException; False otherwise.
    """
    session_error_types = (
        zookeeper.SessionExpiredException,
        zookeeper.ConnectionLossException,
        zookeeper.ClosingException,
        NotConnectedException,
    )
    return isinstance(e, session_error_types)


def _args(args):
    return args and args[0] or "NA"


def get_delay(session_timeout, max_delay=5, session_fraction=RETRY_FRACTION):
    """Compute how long to wait before retrying an operation.

    The result is the given percentage of the session timeout, converted
    to seconds, capped at ``max_delay``. Waiting gives the connection
    time to auto-heal before the operation is attempted again.

    :param session_timeout: The timeout for the session, in milliseconds.
    :param max_delay: The max delay for a retry, in seconds.
    :param session_fraction: The percentage of the timeout to wait.
    :return: The delay in seconds.
    """
    fraction = float(session_fraction) / 100
    delay_seconds = (session_timeout * fraction) / 1000
    if max_delay < delay_seconds:
        return max_delay
    return delay_seconds


def check_error(e):
    """Tell whether ``e`` is a zookeeper connection error, not an app error.

    :param e: The exception instance to classify.
    :return: True when either `is_retryable` or `is_session_error`
        accepts the exception.
    """
    if is_retryable(e):
        return True
    return is_session_error(e)


def check_retryable(retry_client, max_time, error):
    """Decide whether a failed client operation may be retried.

    All of the following must hold: the error is a known retryable
    error, the deadline has not passed, the client is still connected
    (it has not been explicitly closed) and the client is not in an
    unrecoverable state.

    :param retry_client: A txzookeeper client
    :param max_time: The max time (epoch tick) that the op is retryable till.
    :param error: The client operation exception.
    :return: True if the operation can be retried, False otherwise.
    """
    now = time.time()

    # Unknown errors and an exhausted time budget both rule out a retry.
    if not is_retryable(error) or max_time <= now:
        return False

    # So does a closed client or one that cannot recover.
    if not retry_client.connected or retry_client.unrecoverable:
        return False

    return True


@inlineCallbacks
def retry(client, func, *args, **kw):
    """Invoke a function, retrying it on transient connection errors.

    If the function execution results in an exception due to a transient
    connection error, the operation is reinvoked after a suitable delay
    (fractional value of the session timeout, see `get_delay`).

    Retrying stops once 1.2 times the session timeout has elapsed since
    the retry window started. At that point, if the error is a
    connection/session error and the client has a callable
    ``cb_retry_error``, that callback is invoked with the error (at most
    once per call to `retry`), the retry window is restarted and the
    operation is attempted again. Every other failure is re-raised to
    the caller.

    :param client: A ZookeeperClient instance.
    :param func: A callable python object that interacts with
           zookeeper, the callable must utilize the same zookeeper
           connection as passed in the `client` param. The function
           must return a single value (either a deferred or result
           value).
    :param args: Positional arguments passed through to ``func``.
    :param kw: Keyword arguments passed through to ``func``.
    :return: The value produced by ``func``.
    """
    retry_started = time.time()
    retry_error = False
    while True:
        try:
            value = yield func(*args, **kw)
        except Exception as e:
            # For clients which aren't connected (session timeout == None)
            # we raise the errors to the callers.
            session_timeout = client.session_timeout or 0

            # The longest we keep retrying is 1.2 * session timeout
            # (session timeout is in milliseconds, the clock in seconds).
            max_time = (session_timeout / 1000.0) * 1.2 + retry_started

            if not check_retryable(client, max_time, e):
                # Check if its a persistent client error, and if so use the cb
                # if present to try and reconnect for client errors.
                if (check_error(e)
                        and time.time() > max_time
                        and callable(client.cb_retry_error)
                        and not retry_error):
                    log.debug("Retry error %r on %s @ %s",
                              e, func.__name__, _args(args))
                    # Only give the callback a single chance per call.
                    retry_error = True
                    yield client.cb_retry_error(e)
                    # Start a fresh retry window after the callback ran.
                    retry_started = time.time()
                    continue
                raise

            # Give the connection a chance to auto-heal.
            yield sleep(get_delay(session_timeout))
            log.debug("Retry on %s @ %s", func.__name__, _args(args))
            continue

        returnValue(value)


def retry_watch(client, func, *args, **kw):
    """Invoke a watch callable, retrying it on transient connection errors.

    If the callable execution results in an exception due to a transient
    connection error, the retry wrapper will reinvoke the operation after
    a suitable delay (fractional value of the session timeout).

    A watch function must return back a tuple of deferreds
    (value_deferred, watch_deferred). No inline callbacks are
    performed in here to ensure that callers continue to see a
    tuple of results.

    The client passed to this retry function must be the same as
    the one utilized by the python callable.

    :param client: A ZookeeperClient instance.
    :param func: A python callable that interacts with zookeeper. If a
           function is passed, a txzookeeper client must be the first
           parameter of this function. The function must return a
           tuple of (value_deferred, watch_deferred)
    :param args: Positional arguments passed through to ``func``.
    :param kw: Keyword arguments passed through to ``func``.
    :return: The (value_deferred, watch_deferred) tuple from the first
           invocation of ``func``; retries are chained onto these.
    """
    # For clients which aren't connected (session timeout == None)
    # we raise the usage errors to the callers
    session_timeout = client.session_timeout or 0

    # If we keep retrying past the 1.2 * session timeout without
    # success just die, the session expiry is fatal. The session timeout
    # is in milliseconds while time.time() is in seconds, so convert
    # before adding (this matches the deadline computed in `retry`).
    max_time = (session_timeout / 1000.0) * 1.2 + time.time()
    value_d, watch_d = func(*args, **kw)

    def retry_delay(f):
        """Errback, verifies an op is retryable, and delays the next retry.
        """
        # Check that operation is retryable; if not, pass the failure on.
        if not check_retryable(client, max_time, f.value):
            return f

        # Give the connection a chance to auto-heal
        d = sleep(get_delay(session_timeout))
        d.addCallback(retry_inner)

        return d

    def retry_inner(value):
        """Retry operation invoker.
        """
        # Invoke the function
        retry_value_d, retry_watch_d = func(*args, **kw)

        # If we need to retry again.
        retry_value_d.addErrback(retry_delay)

        # Chain the new watch deferred to the old, presuming its doa
        # if the value deferred errored on a connection error.
        retry_watch_d.chainDeferred(watch_d)

        # Insert back into the callback chain.
        return retry_value_d

    # Attach the retry
    value_d.addErrback(retry_delay)

    return value_d, watch_d


def _passproperty(name):
    """Returns a method wrapper that delegates to a client's property.
    """
    def wrapper(retry_client):
        return getattr(retry_client.client, name)
    return property(wrapper)


class RetryClient(object):
    """A ZookeeperClient wrapper that transparently performs retries.

    Any operation on a zookeeper connection can hit a transient
    connection failure. While the session tied to the connection is
    still alive on the zookeeper cluster, libzookeeper can reconnect to
    the cluster and session on its own, which lets the client retry.

    Whether retrying a given operation is safe depends on the
    application and how it interacts with zookeeper. Coordination around
    sequence nodes in particular can be problematic: without extra
    application specific context the client cannot know whether the
    operation succeeded. Idempotent operations against the zookeeper
    tree are generally safe to retry.

    This class is a thin wrapper around a zookeeper client. Every method
    that interacts with the zookeeper tree is routed through `retry` (or
    `retry_watch` for the watch variants) so that it is retried on
    transient errors until the session timeout has been reached. The
    remaining methods and attributes of the wrapped client are passed
    straight through.
    """

    def __init__(self, client):
        """Wrap ``client`` and clear its retry error callback."""
        self.client = client
        client.cb_retry_error = None

    def set_retry_error_callback(self, callback):
        """Store ``callback`` on the wrapped client as ``cb_retry_error``."""
        self.client.cb_retry_error = callback

    # Retry enabled operations

    def add_auth(self, *args, **kw):
        """Call the client's ``add_auth`` through `retry`."""
        client = self.client
        return retry(client, client.add_auth, *args, **kw)

    def create(self, *args, **kw):
        """Call the client's ``create`` through `retry`."""
        client = self.client
        return retry(client, client.create, *args, **kw)

    def delete(self, *args, **kw):
        """Call the client's ``delete`` through `retry`."""
        client = self.client
        return retry(client, client.delete, *args, **kw)

    def exists(self, *args, **kw):
        """Call the client's ``exists`` through `retry`."""
        client = self.client
        return retry(client, client.exists, *args, **kw)

    def get(self, *args, **kw):
        """Call the client's ``get`` through `retry`."""
        client = self.client
        return retry(client, client.get, *args, **kw)

    def get_acl(self, *args, **kw):
        """Call the client's ``get_acl`` through `retry`."""
        client = self.client
        return retry(client, client.get_acl, *args, **kw)

    def get_children(self, *args, **kw):
        """Call the client's ``get_children`` through `retry`."""
        client = self.client
        return retry(client, client.get_children, *args, **kw)

    def set_acl(self, *args, **kw):
        """Call the client's ``set_acl`` through `retry`."""
        client = self.client
        return retry(client, client.set_acl, *args, **kw)

    def set(self, *args, **kw):
        """Call the client's ``set`` through `retry`."""
        client = self.client
        return retry(client, client.set, *args, **kw)

    def sync(self, *args, **kw):
        """Call the client's ``sync`` through `retry`."""
        client = self.client
        return retry(client, client.sync, *args, **kw)

    # Retry enabled watch operations

    def exists_and_watch(self, *args, **kw):
        """Call the client's ``exists_and_watch`` through `retry_watch`."""
        client = self.client
        return retry_watch(client, client.exists_and_watch, *args, **kw)

    def get_and_watch(self, *args, **kw):
        """Call the client's ``get_and_watch`` through `retry_watch`."""
        client = self.client
        return retry_watch(client, client.get_and_watch, *args, **kw)

    def get_children_and_watch(self, *args, **kw):
        """Call the client's ``get_children_and_watch`` through `retry_watch`.
        """
        client = self.client
        return retry_watch(
            client, client.get_children_and_watch, *args, **kw)

    # Plain delegation, no retry involved

    def set_connection_watcher(self, *args, **kw):
        """Delegate directly to the client's ``set_connection_watcher``."""
        delegate = self.client.set_connection_watcher
        return delegate(*args, **kw)

    def set_connection_error_callback(self, *args, **kw):
        """Delegate directly to the client's
        ``set_connection_error_callback``.
        """
        delegate = self.client.set_connection_error_callback
        return delegate(*args, **kw)

    def set_session_callback(self, *args, **kw):
        """Delegate directly to the client's ``set_session_callback``."""
        delegate = self.client.set_session_callback
        return delegate(*args, **kw)

    def set_determinstic_order(self, *args, **kw):
        """Delegate directly to the client's ``set_determinstic_order``."""
        delegate = self.client.set_determinstic_order
        return delegate(*args, **kw)

    def close(self):
        """Delegate directly to the client's ``close``."""
        return self.client.close()

    @inlineCallbacks
    def connect(self, *args, **kw):
        """Connect the wrapped client, then return this wrapper.

        The arguments are handed to the client's ``connect`` unchanged.
        """
        connecting = self.client.connect(*args, **kw)
        yield connecting
        returnValue(self)

    # Read-only views of the wrapped client's attributes
    state = _passproperty("state")
    client_id = _passproperty("client_id")
    session_timeout = _passproperty("session_timeout")
    servers = _passproperty("servers")
    handle = _passproperty("handle")
    connected = _passproperty("connected")
    unrecoverable = _passproperty("unrecoverable")