/usr/lib/python2.7/dist-packages/txamqp/protocol.py is in python-txamqp 0.6.1-0ubuntu3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 | # coding: utf-8
from twisted.internet import defer, protocol
from twisted.internet.task import LoopingCall
from twisted.protocols import basic
from txamqp import spec
from txamqp.codec import Codec
from txamqp.connection import Header, Frame, Method, Body, Heartbeat
from txamqp.message import Message
from txamqp.content import Content
from txamqp.queue import TimeoutDeferredQueue, Closed as QueueClosed
from txamqp.client import TwistedEvent, Closed
from cStringIO import StringIO
import struct
from time import time
class GarbageException(Exception):
pass
# An AMQP channel is a virtual connection that shares the
# same socket with others channels. One can have many channels
# per connection
class AMQChannel(object):
def __init__(self, id, outgoing):
self.id = id
self.outgoing = outgoing
self.incoming = TimeoutDeferredQueue()
self.responses = TimeoutDeferredQueue()
self.queue = None
self.closed = False
self.reason = None
def close(self, reason):
if self.closed:
return
self.closed = True
self.reason = reason
self.incoming.close()
self.responses.close()
def dispatch(self, frame, work):
payload = frame.payload
if isinstance(payload, Method):
if payload.method.response:
self.queue = self.responses
else:
self.queue = self.incoming
work.put(self.incoming)
self.queue.put(frame)
@defer.inlineCallbacks
def invoke(self, method, args, content=None):
if self.closed:
raise Closed(self.reason)
frame = Frame(self.id, Method(method, *args))
self.outgoing.put(frame)
if method.content:
if content == None:
content = Content()
self.writeContent(method.klass, content, self.outgoing)
try:
# here we depend on all nowait fields being named nowait
f = method.fields.byname["nowait"]
nowait = args[method.fields.index(f)]
except KeyError:
nowait = False
try:
if not nowait and method.responses:
resp = (yield self.responses.get()).payload
if resp.method.content:
content = yield readContent(self.responses)
else:
content = None
if resp.method in method.responses:
defer.returnValue(Message(resp.method, resp.args, content))
else:
raise ValueError(resp)
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
else:
raise e
def writeContent(self, klass, content, queue):
size = content.size()
header = Frame(self.id, Header(klass, content.weight(), size, **content.properties))
queue.put(header)
for child in content.children:
self.writeContent(klass, child, queue)
# should split up if content.body exceeds max frame size
if size > 0:
queue.put(Frame(self.id, Body(content.body)))
class FrameReceiver(protocol.Protocol, basic._PauseableMixin):
frame_mode = False
MAX_LENGTH = 4096
HEADER_LENGTH = 1 + 2 + 4 + 1
__buffer = ''
def __init__(self, spec):
self.spec = spec
self.FRAME_END = self.spec.constants.bypyname["frame_end"].id
# packs a frame and writes it to the underlying transport
def sendFrame(self, frame):
data = self._packFrame(frame)
self.transport.write(data)
# packs a frame, see qpid.connection.Connection#write
def _packFrame(self, frame):
s = StringIO()
c = Codec(s)
c.encode_octet(self.spec.constants.bypyname[frame.payload.type].id)
c.encode_short(frame.channel)
frame.payload.encode(c)
c.encode_octet(self.FRAME_END)
data = s.getvalue()
return data
# unpacks a frame, see qpid.connection.Connection#read
def _unpackFrame(self, data):
s = StringIO(data)
c = Codec(s)
frameType = spec.pythonize(self.spec.constants.byid[c.decode_octet()].name)
channel = c.decode_short()
payload = Frame.DECODERS[frameType].decode(self.spec, c)
end = c.decode_octet()
if end != self.FRAME_END:
raise GarbageException('frame error: expected %r, got %r' % (self.FRAME_END, end))
frame = Frame(channel, payload)
return frame
def setRawMode(self):
self.frame_mode = False
def setFrameMode(self, extra=''):
self.frame_mode = True
if extra:
return self.dataReceived(extra)
def dataReceived(self, data):
self.__buffer = self.__buffer + data
while self.frame_mode and not self.paused:
sz = len(self.__buffer) - self.HEADER_LENGTH
if sz >= 0:
length, = struct.unpack("!I", self.__buffer[3:7]) # size = 4 bytes
if sz >= length:
packet = self.__buffer[:self.HEADER_LENGTH + length]
self.__buffer = self.__buffer[self.HEADER_LENGTH + length:]
frame = self._unpackFrame(packet)
why = self.frameReceived(frame)
if why or self.transport and self.transport.disconnecting:
return why
else:
continue
if len(self.__buffer) > self.MAX_LENGTH:
frame, self.__buffer = self.__buffer, ''
return self.frameLengthExceeded(frame)
break
else:
if not self.paused:
data = self.__buffer
self.__buffer = ''
if data:
return self.rawDataReceived(data)
def sendInitString(self):
initString = "!4s4B"
s = StringIO()
c = Codec(s)
c.pack(initString, "AMQP", 1, 1, self.spec.major, self.spec.minor)
self.transport.write(s.getvalue())
@defer.inlineCallbacks
def readContent(queue):
frame = yield queue.get()
header = frame.payload
children = []
for i in range(header.weight):
content = yield readContent(queue)
children.append(content)
size = header.size
read = 0
buf = StringIO()
while read < size:
body = yield queue.get()
content = body.payload.content
buf.write(content)
read += len(content)
defer.returnValue(Content(buf.getvalue(), children, header.properties.copy()))
class AMQClient(FrameReceiver):
channelClass = AMQChannel
# Max unreceived heartbeat frames. The AMQP standard says it's 3.
MAX_UNSEEN_HEARTBEAT = 3
def __init__(self, delegate, vhost, spec, heartbeat=0, clock=None, insist=False):
FrameReceiver.__init__(self, spec)
self.delegate = delegate
# XXX Cyclic dependency
self.delegate.client = self
self.vhost = vhost
self.channelFactory = type("Channel%s" % self.spec.klass.__name__,
(self.channelClass, self.spec.klass), {})
self.channels = {}
self.channelLock = defer.DeferredLock()
self.outgoing = defer.DeferredQueue()
self.work = defer.DeferredQueue()
self.started = TwistedEvent()
self.queueLock = defer.DeferredLock()
self.basic_return_queue = TimeoutDeferredQueue()
self.queues = {}
self.outgoing.get().addCallback(self.writer)
self.work.get().addCallback(self.worker)
self.heartbeatInterval = heartbeat
self.insist = insist
if self.heartbeatInterval > 0:
if clock is None:
from twisted.internet import reactor as clock
self.clock = clock
self.checkHB = self.clock.callLater(self.heartbeatInterval *
self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat)
self.sendHB = LoopingCall(self.sendHeartbeat)
d = self.started.wait()
d.addCallback(lambda _: self.reschedule_sendHB())
d.addCallback(lambda _: self.reschedule_checkHB())
def reschedule_sendHB(self):
if self.heartbeatInterval > 0:
if self.sendHB.running:
self.sendHB.stop()
self.sendHB.start(self.heartbeatInterval, now=False)
def reschedule_checkHB(self):
if self.checkHB.active():
self.checkHB.cancel()
self.checkHB = self.clock.callLater(self.heartbeatInterval *
self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat)
def check_0_8(self):
return (self.spec.minor, self.spec.major) == (0, 8)
@defer.inlineCallbacks
def channel(self, id):
yield self.channelLock.acquire()
try:
try:
ch = self.channels[id]
except KeyError:
ch = self.channelFactory(id, self.outgoing)
self.channels[id] = ch
finally:
self.channelLock.release()
defer.returnValue(ch)
@defer.inlineCallbacks
def queue(self, key):
yield self.queueLock.acquire()
try:
try:
q = self.queues[key]
except KeyError:
q = TimeoutDeferredQueue()
self.queues[key] = q
finally:
self.queueLock.release()
defer.returnValue(q)
def close(self, reason):
for ch in self.channels.values():
ch.close(reason)
for q in self.queues.values():
q.close()
self.delegate.close(reason)
def writer(self, frame):
self.sendFrame(frame)
self.outgoing.get().addCallback(self.writer)
def worker(self, queue):
d = self.dispatch(queue)
def cb(ign):
self.work.get().addCallback(self.worker)
d.addCallback(cb)
d.addErrback(self.close)
@defer.inlineCallbacks
def dispatch(self, queue):
frame = yield queue.get()
channel = yield self.channel(frame.channel)
payload = frame.payload
if payload.method.content:
content = yield readContent(queue)
else:
content = None
# Let the caller deal with exceptions thrown here.
message = Message(payload.method, payload.args, content)
self.delegate.dispatch(channel, message)
# As soon as we connect to the target AMQP broker, send the init string
def connectionMade(self):
self.sendInitString()
self.setFrameMode()
def frameReceived(self, frame):
self.processFrame(frame)
def sendFrame(self, frame):
if frame.payload.type != Frame.HEARTBEAT:
self.reschedule_sendHB()
FrameReceiver.sendFrame(self, frame)
@defer.inlineCallbacks
def processFrame(self, frame):
ch = yield self.channel(frame.channel)
if frame.payload.type == Frame.HEARTBEAT:
self.lastHBReceived = time()
else:
ch.dispatch(frame, self.work)
if self.heartbeatInterval > 0:
self.reschedule_checkHB()
@defer.inlineCallbacks
def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'):
if self.check_0_8():
response = {"LOGIN": username, "PASSWORD": password}
else:
response = "\0" + username + "\0" + password
yield self.start(response, mechanism, locale)
@defer.inlineCallbacks
def start(self, response, mechanism='AMQPLAIN', locale='en_US'):
self.response = response
self.mechanism = mechanism
self.locale = locale
yield self.started.wait()
channel0 = yield self.channel(0)
if self.check_0_8():
result = yield channel0.connection_open(self.vhost, insist=self.insist)
else:
result = yield channel0.connection_open(self.vhost)
defer.returnValue(result)
def sendHeartbeat(self):
self.sendFrame(Frame(0, Heartbeat()))
self.lastHBSent = time()
def checkHeartbeat(self):
if self.checkHB.active():
self.checkHB.cancel()
self.transport.loseConnection()
def connectionLost(self, reason):
if self.heartbeatInterval > 0:
if self.sendHB.running:
self.sendHB.stop()
if self.checkHB.active():
self.checkHB.cancel()
self.close(reason)
|