/usr/share/pyshared/eventlet/semaphore.py is in python-eventlet 0.9.16-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 | from eventlet import greenthread
from eventlet import hubs
class Semaphore(object):
"""An unbounded semaphore.
Optionally initialize with a resource *count*, then :meth:`acquire` and
:meth:`release` resources as needed. Attempting to :meth:`acquire` when
*count* is zero suspends the calling greenthread until *count* becomes
nonzero again.
This is API-compatible with :class:`threading.Semaphore`.
It is a context manager, and thus can be used in a with block::
sem = Semaphore(2)
with sem:
do_some_stuff()
If not specified, *value* defaults to 1.
"""
def __init__(self, value=1):
self.counter = value
if value < 0:
raise ValueError("Semaphore must be initialized with a positive "
"number, got %s" % value)
self._waiters = set()
def __repr__(self):
params = (self.__class__.__name__, hex(id(self)),
self.counter, len(self._waiters))
return '<%s at %s c=%s _w[%s]>' % params
def __str__(self):
params = (self.__class__.__name__, self.counter, len(self._waiters))
return '<%s c=%s _w[%s]>' % params
def locked(self):
"""Returns true if a call to acquire would block."""
return self.counter <= 0
def bounded(self):
"""Returns False; for consistency with
:class:`~eventlet.semaphore.CappedSemaphore`."""
return False
def acquire(self, blocking=True):
"""Acquire a semaphore.
When invoked without arguments: if the internal counter is larger than
zero on entry, decrement it by one and return immediately. If it is zero
on entry, block, waiting until some other thread has called release() to
make it larger than zero. This is done with proper interlocking so that
if multiple acquire() calls are blocked, release() will wake exactly one
of them up. The implementation may pick one at random, so the order in
which blocked threads are awakened should not be relied on. There is no
return value in this case.
When invoked with blocking set to true, do the same thing as when called
without arguments, and return true.
When invoked with blocking set to false, do not block. If a call without
an argument would block, return false immediately; otherwise, do the
same thing as when called without arguments, and return true."""
if not blocking and self.locked():
return False
if self.counter <= 0:
self._waiters.add(greenthread.getcurrent())
try:
while self.counter <= 0:
hubs.get_hub().switch()
finally:
self._waiters.discard(greenthread.getcurrent())
self.counter -= 1
return True
def __enter__(self):
self.acquire()
def release(self, blocking=True):
"""Release a semaphore, incrementing the internal counter by one. When
it was zero on entry and another thread is waiting for it to become
larger than zero again, wake up that thread.
The *blocking* argument is for consistency with CappedSemaphore and is
ignored"""
self.counter += 1
if self._waiters:
hubs.get_hub().schedule_call_global(0, self._do_acquire)
return True
def _do_acquire(self):
if self._waiters and self.counter>0:
waiter = self._waiters.pop()
waiter.switch()
def __exit__(self, typ, val, tb):
self.release()
@property
def balance(self):
"""An integer value that represents how many new calls to
:meth:`acquire` or :meth:`release` would be needed to get the counter to
0. If it is positive, then its value is the number of acquires that can
happen before the next acquire would block. If it is negative, it is
the negative of the number of releases that would be required in order
to make the counter 0 again (one more release would push the counter to
1 and unblock acquirers). It takes into account how many greenthreads
are currently blocking in :meth:`acquire`.
"""
# positive means there are free items
# zero means there are no free items but nobody has requested one
# negative means there are requests for items, but no items
return self.counter - len(self._waiters)
class BoundedSemaphore(Semaphore):
"""A bounded semaphore checks to make sure its current value doesn't exceed
its initial value. If it does, ValueError is raised. In most situations
semaphores are used to guard resources with limited capacity. If the
semaphore is released too many times it's a sign of a bug. If not given,
*value* defaults to 1."""
def __init__(self, value=1):
super(BoundedSemaphore, self).__init__(value)
self.original_counter = value
def release(self, blocking=True):
"""Release a semaphore, incrementing the internal counter by one. If
the counter would exceed the initial value, raises ValueError. When
it was zero on entry and another thread is waiting for it to become
larger than zero again, wake up that thread.
The *blocking* argument is for consistency with :class:`CappedSemaphore`
and is ignored"""
if self.counter >= self.original_counter:
raise ValueError, "Semaphore released too many times"
return super(BoundedSemaphore, self).release(blocking)
class CappedSemaphore(object):
"""A blockingly bounded semaphore.
Optionally initialize with a resource *count*, then :meth:`acquire` and
:meth:`release` resources as needed. Attempting to :meth:`acquire` when
*count* is zero suspends the calling greenthread until count becomes nonzero
again. Attempting to :meth:`release` after *count* has reached *limit*
suspends the calling greenthread until *count* becomes less than *limit*
again.
This has the same API as :class:`threading.Semaphore`, though its
semantics and behavior differ subtly due to the upper limit on calls
to :meth:`release`. It is **not** compatible with
:class:`threading.BoundedSemaphore` because it blocks when reaching *limit*
instead of raising a ValueError.
It is a context manager, and thus can be used in a with block::
sem = CappedSemaphore(2)
with sem:
do_some_stuff()
"""
def __init__(self, count, limit):
if count < 0:
raise ValueError("CappedSemaphore must be initialized with a "
"positive number, got %s" % count)
if count > limit:
# accidentally, this also catches the case when limit is None
raise ValueError("'count' cannot be more than 'limit'")
self.lower_bound = Semaphore(count)
self.upper_bound = Semaphore(limit-count)
def __repr__(self):
params = (self.__class__.__name__, hex(id(self)),
self.balance, self.lower_bound, self.upper_bound)
return '<%s at %s b=%s l=%s u=%s>' % params
def __str__(self):
params = (self.__class__.__name__, self.balance,
self.lower_bound, self.upper_bound)
return '<%s b=%s l=%s u=%s>' % params
def locked(self):
"""Returns true if a call to acquire would block."""
return self.lower_bound.locked()
def bounded(self):
"""Returns true if a call to release would block."""
return self.upper_bound.locked()
def acquire(self, blocking=True):
"""Acquire a semaphore.
When invoked without arguments: if the internal counter is larger than
zero on entry, decrement it by one and return immediately. If it is zero
on entry, block, waiting until some other thread has called release() to
make it larger than zero. This is done with proper interlocking so that
if multiple acquire() calls are blocked, release() will wake exactly one
of them up. The implementation may pick one at random, so the order in
which blocked threads are awakened should not be relied on. There is no
return value in this case.
When invoked with blocking set to true, do the same thing as when called
without arguments, and return true.
When invoked with blocking set to false, do not block. If a call without
an argument would block, return false immediately; otherwise, do the
same thing as when called without arguments, and return true."""
if not blocking and self.locked():
return False
self.upper_bound.release()
try:
return self.lower_bound.acquire()
except:
self.upper_bound.counter -= 1
# using counter directly means that it can be less than zero.
# however I certainly don't need to wait here and I don't seem to have
# a need to care about such inconsistency
raise
def __enter__(self):
self.acquire()
def release(self, blocking=True):
"""Release a semaphore. In this class, this behaves very much like
an :meth:`acquire` but in the opposite direction.
Imagine the docs of :meth:`acquire` here, but with every direction
reversed. When calling this method, it will block if the internal
counter is greater than or equal to *limit*."""
if not blocking and self.bounded():
return False
self.lower_bound.release()
try:
return self.upper_bound.acquire()
except:
self.lower_bound.counter -= 1
raise
def __exit__(self, typ, val, tb):
self.release()
@property
def balance(self):
"""An integer value that represents how many new calls to
:meth:`acquire` or :meth:`release` would be needed to get the counter to
0. If it is positive, then its value is the number of acquires that can
happen before the next acquire would block. If it is negative, it is
the negative of the number of releases that would be required in order
to make the counter 0 again (one more release would push the counter to
1 and unblock acquirers). It takes into account how many greenthreads
are currently blocking in :meth:`acquire` and :meth:`release`."""
return self.lower_bound.balance - self.upper_bound.balance
|