This file is indexed.

/usr/lib/python3/dist-packages/asyncssh/compression.py is in python3-asyncssh 1.3.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# Copyright (c) 2013-2015 by Ron Frederick <ronf@timeheart.net>.
# All rights reserved.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v1.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-v10.html
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation

"""SSH compression handlers"""

import zlib


# Registry of compression algorithms, filled in by register_compression_alg().
# The dictionaries are all keyed by algorithm name.
_cmp_compressors = {}    # name -> callable invoked with no arguments
_cmp_decompressors = {}  # name -> callable invoked with no arguments
_cmp_params = {}         # name -> after_auth flag given at registration
_cmp_algs = []           # names, in registration order


def _none():
    """Factory used when no compression is in effect

       This is registered below as both the compressor and the
       decompressor factory for the b'none' algorithm. It takes no
       arguments and always returns None rather than a handler object.

    """

    handler = None

    return handler


class _ZLibCompress:
    """zlib compressor whose every call ends on a sync flush

       A zlib compression object is allowed to keep part of its input in
       an internal buffer. This wrapper follows each compress call with a
       Z_SYNC_FLUSH so the bytes handed back cover all of the data that
       was passed in.

    """

    def __init__(self):
        self._comp = zlib.compressobj()

    def compress(self, data):
        """Return data compressed with zlib, terminated by a sync flush"""

        # The tuple is evaluated left to right, so the data is fed to the
        # compressor before the flush is requested.
        pieces = (self._comp.compress(data),
                  self._comp.flush(zlib.Z_SYNC_FLUSH))

        return b''.join(pieces)


def register_compression_alg(alg, compressor, decompressor, after_auth):
    """Register a compression algorithm

       The algorithm name alg is added to the list returned by
       get_compression_algs(), and the remaining arguments are stored
       under that name:

         * compressor and decompressor are callables which are invoked
           with no arguments by get_compressor() and get_decompressor().
         * after_auth is the value returned by get_compression_params().

       Registering a name which is already present replaces its stored
       handlers and after_auth value. The name keeps its original position
       in the list and is not listed a second time.

    """

    # Keep the list of names free of duplicates so that a repeated
    # registration cannot make the same algorithm appear twice.
    if alg not in _cmp_algs:
        _cmp_algs.append(alg)

    _cmp_params[alg] = after_auth
    _cmp_compressors[alg] = compressor
    _cmp_decompressors[alg] = decompressor


def get_compression_algs():
    """Return a list of available compression algorithms

       The names appear in the order in which they were registered. The
       result is a new list on every call, so a caller may modify it
       without changing the set of registered algorithms.

    """

    # Hand out a copy rather than the registry list itself.
    return list(_cmp_algs)


def get_compression_params(alg):
    """Look up the parameters of a compression algorithm

       The result is the after_auth value stored for alg when it was
       registered, which indicates whether or not the algorithm should
       be delayed until after authentication completes. A KeyError is
       raised if alg has not been registered.

    """

    after_auth = _cmp_params[alg]

    return after_auth


def get_compressor(alg):
    """Create a compressor for an algorithm

       The compressor callable registered for alg is invoked with no
       arguments and whatever it returns is passed back to the caller.
       A KeyError is raised if alg has not been registered.

    """

    factory = _cmp_compressors[alg]

    return factory()


def get_decompressor(alg):
    """Create a decompressor for an algorithm

       The decompressor callable registered for alg is invoked with no
       arguments and whatever it returns is passed back to the caller.
       A KeyError is raised if alg has not been registered.

    """

    factory = _cmp_decompressors[alg]

    return factory()

# Built-in algorithms. get_compression_algs() reports the names in the
# order in which they are registered here.

register_compression_alg(b'zlib@openssh.com', _ZLibCompress,
                         zlib.decompressobj, after_auth=True)
register_compression_alg(b'zlib', _ZLibCompress,
                         zlib.decompressobj, after_auth=False)
register_compression_alg(b'none', _none, _none, after_auth=False)