This file is indexed.

/usr/lib/python3/dist-packages/simple_cdd/tools/mirror_wget.py is in python3-simple-cdd 0.6.1.

This file is owned by root:root, with mode 0o644.
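
For reference, the stated ownership and permissions can be confirmed with a few lines of Python (a minimal sketch using only the standard library; it assumes the file is installed at the path given above):

    import os, stat, pwd, grp

    st = os.stat("/usr/lib/python3/dist-packages/simple_cdd/tools/mirror_wget.py")
    print(pwd.getpwuid(st.st_uid).pw_name, grp.getgrgid(st.st_gid).gr_name)  # expected: root root
    print(oct(stat.S_IMODE(st.st_mode)))                                     # expected: 0o644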

The actual contents of the file can be viewed below.

from simple_cdd.exceptions import Fail
from simple_cdd.utils import run_command, Checksums
from simple_cdd.gnupg import Gnupg
from .base import Tool
from urllib.parse import urlparse, urljoin
import os
import re
import logging

log = logging.getLogger()

@Tool.register
class ToolMirrorWget(Tool):
    type = "mirror"
    name = "wget"

    def __init__(self, env):
        self.env = env
        self.gnupg = Gnupg(env)

    def check_pre(self):
        super().check_pre()
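        # Unless DI_WWW_HOME or a custom installer is configured, the run
        # below needs checksum_files and di_match_files to be set.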
        if not self.env.get("DI_WWW_HOME") and not self.env.get("custom_installer"):
            if not self.env.get("checksum_files"):
                raise Fail("Cannot run mirror/wget: checksum_files is empty")
            if not self.env.get("di_match_files"):
                raise Fail("Cannot run mirror/wget: di_match_files is empty")

    def run(self):
        env = self.env
        logdir = env.get("simple_cdd_logs")
        logfilename = os.path.join(logdir, "{}-{}.log".format(self.type, self.name))

        with open(logfilename, "wt") as logfd:
            baseurl = env.get("wget_debian_mirror")
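            # Number of path components in the mirror URL; passed to wget's
            # --cut-dirs below so downloads land relative to the local mirror root.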
            path_depth = urlparse(baseurl).path.strip("/").count("/") + 1

            def _wget_many(urls):
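                # Recursively fetch a batch of URLs into MIRROR, preserving the
                # remote directory layout below the mirror root.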
                args = ["wget", "--continue", "--timestamping", "--no-verbose",
                        "--no-parent", "--no-host-directories", "--recursive", "--cut-dirs={}".format(path_depth),
                        "--directory-prefix=" + env.get("MIRROR")]
                args.extend(urls)
                retval = run_command("wget {} files".format(len(urls)), args, logfd=logfd, env=wget_env)
                if retval != 0:
                    raise Fail("wget exited with code %s, see %s for full output log", retval, logfilename)

            def _wget_one(url, output):
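                # Fetch a single URL to an explicit output file.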
                args = ["wget", "-O", output, url]
                retval = run_command("wget {}".format(url), args, logfd=logfd, env=wget_env)
                if retval != 0:
                    raise Fail("wget exited with code %s, see %s for full output log", retval, logfilename)

            checksum_files = env.get("checksum_files")

            # Download files needed to build debian-installer image
            files = []
            files.extend(env.get("mirror_files"))
            files.extend(checksum_files)

            # Build the environment for running wget
            wget_env = {}
            for name, val, changed in self.env.export_iter():
                wget_env[name] = str(val)

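            # Download the file list assembled above from the configured mirror.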
            _wget_many([urljoin(baseurl, x) for x in files])

            if checksum_files:
                # Get the release file and verify that it is valid
                release_file = os.path.join(env.get("simple_cdd_temp"), env.format("{DI_CODENAME}_Release"))
                download_release_file = os.path.join(env.get("wget_debian_mirror"), "dists", env.get("DI_CODENAME"), "Release")
                _wget_one(download_release_file, release_file)
                _wget_one(download_release_file + ".gpg", release_file + ".gpg")
                self.gnupg.verify_detached_sig(release_file, release_file + ".gpg")

                # Parse the release file for checksums
                sums = Checksums()
                sums.parse_release_file(release_file)

                # Ensure that the checksum files are those referenced in the Release file
                # And build a list of additional files to download, matching
                # di_match_files in the checksum files contents
                di_match = re.compile(env.get("di_match_files"))
                for file in checksum_files:
                    if file.endswith("SHA256SUMS"):
                        hashtype = "SHA256"
                    elif file.endswith("MD5SUMS"):
                        hashtype = "MD5Sum"
                    else:
                        log.warning("Unknown hash type for %s, skipping file", file)
                        continue

                    prefix_len = 7 + len(env.get("DI_CODENAME")) # dists/{DI_CODENAME}/
                    relname = file[prefix_len:]
                    absname = os.path.join(env.get("MIRROR"), file)
                    # Validate the file
                    sums.verify_file(absname, relname)

                    # Get the list of extra files to download: those whose
                    # pathname matches di_match
                    dirname = os.path.dirname(file)
                    extra_files = []
                    with open(absname, "rt") as fd:
                        for line in fd:
                            hashsum, relname = line.split()
                            if not di_match.search(relname): continue
                            if relname.startswith("./"): relname = relname[2:]
                            extra_files.append({
                                "absname": os.path.join(env.get("MIRROR"), dirname, relname),
                                "relname": relname,
                                "url": os.path.join(env.get("wget_debian_mirror"), dirname, relname),
                            })

                    # Download the extra files
                    _wget_many([x["url"] for x in extra_files])

                    # Check downloaded files against their corresponding checksums.
                    file_sums = Checksums()
                    file_sums.parse_checksums_file(absname, hashtype)
                    for f in extra_files:
                        file_sums.verify_file(f["absname"], f["relname"])