/usr/lib/python2.7/dist-packages/pyngus/connection.py is in python-pyngus 2.2.1-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
"ConnectionEventHandler",
"Connection"
]
import heapq
import logging
import proton
import warnings
import ssl
from pyngus.endpoint import Endpoint
from pyngus.link import _Link
from pyngus.link import _SessionProxy
LOG = logging.getLogger(__name__)
_PROTON_VERSION = (int(getattr(proton, "VERSION_MAJOR", 0)),
int(getattr(proton, "VERSION_MINOR", 0)))
class _CallbackLock(object):
"""A utility class for detecting when a callback invokes a non-reentrant
Pyngus method.
"""
def __init__(self):
super(_CallbackLock, self).__init__()
self.in_callback = 0
def __enter__(self):
self.in_callback += 1
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.in_callback -= 1
# if a call is made to a non-reentrant method while this context is
# held, then the method will raise a RuntimeError(). Return false to
# propagate the exception to the caller
return False
class ConnectionEventHandler(object):
"""An implementation of an AMQP 1.0 Connection."""
def connection_active(self, connection):
"""Connection handshake has completed."""
LOG.debug("connection_active (ignored)")
def connection_failed(self, connection, error):
"""Connection's transport has failed in some way."""
LOG.warn("connection_failed, error=%s (ignored)", str(error))
def connection_remote_closed(self, connection, pn_condition):
"""Peer has closed its end of the connection."""
LOG.debug("connection_remote_closed (ignored)")
def connection_closed(self, connection):
"""The connection has cleanly closed."""
LOG.debug("connection_closed (ignored)")
def sender_requested(self, connection, link_handle,
name, requested_source,
properties):
"""Peer has requested a SenderLink be created."""
# call accept_sender to accept new link,
# reject_sender to reject it.
LOG.debug("sender_requested (ignored)")
def receiver_requested(self, connection, link_handle,
name, requested_target,
properties):
"""Peer has requested a ReceiverLink be created."""
# call accept_receiver to accept new link,
# reject_receiver to reject it.
LOG.debug("receiver_requested (ignored)")
# No longer supported by proton >= 0.10, so this method is deprecated
def sasl_step(self, connection, pn_sasl):
"""DEPRECATED"""
LOG.debug("sasl_step (ignored)")
def sasl_done(self, connection, pn_sasl, result):
"""SASL exchange complete."""
LOG.debug("sasl_done (ignored)")
class Connection(Endpoint):
"""A Connection to a peer."""
EOS = -1 # indicates 'I/O stream closed'
# set of all SASL connection configuration properties
_SASL_PROPS = set(['x-username', 'x-password', 'x-require-auth',
'x-sasl-mechs', 'x-sasl-config-dir',
'x-sasl-config-name', 'x-force-sasl'])
# set of all SSL connection configuration properties
_SSL_PROPS = set(['x-ssl', 'x-ssl-identity', 'x-ssl-ca-file',
'x-ssl-verify-mode', 'x-ssl-server',
'x-ssl-peer-name', 'x-ssl-allow-cleartext'])
# SSL peer certificate verification
_VERIFY_MODES = {'verify-peer': proton.SSLDomain.VERIFY_PEER_NAME,
'verify-cert': proton.SSLDomain.VERIFY_PEER,
'no-verify': proton.SSLDomain.ANONYMOUS_PEER}
def _not_reentrant(func):
"""Decorator that prevents callbacks from calling into methods that are
not reentrant
"""
def wrap(self, *args, **kws):
if self._callback_lock and self._callback_lock.in_callback:
m = "Connection %s cannot be invoked from a callback!" % func
raise RuntimeError(m)
return func(self, *args, **kws)
return wrap
def __init__(self, container, name, event_handler=None, properties=None):
"""Create a new connection from the Container
properties: map, properties of the new connection. The following keys
and values are supported:
idle-time-out: float, time in seconds before an idle connection will be
closed.
hostname: string, the name of the host to which this connection is
being made, sent in the Open frame.
max-frame-size: int, maximum acceptable frame size in bytes.
properties: map, proton connection properties sent to the peer.
The following custom connection properties are supported:
x-server: boolean, set this to True to configure the connection as a
server side connection. This should be set True if the connection was
remotely initiated (e.g. accept on a listening socket). If the
connection was locally initiated (e.g. by calling connect()), then this
value should be set to False. This setting is used by authentication
and encryption to configure the connection's role. The default value
is False for client mode.
x-username: string, the client's username to use when authenticating
with a server.
x-password: string, the client's password, used for authentication.
x-require-auth: boolean, reject remotely-initiated client connections
that fail to provide valid credentials for authentication.
x-sasl-mechs: string, a space-separated list of mechanisms
that are allowed for authentication. Defaults to "ANONYMOUS"
x-sasl-config-dir: string, path to the directory containing the Cyrus
SASL server configuration.
x-sasl-config-name: string, name of the Cyrus SASL configuration file
contained in the x-sasl-config-dir (without the '.conf' suffix)
x-force-sasl: by default SASL authentication is disabled. SASL will be
enabled if any of the above x-sasl-* options are set. For clients using
GSSAPI it is likely none of these options will be set. In order for
these clients to authenticate this flag must be set true. The value of
this property is ignored if any of the other SASL related properties
are set.
x-ssl: boolean, Allows clients to connect using SSL, setting up a minimal
viable configuration (using the system's CA bundle to validate the
peer's certificate). This setting is overridden if any of the other SSL
settings are supplied.
x-ssl-identity: tuple, contains identifying certificate information
which will be presented to the peer. The first item in the tuple is
the path to the certificate file (PEM format). The second item is the
path to a file containing the private key used to sign the certificate
(PEM format, optional if private key is stored in the certificate
itself). The last item is the password used to encrypt the private key
(string, not required if private key is not encrypted)
x-ssl-ca-file: string, path to a file containing the certificates of
the trusted Certificate Authorities that will be used to check the
signature of the peer's certificate. Not used if x-ssl-verify-mode
is set to 'no-verify'. To use the system's default CAs instead leave
this option out and set x-ssl to True.
x-ssl-verify-mode: string, configure the level of security provided by
SSL. Possible values:
"verify-peer" (default) - most secure, requires peer to supply a
certificate signed by a valid CA (see x-ssl-ca-file), and check
the CN or SAN entry in the certificate against the expected
peer hostname (see hostname and x-ssl-peer-name properties)
"verify-cert" (default if no x-ssl-peer-name given) - like
verify-peer, but skips the check of the peer hostname.
Vulnerable to man-in-the-middle attack.
"no-verify" - do not require the peer to provide a certificate.
Results in a weaker encryption stream, and other vulnerabilities.
x-ssl-peer-name: string, DNS name of peer. Override the hostname used
to authenticate peer's certificate (see x-ssl-verify-mode). The value
of the 'hostname' property is used if this property is not supplied.
x-ssl-allow-cleartext: boolean, Allows clients to connect without using
SSL (e.g. plain TCP). Used by a server that will accept clients
requesting either trusted or untrusted connections.
x-trace-protocol: boolean, if true, dump sent and received frames to
stdout.
"""
super(Connection, self).__init__(name)
self._transport_bound = False
self._container = container
self._handler = event_handler
self._properties = properties or {}
old_flag = self._properties.get('x-ssl-server', False)
self._server = self._properties.get('x-server', old_flag)
self._pn_connection = proton.Connection()
self._pn_connection.container = container.name
if (_PROTON_VERSION < (0, 9)):
self._pn_transport = proton.Transport()
else:
if self._server:
mode = proton.Transport.SERVER
else:
mode = proton.Transport.CLIENT
self._pn_transport = proton.Transport(mode)
self._pn_collector = proton.Collector()
self._pn_connection.collect(self._pn_collector)
if 'hostname' in self._properties:
self._pn_connection.hostname = self._properties['hostname']
secs = self._properties.get("idle-time-out")
if secs:
self._pn_transport.idle_timeout = secs
max_frame = self._properties.get("max-frame-size")
if max_frame:
self._pn_transport.max_frame_size = max_frame
if 'properties' in self._properties:
self._pn_connection.properties = self._properties["properties"]
if self._properties.get("x-trace-protocol"):
self._pn_transport.trace(proton.Transport.TRACE_FRM)
# indexed by link-name
self._sender_links = {} # SenderLink
self._receiver_links = {} # ReceiverLink
self._timers = {} # indexed by expiration date
self._timers_heap = [] # sorted by expiration date
self._read_done = False
self._write_done = False
self._error = None
self._next_deadline = 0
self._user_context = None
self._remote_session_id = 0
self._callback_lock = _CallbackLock()
self._pn_sasl = None
self._sasl_done = False
# if x-force-sasl is false remove it so it does not trigger the SASL
# configuration logic below
if not self._properties.get('x-force-sasl', True):
del self._properties['x-force-sasl']
if self._SASL_PROPS.intersection(set(self._properties.keys())):
# SASL config specified, need to enable SASL
if (_PROTON_VERSION < (0, 10)):
# best effort map of 0.10 sasl config to pre-0.10 sasl
if self._server:
self.pn_sasl.server()
if 'x-require-auth' in self._properties:
if not self._properties['x-require-auth']:
if _PROTON_VERSION >= (0, 8):
self.pn_sasl.allow_skip()
else:
if 'x-username' in self._properties:
self.pn_sasl.plain(self._properties['x-username'],
self._properties.get('x-password',
''))
else:
self.pn_sasl.client()
mechs = self._properties.get('x-sasl-mechs')
if mechs:
self.pn_sasl.mechanisms(mechs)
else:
# new Proton SASL configuration:
# maintain old behavior: allow PLAIN and ANONYMOUS
# authentication. Override this using x-sasl-mechs below:
self.pn_sasl.allow_insecure_mechs = True
if 'x-require-auth' in self._properties:
ra = self._properties['x-require-auth']
self._pn_transport.require_auth(ra)
if 'x-username' in self._properties:
self._pn_connection.user = self._properties['x-username']
if 'x-password' in self._properties:
self._pn_connection.password = \
self._properties['x-password']
if 'x-sasl-mechs' in self._properties:
mechs = self._properties['x-sasl-mechs'].upper()
self.pn_sasl.allowed_mechs(mechs)
if 'PLAIN' not in mechs and 'ANONYMOUS' not in mechs:
self.pn_sasl.allow_insecure_mechs = False
if 'x-sasl-config-dir' in self._properties:
self.pn_sasl.config_path(
self._properties['x-sasl-config-dir'])
if 'x-sasl-config-name' in self._properties:
self.pn_sasl.config_name(
self._properties['x-sasl-config-name'])
# intercept any SSL failures and clean up resources before propagating
# the exception:
try:
self._pn_ssl = self._configure_ssl(properties)
except:
self.destroy()
raise
@property
def container(self):
return self._container
@property
# TODO(kgiusti) - hopefully remove
def pn_transport(self):
return self._pn_transport
@property
# TODO(kgiusti) - hopefully remove
def pn_connection(self):
return self._pn_connection
@property
def name(self):
return self._name
@property
def remote_container(self):
"""Return the name of the remote container. Should be present once the
connection is active.
"""
return self._pn_connection.remote_container
@property
def remote_hostname(self):
"""Return the hostname advertised by the remote, if present."""
if self._pn_connection:
return self._pn_connection.remote_hostname
return None
@property
def remote_properties(self):
"""Properties provided by the peer."""
if self._pn_connection:
return self._pn_connection.remote_properties
return None
@property
def pn_sasl(self):
if not self._pn_sasl:
self._pn_sasl = self._pn_transport.sasl()
return self._pn_sasl
def pn_ssl(self):
"""Return the Proton SSL context for this Connection."""
return self._pn_ssl
def _get_user_context(self):
return self._user_context
def _set_user_context(self, ctxt):
self._user_context = ctxt
_uc_docstr = """Associate an arbitrary user object with this Connection."""
user_context = property(_get_user_context, _set_user_context,
doc=_uc_docstr)
def open(self):
if not self._transport_bound:
self._pn_transport.bind(self._pn_connection)
self._transport_bound = True
if self._pn_connection.state & proton.Endpoint.LOCAL_UNINIT:
self._pn_connection.open()
def close(self, pn_condition=None):
for link in list(self._sender_links.values()):
link.close(pn_condition)
for link in list(self._receiver_links.values()):
link.close(pn_condition)
if pn_condition:
self._pn_connection.condition = pn_condition
if self._pn_connection.state & proton.Endpoint.LOCAL_ACTIVE:
self._pn_connection.close()
@property
def active(self):
"""Return True if both ends of the Connection are open."""
return self._endpoint_state == self._ACTIVE
@property
def closed(self):
"""Return True if the Connection has finished closing."""
return (self._write_done and self._read_done)
@_not_reentrant
def destroy(self):
# if a connection is destroyed without flushing pending output,
# the remote will see an unclean shutdown (framing error)
if self.has_output > 0:
LOG.debug("Connection with buffered output destroyed")
self._error = "Destroyed by the application"
self._handler = None
self._properties = None
tmp = self._sender_links.copy()
for l in tmp.values():
l.destroy()
assert(len(self._sender_links) == 0)
tmp = self._receiver_links.copy()
for l in tmp.values():
l.destroy()
assert(len(self._receiver_links) == 0)
self._timers.clear()
self._timers_heap = None
self._container.remove_connection(self._name)
self._container = None
self._user_context = None
self._callback_lock = None
if self._transport_bound:
self._pn_transport.unbind()
self._pn_transport = None
self._pn_connection.free()
self._pn_connection = None
if _PROTON_VERSION < (0, 8):
# memory leak: drain the collector before releasing it
while self._pn_collector.peek():
self._pn_collector.pop()
self._pn_collector = None
self._pn_sasl = None
self._pn_ssl = None
_CLOSED = (proton.Endpoint.LOCAL_CLOSED | proton.Endpoint.REMOTE_CLOSED)
_ACTIVE = (proton.Endpoint.LOCAL_ACTIVE | proton.Endpoint.REMOTE_ACTIVE)
@_not_reentrant
def process(self, now):
"""Perform connection state processing."""
if self._pn_connection is None:
LOG.error("Connection.process() called on destroyed connection!")
return 0
# do nothing until the connection has been opened
if self._pn_connection.state & proton.Endpoint.LOCAL_UNINIT:
return 0
if self._pn_sasl and not self._sasl_done:
# wait until SASL has authenticated
if (_PROTON_VERSION < (0, 10)):
if self._pn_sasl.state not in (proton.SASL.STATE_PASS,
proton.SASL.STATE_FAIL):
LOG.debug("SASL in progress. State=%s",
str(self._pn_sasl.state))
if self._handler:
with self._callback_lock:
self._handler.sasl_step(self, self._pn_sasl)
return self._next_deadline
self._sasl_done = True
if self._handler:
with self._callback_lock:
self._handler.sasl_done(self, self._pn_sasl,
self._pn_sasl.outcome)
else:
if self._pn_sasl.outcome is not None:
self._sasl_done = True
if self._handler:
with self._callback_lock:
self._handler.sasl_done(self, self._pn_sasl,
self._pn_sasl.outcome)
# process timer events:
timer_deadline = self._expire_timers(now)
transport_deadline = self._pn_transport.tick(now)
if timer_deadline and transport_deadline:
self._next_deadline = min(timer_deadline, transport_deadline)
else:
self._next_deadline = timer_deadline or transport_deadline
# process events from proton:
pn_event = self._pn_collector.peek()
while pn_event:
# LOG.debug("pn_event: %s received", pn_event.type)
if _Link._handle_proton_event(pn_event, self):
pass
elif self._handle_proton_event(pn_event):
pass
elif _SessionProxy._handle_proton_event(pn_event, self):
pass
self._pn_collector.pop()
pn_event = self._pn_collector.peek()
# check for connection failure after processing all pending
# engine events:
if self._error:
if self._handler:
# nag application until connection is destroyed
self._next_deadline = now
with self._callback_lock:
self._handler.connection_failed(self, self._error)
elif (self._endpoint_state == self._CLOSED and
self._read_done and self._write_done):
# invoke closed callback after endpoint has fully closed and
# all pending I/O has completed:
if self._handler:
with self._callback_lock:
self._handler.connection_closed(self)
return self._next_deadline
@property
def next_tick(self):
text = "next_tick deprecated, use deadline instead"
warnings.warn(DeprecationWarning(text))
return self.deadline
@property
def deadline(self):
"""Must invoke process() on or before this timestamp."""
return self._next_deadline
@property
def needs_input(self):
if self._read_done:
LOG.debug("needs_input EOS")
return self.EOS
try:
capacity = self._pn_transport.capacity()
except Exception as e:
self._read_done = True
self._connection_failed(str(e))
return self.EOS
if capacity >= 0:
return capacity
LOG.debug("needs_input read done")
self._read_done = True
return self.EOS
def process_input(self, in_data):
c = min(self.needs_input, len(in_data))
if c <= 0:
return c
try:
rc = self._pn_transport.push(in_data[:c])
except Exception as e:
self._read_done = True
self._connection_failed(str(e))
return self.EOS
if rc: # error?
LOG.debug("process_input read done")
self._read_done = True
return self.EOS
# hack: check if this was the last input needed by the connection.
# If so, this will set the _read_done flag and the 'connection closed'
# callback can be issued on the next call to process()
self.needs_input
return c
def close_input(self, reason=None):
if not self._read_done:
try:
self._pn_transport.close_tail()
except Exception as e:
self._connection_failed(str(e))
LOG.debug("close_input read done")
self._read_done = True
@property
def has_output(self):
if self._write_done:
LOG.debug("has output EOS")
return self.EOS
try:
pending = self._pn_transport.pending()
except Exception as e:
self._write_done = True
self._connection_failed(str(e))
return self.EOS
if pending >= 0:
return pending
LOG.debug("has output write_done")
self._write_done = True
return self.EOS
def output_data(self):
"""Get a buffer of data that needs to be written to the network.
"""
c = self.has_output
if c <= 0:
return None
try:
buf = self._pn_transport.peek(c)
except Exception as e:
self._connection_failed(str(e))
return None
return buf
def output_written(self, count):
try:
self._pn_transport.pop(count)
except Exception as e:
self._write_done = True
self._connection_failed(str(e))
# hack: check if this was the last output from the connection. If so,
# this will set the _write_done flag and the 'connection closed'
# callback can be issued on the next call to process()
self.has_output
def close_output(self, reason=None):
if not self._write_done:
try:
self._pn_transport.close_head()
except Exception as e:
self._connection_failed(str(e))
LOG.debug("close output write done")
self._write_done = True
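# Example I/O pump using the methods above (illustrative sketch; 'sock'
# and 'conn' are assumptions made for the example):
#
#   import time
#   while not conn.closed:
#       capacity = conn.needs_input
#       if capacity > 0:
#           data = sock.recv(capacity)
#           if data:
#               conn.process_input(data)
#           else:
#               conn.close_input()
#       out = conn.output_data()
#       if out:
#           conn.output_written(sock.send(out))
#       conn.process(time.time())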
def create_sender(self, source_address, target_address=None,
event_handler=None, name=None, properties=None):
"""Factory method for Sender links."""
ident = name or str(source_address)
if ident in self._sender_links:
raise KeyError("Sender %s already exists!" % ident)
session = _SessionProxy("session-%s" % ident, self)
session.open()
sl = session.new_sender(ident)
sl.configure(target_address, source_address, event_handler, properties)
self._sender_links[ident] = sl
return sl
def accept_sender(self, link_handle, source_override=None,
event_handler=None, properties=None):
link = self._sender_links.get(link_handle)
if not link:
raise Exception("Invalid link_handle: %s" % link_handle)
pn_link = link._pn_link
if pn_link.remote_source.dynamic and not source_override:
raise Exception("A source address must be supplied!")
source_addr = source_override or pn_link.remote_source.address
link.configure(pn_link.remote_target.address,
source_addr,
event_handler, properties)
return link
def reject_sender(self, link_handle, pn_condition=None):
"""Rejects the SenderLink, and destroys the handle."""
link = self._sender_links.get(link_handle)
if not link:
raise Exception("Invalid link_handle: %s" % link_handle)
link.reject(pn_condition)
# note: normally, link.destroy() cannot be called from a callback,
# but this link was never made available to the application so this
# link is only referenced by the connection
link.destroy()
def create_receiver(self, target_address, source_address=None,
event_handler=None, name=None, properties=None):
"""Factory method for creating Receive links."""
ident = name or str(target_address)
if ident in self._receiver_links:
raise KeyError("Receiver %s already exists!" % ident)
session = _SessionProxy("session-%s" % ident, self)
session.open()
rl = session.new_receiver(ident)
rl.configure(target_address, source_address, event_handler, properties)
self._receiver_links[ident] = rl
return rl
def accept_receiver(self, link_handle, target_override=None,
event_handler=None, properties=None):
link = self._receiver_links.get(link_handle)
if not link:
raise Exception("Invalid link_handle: %s" % link_handle)
pn_link = link._pn_link
if pn_link.remote_target.dynamic and not target_override:
raise Exception("A target address must be supplied!")
target_addr = target_override or pn_link.remote_target.address
link.configure(target_addr,
pn_link.remote_source.address,
event_handler, properties)
return link
def reject_receiver(self, link_handle, pn_condition=None):
link = self._receiver_links.get(link_handle)
if not link:
raise Exception("Invalid link_handle: %s" % link_handle)
link.reject(pn_condition)
# note: normally, link.destroy() cannot be called from a callback,
# but this link was never made available to the application so this
# link is only referenced by the connection
link.destroy()
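# Example of accepting peer-initiated links from a ConnectionEventHandler
# subclass (illustrative sketch built on the accept_*/reject_* methods
# above; all other details are assumptions):
#
#   def sender_requested(self, connection, link_handle, name,
#                        requested_source, properties):
#       if requested_source:
#           connection.accept_sender(link_handle).open()
#       else:
#           # dynamic source requested, but no address to assign
#           connection.reject_sender(link_handle)
#
#   def receiver_requested(self, connection, link_handle, name,
#                          requested_target, properties):
#       connection.accept_receiver(link_handle).open()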
@property
def _endpoint_state(self):
return self._pn_connection.state
def _remove_sender(self, name):
if name in self._sender_links:
del self._sender_links[name]
def _remove_receiver(self, name):
if name in self._receiver_links:
del self._receiver_links[name]
def _connection_failed(self, error="Error not specified!"):
"""Clean up after connection failure detected."""
if not self._error:
LOG.error("Connection failed: %s", str(error))
self._error = error
def _configure_ssl(self, properties):
if (not properties or
not self._SSL_PROPS.intersection(set(iter(properties)))):
return None
mode = proton.SSLDomain.MODE_CLIENT
if properties.get('x-ssl-server', properties.get('x-server')):
mode = proton.SSLDomain.MODE_SERVER
identity = properties.get('x-ssl-identity')
ca_file = properties.get('x-ssl-ca-file')
if (not ca_file and properties.get('x-ssl') and
hasattr(ssl, 'get_default_verify_paths')):
ca_file = ssl.get_default_verify_paths().cafile
hostname = properties.get('x-ssl-peer-name',
properties.get('hostname'))
# default to most secure level of certificate validation
if not ca_file:
vdefault = 'no-verify'
elif not hostname:
vdefault = 'verify-cert'
else:
vdefault = 'verify-peer'
vmode = properties.get('x-ssl-verify-mode', vdefault)
try:
vmode = self._VERIFY_MODES[vmode]
except KeyError:
raise proton.SSLException("bad value for x-ssl-verify-mode: '%s'" %
vmode)
if vmode == proton.SSLDomain.VERIFY_PEER_NAME:
if not hostname or not ca_file:
raise proton.SSLException("verify-peer needs x-ssl-peer-name"
" and x-ssl-ca-file")
elif vmode == proton.SSLDomain.VERIFY_PEER:
if not ca_file:
raise proton.SSLException("verify-cert needs x-ssl-ca-file")
# This will throw proton.SSLUnavailable if SSL support is not installed
domain = proton.SSLDomain(mode)
if identity:
# our identity:
domain.set_credentials(identity[0], identity[1], identity[2])
if ca_file:
# how we verify peers:
domain.set_trusted_ca_db(ca_file)
domain.set_peer_authentication(vmode, ca_file)
if mode == proton.SSLDomain.MODE_SERVER:
if properties.get('x-ssl-allow-cleartext'):
domain.allow_unsecured_client()
pn_ssl = proton.SSL(self._pn_transport, domain)
if hostname:
pn_ssl.peer_hostname = hostname
LOG.debug("SSL configured for connection %s", self._name)
return pn_ssl
def _add_timer(self, deadline, callback):
callbacks = self._timers.get(deadline)
if callbacks is None:
callbacks = set()
self._timers[deadline] = callbacks
heapq.heappush(self._timers_heap, deadline)
if deadline < self._next_deadline:
self._next_deadline = deadline
callbacks.add(callback)
def _cancel_timer(self, deadline, callback):
callbacks = self._timers.get(deadline)
if callbacks:
callbacks.discard(callback)
# next expire will discard empty deadlines
def _expire_timers(self, now):
while (self._timers_heap and
self._timers_heap[0] <= now):
deadline = heapq.heappop(self._timers_heap)
callbacks = self._timers.get(deadline)
while callbacks:
callbacks.pop()()
del self._timers[deadline]
return self._timers_heap[0] if self._timers_heap else 0
# Proton's event model was changed after 0.7
if (_PROTON_VERSION >= (0, 8)):
_endpoint_event_map = {
proton.Event.CONNECTION_REMOTE_OPEN: Endpoint.REMOTE_OPENED,
proton.Event.CONNECTION_REMOTE_CLOSE: Endpoint.REMOTE_CLOSED,
proton.Event.CONNECTION_LOCAL_OPEN: Endpoint.LOCAL_OPENED,
proton.Event.CONNECTION_LOCAL_CLOSE: Endpoint.LOCAL_CLOSED}
def _handle_proton_event(self, pn_event):
ep_event = Connection._endpoint_event_map.get(pn_event.type)
if ep_event is not None:
self._process_endpoint_event(ep_event)
elif pn_event.type == proton.Event.CONNECTION_INIT:
LOG.debug("Connection created: %s", pn_event.context)
elif pn_event.type == proton.Event.CONNECTION_FINAL:
LOG.debug("Connection finalized: %s", pn_event.context)
elif pn_event.type == proton.Event.TRANSPORT_ERROR:
self._connection_failed(str(self._pn_transport.condition))
else:
return False # unknown
return True # handled
elif hasattr(proton.Event, "CONNECTION_LOCAL_STATE"):
# 0.7 proton event model
def _handle_proton_event(self, pn_event):
if pn_event.type == proton.Event.CONNECTION_LOCAL_STATE:
self._process_local_state()
elif pn_event.type == proton.Event.CONNECTION_REMOTE_STATE:
self._process_remote_state()
else:
return False # unknown
return True # handled
else:
raise Exception("The installed version of Proton is not supported.")
# endpoint state machine actions:
def _ep_active(self):
"""Both ends of the Endpoint have become active."""
LOG.debug("Connection is up")
if self._handler:
with self._callback_lock:
self._handler.connection_active(self)
def _ep_need_close(self):
"""The remote has closed its end of the endpoint."""
LOG.debug("Connection remotely closed")
if self._handler:
cond = self._pn_connection.remote_condition
with self._callback_lock:
self._handler.connection_remote_closed(self, cond)
def _ep_error(self, error):
"""The endpoint state machine failed due to protocol error."""
super(Connection, self)._ep_error(error)
self._connection_failed("Protocol error occurred.")
# order by name
def __lt__(self, other):
return self.name < other.name
def __le__(self, other):
return self < other or self.name == other.name
def __gt__(self, other):
return self.name > other.name
def __ge__(self, other):
return self > other or self.name == other.name