This file is indexed.

/usr/lib/python3/dist-packages/asyncssh/ecdh.py is in python3-asyncssh 1.3.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# Copyright (c) 2013-2015 by Ron Frederick <ronf@timeheart.net>.
# All rights reserved.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v1.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-v10.html
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation

"""Elliptic curve Diffie-Hellman key exchange handler"""

from hashlib import sha256, sha384, sha512

from .constants import DISC_KEY_EXCHANGE_FAILED, DISC_PROTOCOL_ERROR
from .kex import Kex, register_kex_alg
from .misc import DisconnectError
from .packet import Byte, MPInt, String

# pylint: disable=bad-whitespace

# SSH KEX ECDH message values
MSG_KEX_ECDH_INIT  = 30
MSG_KEX_ECDH_REPLY = 31

# pylint: enable=bad-whitespace


class _KexECDH(Kex):
    """Handler for elliptic curve Diffie-Hellman key exchange"""

    def __init__(self, alg, conn, hash_alg, ecdh_class, *args):
        super().__init__(alg, conn, hash_alg)

        self._priv = ecdh_class(*args)
        pub = self._priv.get_public()

        if conn.is_client():
            self._client_pub = pub
            self._conn.send_packet(Byte(MSG_KEX_ECDH_INIT), String(pub))
        else:
            self._server_pub = pub

    def _compute_hash(self, host_key_data, k):
        """Compute a hash of key information associated with the connection"""

        hash_obj = self._hash_alg()
        hash_obj.update(self._conn.get_hash_prefix())
        hash_obj.update(String(host_key_data))
        hash_obj.update(String(self._client_pub))
        hash_obj.update(String(self._server_pub))
        hash_obj.update(MPInt(k))
        return hash_obj.digest()

    def _process_init(self, pkttype, packet):
        """Process an ECDH init message"""

        # pylint: disable=unused-argument

        if self._conn.is_client():
            raise DisconnectError(DISC_PROTOCOL_ERROR,
                                  'Unexpected kex init msg')

        self._client_pub = packet.get_string()
        packet.check_end()

        try:
            k = self._priv.get_shared(self._client_pub)
        except (AssertionError, ValueError):
            raise DisconnectError(DISC_PROTOCOL_ERROR,
                                  'Invalid kex init msg') from None

        host_key, host_key_data = self._conn.get_server_host_key()

        h = self._compute_hash(host_key_data, k)
        sig = host_key.sign(h)

        self._conn.send_packet(Byte(MSG_KEX_ECDH_REPLY), String(host_key_data),
                               String(self._server_pub), String(sig))

        self._conn.send_newkeys(k, h)

    def _process_reply(self, pkttype, packet):
        """Process an ECDH reply message"""

        # pylint: disable=unused-argument

        if self._conn.is_server():
            raise DisconnectError(DISC_PROTOCOL_ERROR,
                                  'Unexpected kex reply msg')

        host_key_data = packet.get_string()
        self._server_pub = packet.get_string()
        sig = packet.get_string()
        packet.check_end()

        try:
            k = self._priv.get_shared(self._server_pub)
        except (AssertionError, ValueError):
            raise DisconnectError(DISC_PROTOCOL_ERROR,
                                  'Invalid kex reply msg') from None

        host_key = self._conn.validate_server_host_key(host_key_data)

        h = self._compute_hash(host_key_data, k)
        if not host_key.verify(h, sig):
            raise DisconnectError(DISC_KEY_EXCHANGE_FAILED,
                                  'Key exchange hash mismatch')

        self._conn.send_newkeys(k, h)

    packet_handlers = {
        MSG_KEX_ECDH_INIT:  _process_init,
        MSG_KEX_ECDH_REPLY: _process_reply
    }


try:
    from .crypto import ECDH
except ImportError: # pragma: no cover
    pass
else:
    for _curve_id, _hash_alg in ((b'nistp521', sha512),
                                 (b'nistp384', sha384),
                                 (b'nistp256', sha256)):
        register_kex_alg(b'ecdh-sha2-' + _curve_id, _KexECDH,
                         _hash_alg, ECDH, _curve_id)

try:
    from .crypto import Curve25519DH
except ImportError: # pragma: no cover
    pass
else:
    register_kex_alg(b'curve25519-sha256@libssh.org', _KexECDH,
                     sha256, Curve25519DH)