/usr/lib/python2.7/dist-packages/etcd/lock.py is in python-etcd 0.4.3-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 | import logging
import etcd
import uuid
_log = logging.getLogger(__name__)
class Lock(object):
"""
Locking recipe for etcd, inspired by the kazoo recipe for zookeeper
"""
def __init__(self, client, lock_name):
self.client = client
self.name = lock_name
# props to Netflix Curator for this trick. It is possible for our
# create request to succeed on the server, but for a failure to
# prevent us from getting back the full path name. We prefix our
# lock name with a uuid and can check for its presence on retry.
self._uuid = uuid.uuid4().hex
self.path = "/_locks/{}".format(lock_name)
self.is_taken = False
self._sequence = None
_log.debug("Initiating lock for %s with uuid %s", self.path, self._uuid)
@property
def uuid(self):
"""
The unique id of the lock
"""
return self._uuid
@uuid.setter
def set_uuid(self, value):
old_uuid = self._uuid
self._uuid = value
if not self._find_lock():
_log.warn("The hand-set uuid was not found, refusing")
self._uuid = old_uuid
raise ValueError("Inexistent UUID")
@property
def is_acquired(self):
"""
tells us if the lock is acquired
"""
if not self.is_taken:
_log.debug("Lock not taken")
return False
try:
self.client.read(self.lock_key)
return True
except etcd.EtcdKeyNotFound:
_log.warn("Lock was supposedly taken, but we cannot find it")
self.is_taken = False
return False
def acquire(self, blocking=True, lock_ttl=3600, timeout=None):
"""
Acquire the lock.
:param blocking Block until the lock is obtained, or timeout is reached
:param lock_ttl The duration of the lock we acquired, set to None for eternal locks
:param timeout The time to wait before giving up on getting a lock
"""
# First of all try to write, if our lock is not present.
if not self._find_lock():
_log.debug("Lock not found, writing it to %s", self.path)
res = self.client.write(self.path, self.uuid, ttl=lock_ttl, append=True)
self._set_sequence(res.key)
_log.debug("Lock key %s written, sequence is %s", res.key, self._sequence)
elif lock_ttl:
# Renew our lock if already here!
self.client.write(self.lock_key, self.uuid, ttl=lock_ttl)
# now get the owner of the lock, and the next lowest sequence
return self._acquired(blocking=blocking, timeout=timeout)
def release(self):
"""
Release the lock
"""
if not self._sequence:
self._find_lock()
try:
_log.debug("Releasing existing lock %s", self.lock_key)
self.client.delete(self.lock_key)
except etcd.EtcdKeyNotFound:
_log.info("Lock %s not found, nothing to release", self.lock_key)
pass
finally:
self.is_taken = False
def __enter__(self):
"""
You can use the lock as a contextmanager
"""
self.acquire(blocking=True, lock_ttl=0)
def __exit__(self, type, value, traceback):
self.release()
def _acquired(self, blocking=True, timeout=0):
locker, nearest = self._get_locker()
self.is_taken = False
if self.lock_key == locker:
_log.debug("Lock acquired!")
# We own the lock, yay!
self.is_taken = True
return True
else:
self.is_taken = False
if not blocking:
return False
# Let's look for the lock
watch_key = nearest
_log.debug("Lock not acquired, now watching %s", watch_key)
t = max(0, timeout)
while True:
try:
r = self.client.watch(watch_key, timeout=t)
_log.debug("Detected variation for %s: %s", r.key, r.action)
return self._acquired(blocking=True, timeout=timeout)
except etcd.EtcdKeyNotFound:
_log.debug("Key %s not present anymore, moving on", watch_key)
return self._acquired(blocking=True, timeout=timeout)
except etcd.EtcdException:
# TODO: log something...
pass
@property
def lock_key(self):
if not self._sequence:
raise ValueError("No sequence present.")
return self.path + '/' + str(self._sequence)
def _set_sequence(self, key):
self._sequence = key.replace(self.path, '').lstrip('/')
def _find_lock(self):
if self._sequence:
try:
res = self.client.read(self.lock_key)
self._uuid = res.value
return True
except etcd.EtcdKeyNotFound:
return False
elif self._uuid:
try:
for r in self.client.read(self.path, recursive=True).leaves:
if r.value == self._uuid:
self._set_sequence(r.key)
return True
except etcd.EtcdKeyNotFound:
pass
return False
def _get_locker(self):
results = [res for res in
self.client.read(self.path, recursive=True).leaves]
if not self._sequence:
self._find_lock()
l = sorted([r.key for r in results])
_log.debug("Lock keys found: %s", l)
try:
i = l.index(self.lock_key)
if i == 0:
_log.debug("No key before our one, we are the locker")
return (l[0], None)
else:
_log.debug("Locker: %s, key to watch: %s", l[0], l[i-1])
return (l[0], l[i-1])
except ValueError:
# Something very wrong is going on, most probably
# our lock has expired
raise etcd.EtcdLockExpired(u"Lock not found")
|