This file is indexed.

/usr/lib/python3/dist-packages/websockets/test_handshake.py is in python3-websockets 3.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import contextlib
import unittest

from .exceptions import InvalidHandshake
from .handshake import *
from .handshake import accept  # private API


class HandshakeTests(unittest.TestCase):
    """
    Exercise the handshake helpers: ``accept``, ``build_request``,
    ``check_request``, ``build_response`` and ``check_response``.

    Most tests build a valid set of headers, damage exactly one of them,
    and expect the matching ``check_*`` function to raise
    ``InvalidHandshake``.

    """

    def test_accept(self):
        # Key / accept pair taken from RFC 6455.
        self.assertEqual(
            accept("dGhlIHNhbXBsZSBub25jZQ=="),
            "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",
        )

    def test_round_trip(self):
        # Client side writes the request headers, server side reads them
        # back; both must agree on the key.
        sent = {}
        client_key = build_request(sent.__setitem__)
        server_key = check_request(sent.__getitem__)
        self.assertEqual(client_key, server_key)

        # Server side writes the response headers, client side validates
        # them against the key it originally generated.
        received = {}
        build_response(received.__setitem__, server_key)
        check_response(received.__getitem__, client_key)

    @contextlib.contextmanager
    def assert_invalid_request_headers(self):
        """
        Hand a freshly built set of request headers to the ``with`` body.

        Once the body has tampered with them, verify that
        ``check_request`` rejects them with ``InvalidHandshake``.

        """
        request = {}
        build_request(request.__setitem__)
        yield request
        # Runs only after the ``with`` body completed without raising.
        self.assertRaises(InvalidHandshake, check_request, request.__getitem__)

    def test_request_invalid_upgrade(self):
        with self.assert_invalid_request_headers() as request:
            request.update({'Upgrade': 'socketweb'})

    def test_request_missing_upgrade(self):
        with self.assert_invalid_request_headers() as request:
            request.pop('Upgrade')

    def test_request_invalid_connection(self):
        with self.assert_invalid_request_headers() as request:
            request.update({'Connection': 'Downgrade'})

    def test_request_missing_connection(self):
        with self.assert_invalid_request_headers() as request:
            request.pop('Connection')

    def test_request_invalid_key_not_base64(self):
        with self.assert_invalid_request_headers() as request:
            request.update({'Sec-WebSocket-Key': "!@#$%^&*()"})

    def test_request_invalid_key_not_well_padded(self):
        # Same characters as the default response-test key, minus the
        # trailing "==" padding.
        with self.assert_invalid_request_headers() as request:
            request.update({'Sec-WebSocket-Key': "CSIRmL8dWYxeAdr/XpEHRw"})

    def test_request_invalid_key_not_16_bytes_long(self):
        with self.assert_invalid_request_headers() as request:
            request.update({'Sec-WebSocket-Key': "ZLpprpvK4PE="})

    def test_request_missing_key(self):
        with self.assert_invalid_request_headers() as request:
            request.pop('Sec-WebSocket-Key')

    def test_request_invalid_version(self):
        with self.assert_invalid_request_headers() as request:
            request.update({'Sec-WebSocket-Version': '42'})

    def test_request_missing_version(self):
        with self.assert_invalid_request_headers() as request:
            request.pop('Sec-WebSocket-Version')

    @contextlib.contextmanager
    def assert_invalid_response_headers(self, key='CSIRmL8dWYxeAdr/XpEHRw=='):
        """
        Hand a freshly built set of response headers to the ``with`` body.

        The headers are generated for ``key``. Once the body has tampered
        with them, verify that ``check_response`` rejects them for that
        same ``key`` with ``InvalidHandshake``.

        """
        response = {}
        build_response(response.__setitem__, key)
        yield response
        # Runs only after the ``with`` body completed without raising.
        self.assertRaises(
            InvalidHandshake, check_response, response.__getitem__, key)

    def test_response_invalid_upgrade(self):
        with self.assert_invalid_response_headers() as response:
            response.update({'Upgrade': 'socketweb'})

    def test_response_missing_upgrade(self):
        with self.assert_invalid_response_headers() as response:
            response.pop('Upgrade')

    def test_response_invalid_connection(self):
        with self.assert_invalid_response_headers() as response:
            response.update({'Connection': 'Downgrade'})

    def test_response_missing_connection(self):
        with self.assert_invalid_response_headers() as response:
            response.pop('Connection')

    def test_response_invalid_accept(self):
        # Accept value computed from a key other than the one the
        # response is later checked against.
        with self.assert_invalid_response_headers() as response:
            response.update(
                {'Sec-WebSocket-Accept': accept("1Eq4UDEFQYg3YspNgqxv5g==")})

    def test_response_missing_accept(self):
        with self.assert_invalid_response_headers() as response:
            response.pop('Sec-WebSocket-Accept')