/usr/lib/python2.7/dist-packages/txamqp/queue.py is in python-txamqp 0.6.1-0ubuntu3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | # coding: utf-8
from twisted.internet.defer import DeferredQueue
class Empty(Exception):
pass
class Closed(Exception):
pass
class TimeoutDeferredQueue(DeferredQueue):
END = object()
def __init__(self, clock=None):
if clock is None:
from twisted.internet import reactor as clock
self.clock = clock
DeferredQueue.__init__(self)
def _timeout(self, deferred):
if not deferred.called:
if deferred in self.waiting:
self.waiting.remove(deferred)
deferred.errback(Empty())
def _raiseIfClosed(self, result, call_id):
if call_id is not None:
call_id.cancel()
if result == TimeoutDeferredQueue.END:
self.put(TimeoutDeferredQueue.END)
raise Closed()
else:
return result
def get(self, timeout=None):
deferred = DeferredQueue.get(self)
call_id = None
if timeout:
call_id = self.clock.callLater(timeout, self._timeout, deferred)
deferred.addCallback(self._raiseIfClosed, call_id)
return deferred
def close(self):
self.put(TimeoutDeferredQueue.END)
|