This file is indexed.

/usr/lib/python2.7/dist-packages/proton/handlers.py is in python-qpid-proton 0.10-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import heapq, logging, os, re, socket, time, types

from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
from select import select


class OutgoingMessageHandler(Handler):
    """
    Turns link-flow and delivery events for sent messages into the
    higher-level callbacks ``on_sendable``, ``on_accepted``,
    ``on_rejected``, ``on_released`` and ``on_settled``.

    Every callback forwards to ``delegate`` through ``dispatch`` when
    a delegate was supplied and is a no-op otherwise.
    """
    def __init__(self, auto_settle=True, delegate=None):
        self.delegate = delegate
        self.auto_settle = auto_settle

    def on_link_flow(self, event):
        """
        Raise ``on_sendable`` when a sender link has credit.
        """
        link = event.link
        if link.is_sender and link.credit:
            self.on_sendable(event)

    def on_delivery(self, event):
        """
        Map an updated delivery on a sender link to the callback that
        matches its remote state, then report settlement and, when
        ``auto_settle`` is set, settle the delivery locally.
        """
        delivery = event.delivery
        if not (delivery.link.is_sender and delivery.updated):
            return

        outcome = delivery.remote_state
        if outcome == Delivery.ACCEPTED:
            self.on_accepted(event)
        elif outcome == Delivery.REJECTED:
            self.on_rejected(event)
        elif outcome == Delivery.RELEASED or outcome == Delivery.MODIFIED:
            self.on_released(event)

        if delivery.settled:
            self.on_settled(event)
        if self.auto_settle:
            delivery.settle()

    def on_sendable(self, event):
        """
        Called when the sender link has credit, i.e. messages can be
        transferred.
        """
        if not self.delegate:
            return
        dispatch(self.delegate, 'on_sendable', event)

    def on_accepted(self, event):
        """
        Called when the remote peer accepts an outgoing message.
        """
        if not self.delegate:
            return
        dispatch(self.delegate, 'on_accepted', event)

    def on_rejected(self, event):
        """
        Called when the remote peer rejects an outgoing message.
        """
        if not self.delegate:
            return
        dispatch(self.delegate, 'on_rejected', event)

    def on_released(self, event):
        """
        Called when the remote peer releases an outgoing message.
        This is raised for both the RELEASED and the MODIFIED remote
        states.
        """
        if not self.delegate:
            return
        dispatch(self.delegate, 'on_released', event)

    def on_settled(self, event):
        """
        Called when the remote peer has settled the outgoing message.
        From this point on it should never be retransmitted.
        """
        if not self.delegate:
            return
        dispatch(self.delegate, 'on_settled', event)

def recv_msg(delivery):
    """
    Decode the pending content of ``delivery`` into a new Message.

    Reads ``delivery.pending`` bytes from the delivery's link, decodes
    them into a freshly created ``Message``, advances the link and
    returns the message.
    """
    message = Message()
    link = delivery.link
    payload = link.recv(delivery.pending)
    message.decode(payload)
    link.advance()
    return message

class Reject(ProtonException):
    """
    Raised from a message callback to signal that the message being
    handled should be rejected.
    """

class Release(ProtonException):
  """
  An exception that indicates a message should be released
  """
  pass

class Acking(object):
    """
    Mixin with convenience methods for acknowledging received
    deliveries: each method sets a delivery state and settles.
    """

    def accept(self, delivery):
        """
        Accept a received message.
        """
        self.settle(delivery, Delivery.ACCEPTED)

    def reject(self, delivery):
        """
        Reject a received message that is considered invalid or
        unprocessable.
        """
        self.settle(delivery, Delivery.REJECTED)

    def release(self, delivery, delivered=True):
        """
        Release a received message so that it becomes available at
        the source for any (other) interested receiver.

        When ``delivered`` is true the MODIFIED state is used, so the
        release counts as a delivery attempt; otherwise the RELEASED
        state is used.
        """
        outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
        self.settle(delivery, outcome)

    def settle(self, delivery, state=None):
        """
        Settle ``delivery``, first updating it to ``state`` when a
        truthy state is given.
        """
        if state:
            delivery.update(state)
        delivery.settle()

class IncomingMessageHandler(Handler, Acking):
    """
    Turns delivery events on receiver links into the higher-level
    callbacks ``on_message`` and ``on_settled``, optionally accepting
    messages automatically.
    """

    def __init__(self, auto_accept=True, delegate=None):
        self.auto_accept = auto_accept
        self.delegate = delegate

    def on_delivery(self, event):
        """
        Handle a delivery event on a receiver link.

        A fully received, readable delivery is decoded and stored on
        ``event.message``. If the link is already locally closed the
        message is released (only when ``auto_accept`` is set);
        otherwise ``on_message`` is called and the delivery is
        accepted afterwards when ``auto_accept`` is set. A ``Reject``
        or ``Release`` exception raised while handling the message
        settles the delivery as REJECTED or MODIFIED respectively.

        A delivery that is not readable (or still partial) but has
        been updated and settled triggers ``on_settled`` instead.
        """
        delivery = event.delivery
        if not delivery.link.is_receiver:
            return

        if not delivery.readable or delivery.partial:
            if delivery.updated and delivery.settled:
                self.on_settled(event)
            return

        event.message = recv_msg(delivery)

        if event.link.state & Endpoint.LOCAL_CLOSED:
            # The link was closed locally: hand the message back
            # rather than delivering it to the application.
            if self.auto_accept:
                delivery.update(Delivery.RELEASED)
                delivery.settle()
            return

        try:
            self.on_message(event)
            if self.auto_accept:
                delivery.update(Delivery.ACCEPTED)
                delivery.settle()
        except Reject:
            delivery.update(Delivery.REJECTED)
            delivery.settle()
        except Release:
            delivery.update(Delivery.MODIFIED)
            delivery.settle()

    def on_message(self, event):
        """
        Called when a message is received. The message is available
        as ``event.message``. To refer to the message in further
        actions (e.g. when accepting it explicitly) use the
        ``delivery``, which is also available on the event.
        """
        if not self.delegate:
            return
        dispatch(self.delegate, 'on_message', event)

    def on_settled(self, event):
        """
        Called when an updated delivery on a receiver link is
        reported as settled.
        """
        if not self.delegate:
            return
        dispatch(self.delegate, 'on_settled', event)

class EndpointStateHandler(Handler):
    """
    A utility that exposes 'endpoint' events i.e. the open/close for
    links, sessions and connections in a more intuitive manner. A
    XXX_opened method will be called when both local and remote peers
    have opened the link, session or connection. This can be used to
    confirm a locally initiated action for example. A XXX_opening
    method will be called when the remote peer has requested an open
    that was not initiated locally. By default this will simply open
    locally, which then triggers the XXX_opened call. The same applies
    to close.

    When a ``delegate`` is given, every high-level callback is
    forwarded to it via ``dispatch``. Without a delegate, error
    callbacks log the problem through ``print_error`` and, when
    ``peer_close_is_error`` is set, a peer-initiated close is treated
    as an error.
    """

    def __init__(self, peer_close_is_error=False, delegate=None):
        self.delegate = delegate
        self.peer_close_is_error = peer_close_is_error

    @classmethod
    def is_local_open(cls, endpoint):
        """Return a truthy value if the LOCAL_ACTIVE bit is set in the endpoint state."""
        return endpoint.state & Endpoint.LOCAL_ACTIVE

    @classmethod
    def is_local_uninitialised(cls, endpoint):
        """Return a truthy value if the LOCAL_UNINIT bit is set in the endpoint state."""
        return endpoint.state & Endpoint.LOCAL_UNINIT

    @classmethod
    def is_local_closed(cls, endpoint):
        """Return a truthy value if the LOCAL_CLOSED bit is set in the endpoint state."""
        return endpoint.state & Endpoint.LOCAL_CLOSED

    @classmethod
    def is_remote_open(cls, endpoint):
        """Return a truthy value if the REMOTE_ACTIVE bit is set in the endpoint state."""
        return endpoint.state & Endpoint.REMOTE_ACTIVE

    @classmethod
    def is_remote_closed(cls, endpoint):
        """Return a truthy value if the REMOTE_CLOSED bit is set in the endpoint state."""
        return endpoint.state & Endpoint.REMOTE_CLOSED

    @classmethod
    def print_error(cls, endpoint, endpoint_type):
        """
        Log an error for ``endpoint``: the description of its remote
        condition if there is one, otherwise a "closed by peer"
        message when the endpoint is locally open but remotely
        closed. ``endpoint_type`` is the label used in that message.
        """
        if endpoint.remote_condition:
            logging.error(endpoint.remote_condition.description)
        elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
            logging.error("%s closed by peer" % endpoint_type)

    def on_link_remote_close(self, event):
        """
        Classify a remote link close as error, closed (we had already
        closed locally) or closing (peer initiated), then close the
        local end of the link.
        """
        if event.link.remote_condition:
            self.on_link_error(event)
        elif self.is_local_closed(event.link):
            self.on_link_closed(event)
        else:
            self.on_link_closing(event)
        event.link.close()

    def on_session_remote_close(self, event):
        """
        Classify a remote session close as error, closed or closing,
        then close the local end of the session.
        """
        if event.session.remote_condition:
            self.on_session_error(event)
        elif self.is_local_closed(event.session):
            self.on_session_closed(event)
        else:
            self.on_session_closing(event)
        event.session.close()

    def on_connection_remote_close(self, event):
        """
        Classify a remote connection close as error, closed or
        closing, then close the local end of the connection.
        """
        if event.connection.remote_condition:
            self.on_connection_error(event)
        elif self.is_local_closed(event.connection):
            self.on_connection_closed(event)
        else:
            self.on_connection_closing(event)
        event.connection.close()

    def on_connection_local_open(self, event):
        """Raise ``on_connection_opened`` if the peer has already opened."""
        if self.is_remote_open(event.connection):
            self.on_connection_opened(event)

    def on_connection_remote_open(self, event):
        """
        Raise ``on_connection_opened`` if we already opened locally;
        if the local end is still uninitialised, raise
        ``on_connection_opening`` and open it.
        """
        if self.is_local_open(event.connection):
            self.on_connection_opened(event)
        elif self.is_local_uninitialised(event.connection):
            self.on_connection_opening(event)
            event.connection.open()

    def on_session_local_open(self, event):
        """Raise ``on_session_opened`` if the peer has already opened."""
        if self.is_remote_open(event.session):
            self.on_session_opened(event)

    def on_session_remote_open(self, event):
        """
        Raise ``on_session_opened`` if we already opened locally; if
        the local end is still uninitialised, raise
        ``on_session_opening`` and open it.
        """
        if self.is_local_open(event.session):
            self.on_session_opened(event)
        elif self.is_local_uninitialised(event.session):
            self.on_session_opening(event)
            event.session.open()

    def on_link_local_open(self, event):
        """Raise ``on_link_opened`` if the peer has already opened."""
        if self.is_remote_open(event.link):
            self.on_link_opened(event)

    def on_link_remote_open(self, event):
        """
        Raise ``on_link_opened`` if we already opened locally; if the
        local end is still uninitialised, raise ``on_link_opening``
        and open it.
        """
        if self.is_local_open(event.link):
            self.on_link_opened(event)
        elif self.is_local_uninitialised(event.link):
            self.on_link_opening(event)
            event.link.open()

    def on_connection_opened(self, event):
        """Forward ``on_connection_opened`` to the delegate, if any."""
        if self.delegate:
            dispatch(self.delegate, 'on_connection_opened', event)

    def on_session_opened(self, event):
        """Forward ``on_session_opened`` to the delegate, if any."""
        if self.delegate:
            dispatch(self.delegate, 'on_session_opened', event)

    def on_link_opened(self, event):
        """Forward ``on_link_opened`` to the delegate, if any."""
        if self.delegate:
            dispatch(self.delegate, 'on_link_opened', event)

    def on_connection_opening(self, event):
        """Forward ``on_connection_opening`` to the delegate, if any."""
        if self.delegate:
            dispatch(self.delegate, 'on_connection_opening', event)

    def on_session_opening(self, event):
        """Forward ``on_session_opening`` to the delegate, if any."""
        if self.delegate:
            dispatch(self.delegate, 'on_session_opening', event)

    def on_link_opening(self, event):
        """Forward ``on_link_opening`` to the delegate, if any."""
        if self.delegate:
            dispatch(self.delegate, 'on_link_opening', event)

    def on_connection_error(self, event):
        """
        Forward ``on_connection_error`` to the delegate; without a
        delegate, log the error.
        """
        if self.delegate:
            dispatch(self.delegate, 'on_connection_error', event)
        else:
            # print_error is the logging helper defined on this class;
            # the previously referenced log_error is not defined here.
            self.print_error(event.connection, "connection")

    def on_session_error(self, event):
        """
        Forward ``on_session_error`` to the delegate; without a
        delegate, log the error and close the connection.
        """
        if self.delegate:
            dispatch(self.delegate, 'on_session_error', event)
        else:
            self.print_error(event.session, "session")
            event.connection.close()

    def on_link_error(self, event):
        """
        Forward ``on_link_error`` to the delegate; without a
        delegate, log the error and close the connection.
        """
        if self.delegate:
            dispatch(self.delegate, 'on_link_error', event)
        else:
            self.print_error(event.link, "link")
            event.connection.close()

    def on_connection_closed(self, event):
        """Forward ``on_connection_closed`` to the delegate, if any."""
        if self.delegate:
            dispatch(self.delegate, 'on_connection_closed', event)

    def on_session_closed(self, event):
        """Forward ``on_session_closed`` to the delegate, if any."""
        if self.delegate:
            dispatch(self.delegate, 'on_session_closed', event)

    def on_link_closed(self, event):
        """Forward ``on_link_closed`` to the delegate, if any."""
        if self.delegate:
            dispatch(self.delegate, 'on_link_closed', event)

    def on_connection_closing(self, event):
        """
        Forward ``on_connection_closing`` to the delegate; without a
        delegate, treat it as an error when ``peer_close_is_error``
        is set.
        """
        if self.delegate:
            dispatch(self.delegate, 'on_connection_closing', event)
        elif self.peer_close_is_error:
            self.on_connection_error(event)

    def on_session_closing(self, event):
        """
        Forward ``on_session_closing`` to the delegate; without a
        delegate, treat it as an error when ``peer_close_is_error``
        is set.
        """
        if self.delegate:
            dispatch(self.delegate, 'on_session_closing', event)
        elif self.peer_close_is_error:
            self.on_session_error(event)

    def on_link_closing(self, event):
        """
        Forward ``on_link_closing`` to the delegate; without a
        delegate, treat it as an error when ``peer_close_is_error``
        is set.
        """
        if self.delegate:
            dispatch(self.delegate, 'on_link_closing', event)
        elif self.peer_close_is_error:
            self.on_link_error(event)

    def on_transport_tail_closed(self, event):
        """Handle a closed transport tail the same way as a closed transport."""
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """
        Notify the delegate with ``on_disconnected`` when the
        transport closes while the connection is still locally open.
        """
        if self.delegate and event.connection and self.is_local_open(event.connection):
            dispatch(self.delegate, 'on_disconnected', event)

class MessagingHandler(Handler, Acking):
    """
    A general purpose handler that makes the proton-c events somewhat
    simpler to deal with and/or avoids repetitive tasks for common use
    cases.

    The constructor populates ``self.handlers`` with a flow
    controller (only when ``prefetch`` is truthy) followed by an
    EndpointStateHandler, an IncomingMessageHandler and an
    OutgoingMessageHandler, each of which delegates back to this
    object. Subclasses override the ``on_*`` callbacks they need.
    """
    def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
        self.handlers = []
        if prefetch:
            self.handlers.append(CFlowController(prefetch))
        self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
        self.handlers.append(IncomingMessageHandler(auto_accept, self))
        self.handlers.append(OutgoingMessageHandler(auto_settle, self))
        # Transport condition names that cause the connection to be closed.
        self.fatal_conditions = ["amqp:unauthorized-access"]

    def on_transport_error(self, event):
        """
        Called when some error is encountered with the transport over
        which the AMQP connection is to be established. This includes
        authentication errors as well as socket errors.

        Logs the condition (name, description and, when present, the
        additional info) and closes the connection if the condition
        name is listed in ``fatal_conditions``.
        """
        if event.transport.condition:
            if event.transport.condition.info:
                # Three values need three placeholders; with only two the
                # %-formatting itself raised a TypeError.
                logging.error("%s: %s: %s" % (event.transport.condition.name, event.transport.condition.description, event.transport.condition.info))
            else:
                logging.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
            if event.transport.condition.name in self.fatal_conditions:
                event.connection.close()
        else:
            logging.error("Unspecified transport error")

    def on_connection_error(self, event):
        """
        Called when the peer closes the connection with an error condition.
        """
        EndpointStateHandler.print_error(event.connection, "connection")

    def on_session_error(self, event):
        """
        Called when the peer closes the session with an error condition.
        The error is logged and the connection is closed.
        """
        EndpointStateHandler.print_error(event.session, "session")
        event.connection.close()

    def on_link_error(self, event):
        """
        Called when the peer closes the link with an error condition.
        The error is logged and the connection is closed.
        """
        EndpointStateHandler.print_error(event.link, "link")
        event.connection.close()

    def on_reactor_init(self, event):
        """
        Called when the event loop - the reactor - starts.

        If the reactor has a ``subclass`` attribute, the reactor is
        also exposed on the event under the lower-cased name of that
        subclass before ``on_start`` is invoked.
        """
        if hasattr(event.reactor, 'subclass'):
            setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
        self.on_start(event)

    def on_start(self, event):
        """
        Called when the event loop starts. (Just an alias for on_reactor_init)
        """
        pass

    def on_connection_closed(self, event):
        """
        Called when the connection is closed.
        """
        pass

    def on_session_closed(self, event):
        """
        Called when the session is closed.
        """
        pass

    def on_link_closed(self, event):
        """
        Called when the link is closed.
        """
        pass

    def on_connection_closing(self, event):
        """
        Called when the peer initiates the closing of the connection.
        """
        pass

    def on_session_closing(self, event):
        """
        Called when the peer initiates the closing of the session.
        """
        pass

    def on_link_closing(self, event):
        """
        Called when the peer initiates the closing of the link.
        """
        pass

    def on_disconnected(self, event):
        """
        Called when the socket is disconnected.
        """
        pass

    def on_sendable(self, event):
        """
        Called when the sender link has credit and messages can
        therefore be transferred.
        """
        pass

    def on_accepted(self, event):
        """
        Called when the remote peer accepts an outgoing message.
        """
        pass

    def on_rejected(self, event):
        """
        Called when the remote peer rejects an outgoing message.
        """
        pass

    def on_released(self, event):
        """
        Called when the remote peer releases an outgoing message. Note
        that this may be in response to either the RELEASED or MODIFIED
        state as defined by the AMQP specification.
        """
        pass

    def on_settled(self, event):
        """
        Called when the remote peer has settled the outgoing
        message. This is the point at which it should never be
        retransmitted.
        """
        pass

    def on_message(self, event):
        """
        Called when a message is received. The message itself can be
        obtained as a property on the event. For the purpose of
        referring to this message in further actions (e.g. if
        explicitly accepting it), the ``delivery`` should be used, also
        obtainable via a property on the event.
        """
        pass

class TransactionHandler(object):
    """
    Interface for objects that want to be told about state changes
    of a transaction. Every callback is a no-op by default.
    """

    def on_transaction_declared(self, event):
        """Callback for a declared transaction; does nothing by default."""

    def on_transaction_committed(self, event):
        """Callback for a committed transaction; does nothing by default."""

    def on_transaction_aborted(self, event):
        """Callback for an aborted transaction; does nothing by default."""

    def on_transaction_declare_failed(self, event):
        """Callback for a failed declare; does nothing by default."""

    def on_transaction_commit_failed(self, event):
        """Callback for a failed commit; does nothing by default."""

class TransactionalClientHandler(MessagingHandler, TransactionHandler):
    """
    MessagingHandler variant for applications that use transactions.
    Unlike MessagingHandler, ``auto_accept`` defaults to False here.
    """

    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
        parent = super(TransactionalClientHandler, self)
        parent.__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)

    def accept(self, delivery, transaction=None):
        """
        Accept ``delivery`` through ``transaction`` when one is
        given, otherwise accept it directly via the inherited
        ``accept``.
        """
        if not transaction:
            super(TransactionalClientHandler, self).accept(delivery)
            return
        transaction.accept(delivery)

from proton import WrappedHandler
from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler

class CFlowController(WrappedHandler):
    """
    WrappedHandler around the ``pn_flowcontroller`` handler from
    cproton, created with the given ``window``.
    """

    def __init__(self, window=1024):
        def make_impl():
            # Invoked by WrappedHandler; binds the requested window.
            return pn_flowcontroller(window)

        WrappedHandler.__init__(self, make_impl)

class CHandshaker(WrappedHandler):
    """
    WrappedHandler built from cproton's ``pn_handshaker``.
    """

    def __init__(self):
        # pn_handshaker is passed uncalled; presumably WrappedHandler
        # invokes it to create the underlying handler -- TODO confirm.
        WrappedHandler.__init__(self, pn_handshaker)

class IOHandler(WrappedHandler):
    """
    WrappedHandler built from cproton's ``pn_iohandler``.
    """

    def __init__(self):
        # pn_iohandler is passed uncalled; presumably WrappedHandler
        # invokes it to create the underlying handler -- TODO confirm.
        WrappedHandler.__init__(self, pn_iohandler)

class PythonIO:
    """
    An I/O handler that tracks the reactor's selectables itself and
    waits on them with ``select`` whenever the reactor is quiesced.
    Events it does not handle are dispatched to an ``IOHandler``
    delegate.
    """

    def __init__(self):
        self.selectables = []
        self.delegate = IOHandler()

    def on_unhandled(self, method, event):
        """Dispatch any event not handled here to the delegate."""
        event.dispatch(self.delegate)

    def on_selectable_init(self, event):
        """Start tracking the selectable carried by the event."""
        self.selectables.append(event.context)

    def on_selectable_updated(self, event):
        """Nothing to do: selectable state is re-read on every wait."""
        pass

    def on_selectable_final(self, event):
        """Stop tracking and release a selectable once it is terminal."""
        sel = event.context
        if sel.is_terminal:
            self.selectables.remove(sel)
            sel.release()

    def on_reactor_quiesced(self, event):
        """
        Wait for I/O on the tracked selectables and service them.

        The wait lasts until the earliest selectable deadline, capped
        by ``reactor.timeout``. A ``reactor.timeout`` of ``None`` is
        treated as "no cap": the wait is then bounded only by the
        earliest deadline, or unbounded when there is none.
        Afterwards readable/writable selectables are notified, any
        selectable whose deadline has passed is expired, and the
        reactor is told to yield.
        """
        reactor = event.reactor
        # check if we are still quiesced, other handlers of
        # on_reactor_quiesced could have produced events to process
        if not reactor.quiesced: return

        reading = []
        writing = []
        deadline = None
        for sel in self.selectables:
            if sel.reading:
                reading.append(sel)
            if sel.writing:
                writing.append(sel)
            if sel.deadline:
                if deadline is None:
                    deadline = sel.deadline
                else:
                    deadline = min(sel.deadline, deadline)

        limit = reactor.timeout
        if deadline is not None:
            timeout = deadline - time.time()
        else:
            timeout = limit
        if timeout is not None:
            if timeout < 0: timeout = 0
            # Comparing a number with None raises on Python 3 and makes
            # min() return None on Python 2 (blocking forever and
            # ignoring the deadline), so only cap when a limit exists.
            if limit is not None:
                timeout = min(timeout, limit)
        readable, writable, _ = select(reading, writing, [], timeout)

        reactor.mark()

        now = time.time()

        for s in readable:
            s.readable()
        for s in writable:
            s.writable()
        for s in self.selectables:
            if s.deadline and now > s.deadline:
                s.expired()

        reactor.yield_()