/usr/lib/python3/dist-packages/websockets/test_handshake.py is in python3-websockets 3.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 | import contextlib
import unittest
from .exceptions import InvalidHandshake
from .handshake import *
from .handshake import accept # private API
class HandshakeTests(unittest.TestCase):
def test_accept(self):
# Test vector from RFC 6455
key = "dGhlIHNhbXBsZSBub25jZQ=="
acc = "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
self.assertEqual(accept(key), acc)
def test_round_trip(self):
request_headers = {}
request_key = build_request(request_headers.__setitem__)
response_key = check_request(request_headers.__getitem__)
self.assertEqual(request_key, response_key)
response_headers = {}
build_response(response_headers.__setitem__, response_key)
check_response(response_headers.__getitem__, request_key)
@contextlib.contextmanager
def assert_invalid_request_headers(self):
"""
Provide request headers for corruption.
Assert that the transformation made them invalid.
"""
headers = {}
build_request(headers.__setitem__)
yield headers
with self.assertRaises(InvalidHandshake):
check_request(headers.__getitem__)
def test_request_invalid_upgrade(self):
with self.assert_invalid_request_headers() as headers:
headers['Upgrade'] = 'socketweb'
def test_request_missing_upgrade(self):
with self.assert_invalid_request_headers() as headers:
del headers['Upgrade']
def test_request_invalid_connection(self):
with self.assert_invalid_request_headers() as headers:
headers['Connection'] = 'Downgrade'
def test_request_missing_connection(self):
with self.assert_invalid_request_headers() as headers:
del headers['Connection']
def test_request_invalid_key_not_base64(self):
with self.assert_invalid_request_headers() as headers:
headers['Sec-WebSocket-Key'] = "!@#$%^&*()"
def test_request_invalid_key_not_well_padded(self):
with self.assert_invalid_request_headers() as headers:
headers['Sec-WebSocket-Key'] = "CSIRmL8dWYxeAdr/XpEHRw"
def test_request_invalid_key_not_16_bytes_long(self):
with self.assert_invalid_request_headers() as headers:
headers['Sec-WebSocket-Key'] = "ZLpprpvK4PE="
def test_request_missing_key(self):
with self.assert_invalid_request_headers() as headers:
del headers['Sec-WebSocket-Key']
def test_request_invalid_version(self):
with self.assert_invalid_request_headers() as headers:
headers['Sec-WebSocket-Version'] = '42'
def test_request_missing_version(self):
with self.assert_invalid_request_headers() as headers:
del headers['Sec-WebSocket-Version']
@contextlib.contextmanager
def assert_invalid_response_headers(self, key='CSIRmL8dWYxeAdr/XpEHRw=='):
"""
Provide response headers for corruption.
Assert that the transformation made them invalid.
"""
headers = {}
build_response(headers.__setitem__, key)
yield headers
with self.assertRaises(InvalidHandshake):
check_response(headers.__getitem__, key)
def test_response_invalid_upgrade(self):
with self.assert_invalid_response_headers() as headers:
headers['Upgrade'] = 'socketweb'
def test_response_missing_upgrade(self):
with self.assert_invalid_response_headers() as headers:
del headers['Upgrade']
def test_response_invalid_connection(self):
with self.assert_invalid_response_headers() as headers:
headers['Connection'] = 'Downgrade'
def test_response_missing_connection(self):
with self.assert_invalid_response_headers() as headers:
del headers['Connection']
def test_response_invalid_accept(self):
with self.assert_invalid_response_headers() as headers:
other_key = "1Eq4UDEFQYg3YspNgqxv5g=="
headers['Sec-WebSocket-Accept'] = accept(other_key)
def test_response_missing_accept(self):
with self.assert_invalid_response_headers() as headers:
del headers['Sec-WebSocket-Accept']
|