This file is indexed.

/usr/lib/python3/dist-packages/DistUpgrade/DistUpgradeFetcherCore.py is in python3-distupgrade 1:16.04.12.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# DistUpgradeFetcherCore.py
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
#
#  Copyright (c) 2006 Canonical
#
#  Author: Michael Vogt <michael.vogt@ubuntu.com>
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License as
#  published by the Free Software Foundation; either version 2 of the
#  License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
#  USA

from string import Template
import os
import apt_pkg
import logging
import tarfile
import tempfile
import shutil
import sys
import subprocess
from gettext import gettext as _
from aptsources.sourceslist import SourcesList

from .utils import get_dist, url_downloadable, country_mirror


class DistUpgradeFetcherCore(object):
    """ base class (without GUI) for the upgrade fetcher

    run() drives the whole process: it fetches the upgrade tool tarball
    and its signature, authenticates the tarball with gpg, extracts it
    and finally starts the upgrade script that it contains.
    """

    # uris that start with this prefix are candidates for being rewritten
    # to a mirror taken from sources.list (see _expandUri())
    DEFAULT_MIRROR = "http://archive.ubuntu.com/ubuntu"
    # component that mirror_from_sources_list() looks for in an entry
    DEFAULT_COMPONENT = "main"
    # True when DEBUG_UPDATE_MANAGER is set in the environment, whatever
    # its value is
    DEBUG = "DEBUG_UPDATE_MANAGER" in os.environ

    def __init__(self, new_dist, progress):
        """ set up the fetcher

        new_dist -- description of the release to upgrade to; the other
                    methods read its upgradeTool, upgradeToolSig and
                    name attributes
        progress -- progress object that is handed to apt_pkg.Acquire
        """
        # extra command line options for the release upgrader script
        self.run_options = []
        self._progress = progress
        self.new_dist = new_dist
        self.current_dist_name = get_dist()

    def _debug(self, msg):
        " helper to show debug information "
        if self.DEBUG:
            sys.stderr.write(msg + "\n")

    def showReleaseNotes(self):
        return True

    def error(self, summary, message):
        """ dummy implementation for error display, should be overwriten
            by subclasses that want to more fancy method
        """
        print(summary)
        print(message)
        return False

    def authenticate(self):
        if self.new_dist.upgradeToolSig:
            f = self.tmpdir + "/" + os.path.basename(self.new_dist.upgradeTool)
            sig = self.tmpdir + "/" + os.path.basename(
                self.new_dist.upgradeToolSig)
            print(_("authenticate '%(file)s' against '%(signature)s' ") % {
                'file': os.path.basename(f),
                'signature': os.path.basename(sig)})
            if self.gpgauthenticate(f, sig):
                return True
        return False

    def gpgauthenticate(self, file, signature,
                        keyring='/etc/apt/trusted.gpg'):
        """ authenticated a file against a given signature, if no keyring
            is given use the apt default keyring
        """
        status_pipe = os.pipe()
        logger_pipe = os.pipe()
        if sys.version_info >= (3, 4):
            os.set_inheritable(status_pipe[1], 1)
            os.set_inheritable(logger_pipe[1], 1)
        gpg = [
            "gpg",
            "--status-fd", "%d" % status_pipe[1],
            "--logger-fd", "%d" % logger_pipe[1],
            "--no-options",
            "--homedir", self.tmpdir,
            "--no-default-keyring",
            "--ignore-time-conflict",
            "--keyring", keyring,
            "--verify", signature, file,
        ]

        def gpg_preexec():
            os.close(status_pipe[0])
            os.close(logger_pipe[0])

        proc = subprocess.Popen(
            gpg, stderr=subprocess.PIPE, preexec_fn=gpg_preexec,
            close_fds=False, universal_newlines=True)
        os.close(status_pipe[1])
        os.close(logger_pipe[1])
        status_handle = os.fdopen(status_pipe[0])
        logger_handle = os.fdopen(logger_pipe[0])
        try:
            gpgres = status_handle.read()
            ret = proc.wait()
            if ret != 0:
                # gnupg returned a problem (non-zero exit)
                print("gpg exited %d" % ret)
                print("Debug information: ")
                print(status_handle.read())
                print(proc.stderr.read())
                print(logger_handle.read())
                return False
            if "VALIDSIG" in gpgres:
                return True
            print("invalid result from gpg:")
            print(gpgres)
            return False
        finally:
            status_handle.close()
            proc.stderr.close()
            logger_handle.close()

    def extractDistUpgrader(self):
        # extract the tarball
        fname = os.path.join(self.tmpdir, os.path.basename(self.uri))
        print(_("extracting '%s'") % os.path.basename(fname))
        if not os.path.exists(fname):
            return False
        try:
            tar = tarfile.open(self.tmpdir + "/" +
                               os.path.basename(self.uri), "r")
            for tarinfo in tar:
                tar.extract(tarinfo)
            tar.close()
        except tarfile.ReadError as e:
            logging.error("failed to open tarfile (%s)" % e)
            return False
        return True

    def verifyDistUprader(self):
        # FIXME: check an internal dependency file to make sure
        #        that the script will run correctly

        # see if we have a script file that we can run
        self.script = script = "%s/%s" % (self.tmpdir, self.new_dist.name)
        if not os.path.exists(script):
            return self.error(_("Could not run the upgrade tool"),
                              _("Could not run the upgrade tool") + ".  " +
                              _("This is most likely a bug in the upgrade "
                                "tool. Please report it as a bug using the "
                                "command 'ubuntu-bug "
                                "ubuntu-release-upgrader-core'."))
        return True

    def mirror_from_sources_list(self, uri, default_uri):
        """
        try to figure out what the mirror is from the current sources.list

        do this by looking for enabled "deb" entries in sources.list that
        match the current dist and DEFAULT_COMPONENT and then asking
        url_downloadable() whether the uri is available on that server

        uri         -- uri of the wanted file, it starts with default_uri
        default_uri -- prefix of uri that identifies the main archive

        returns uri unchanged if the main archive is in use already, the
        uri rewritten to the first mirror that has the file, or "" if no
        suitable mirror was found
        """
        self._debug("mirror_from_sources_list: %s" % self.current_dist_name)
        sources = SourcesList(withMatcher=False)
        # path of the wanted file relative to the archive root
        rel_path = uri[len(default_uri):].lstrip("/")
        seen = set()
        for e in sources.list:
            if e.disabled or e.invalid or e.type != "deb":
                continue
            # check if we probed this mirror already
            if e.uri in seen:
                continue
            # only entries for the running release that carry the default
            # component are of interest (this used to be a hard-coded
            # "main" for mirrors but DEFAULT_COMPONENT for the archive)
            if (e.dist != self.current_dist_name or
                    self.DEFAULT_COMPONENT not in e.comps):
                continue
            # we are using the main mirror already, so we are fine
            if e.uri.startswith(default_uri):
                return uri
            # sources.list uris may end in a slash, avoid "//" in the
            # uri that gets probed
            mirror_uri = e.uri.rstrip("/") + "/" + rel_path
            if url_downloadable(mirror_uri, self._debug):
                return mirror_uri
            seen.add(e.uri)
        self._debug("no mirror found")
        return ""

    def _expandUri(self, uri):
        """
        expand the uri so that it uses a mirror if the url starts
        with a well known string (like archive.ubuntu.com)

        a mirror from sources.list is preferred; otherwise the
        $countrymirror placeholder of the uri is filled in with
        country_mirror() and, if that result is not downloadable, with
        an empty string
        """
        # try to guess the mirror from the sources.list
        if uri.startswith(self.DEFAULT_MIRROR):
            self._debug("trying to find suitable mirror")
            from_sources = self.mirror_from_sources_list(
                uri, self.DEFAULT_MIRROR)
            if from_sources:
                return from_sources
        # if that fails, use old method
        template = Template(uri)
        candidate = template.safe_substitute(countrymirror=country_mirror())
        # be paranoid and check if the given uri is really downloadable
        try:
            if url_downloadable(candidate, self._debug):
                return candidate
            raise Exception("failed to download %s" % candidate)
        except Exception as err:
            self._debug("url '%s' could not be downloaded" % err)
        # else fallback to main server
        return template.safe_substitute(countrymirror='')

    def fetchDistUpgrader(self):
        """ download the tarball with the upgrade script

        Creates a fresh temporary directory, remembers it as self.tmpdir
        and makes it the current working directory.  The upgrade tool and,
        if the new release names one, its signature are then queued on an
        apt_pkg.Acquire fetcher; the expanded tool uri is kept in
        self.uri.

        Returns True if the fetcher reports RESULT_CONTINUE and every
        queued file exists with a size greater than zero, False otherwise
        (also when the new release names no upgrade tool at all).
        """
        tmpdir = tempfile.mkdtemp(prefix="ubuntu-release-upgrader-")
        self.tmpdir = tmpdir
        os.chdir(tmpdir)
        logging.debug("using tmpdir: '%s'" % tmpdir)
        # turn debugging on here (if required)
        if self.DEBUG:
            apt_pkg.config.set("Debug::Acquire::http", "1")
            apt_pkg.config.set("Debug::Acquire::ftp", "1")
        fetcher = apt_pkg.Acquire(self._progress)
        # the queued items are kept in a list so that they stay referenced
        # while the fetcher runs (and pyflakes has nothing to complain
        # about)
        items = []
        # basenames of the files that have to exist after the download
        expected = []
        if self.new_dist.upgradeToolSig is not None:
            uri = self._expandUri(self.new_dist.upgradeToolSig)
            items.append(apt_pkg.AcquireFile(
                fetcher,
                uri,
                descr=_("Upgrade tool signature")))
            expected.append(os.path.basename(self.new_dist.upgradeToolSig))
        if self.new_dist.upgradeTool is None:
            return False
        self.uri = self._expandUri(self.new_dist.upgradeTool)
        items.append(apt_pkg.AcquireFile(
            fetcher,
            self.uri,
            descr=_("Upgrade tool")))
        expected.append(os.path.basename(self.new_dist.upgradeTool))
        result = fetcher.run()
        if result != fetcher.RESULT_CONTINUE:
            logging.warning("fetch result != continue (%s)" % result)
            return False
        # check that the files are really there and non-null; only files
        # that were requested are checked, a release without a signature
        # used to end in a TypeError from os.path.basename(None) here
        for f in expected:
            if not (os.path.exists(f) and os.path.getsize(f) > 0):
                logging.warning("file '%s' missing" % f)
                return False
        return True

    def runDistUpgrader(self):
        """ replace the current process with the upgrade script

        the script is started with self.run_options as its arguments;
        when not running as root it is started through /usr/bin/sudo
        """
        argv = [self.script] + self.run_options
        if os.getuid() == 0:
            os.execv(self.script, argv)
        else:
            os.execv("/usr/bin/sudo", ["sudo"] + argv)

    def cleanup(self):
        # cleanup
        os.chdir("..")
        # del tmpdir
        shutil.rmtree(self.tmpdir)

    def run(self):
        # see if we have release notes
        if not self.showReleaseNotes():
            return
        if not self.fetchDistUpgrader():
            self.error(_("Failed to fetch"),
                       _("Fetching the upgrade failed. There may be a network "
                         "problem. "))
            return
        if not self.authenticate():
            self.error(_("Authentication failed"),
                       _("Authenticating the upgrade failed. There may be a "
                         "problem with the network or with the server. "))
            self.cleanup()
            return
        if not self.extractDistUpgrader():
            self.error(_("Failed to extract"),
                       _("Extracting the upgrade failed. There may be a "
                         "problem with the network or with the server. "))

            return
        if not self.verifyDistUprader():
            self.error(_("Verification failed"),
                       _("Verifying the upgrade failed.  There may be a "
                         "problem with the network or with the server. "))
            self.cleanup()
            return
        try:
            # check if we can execute, if we run it via sudo we will
            # not know otherwise, pkexec will not raise a exception
            if not os.access(self.script, os.X_OK):
                ex = OSError("Can not execute '%s'" % self.script)
                ex.errno = 13
                raise ex
            self.runDistUpgrader()
        except OSError as e:
            if e.errno == 13:
                self.error(_("Can not run the upgrade"),
                           _("This usually is caused by a system where /tmp "
                             "is mounted noexec. Please remount without "
                             "noexec and run the upgrade again."))
                return False
            else:
                self.error(_("Can not run the upgrade"),
                           _("The error message is '%s'.") % e.strerror)
        return True

if __name__ == "__main__":
    # manual smoke test: print the mirror that the local sources.list
    # yields for a sample upgrade tool uri
    fetcher = DistUpgradeFetcherCore(None, None)
    sample_uri = (
        "http://archive.ubuntu.com/ubuntu/dists/intrepid-proposed/main/"
        "dist-upgrader-all/0.93.34/intrepid.tar.gz")
    archive_uri = "http://archive.ubuntu.com/ubuntu"
    mirror = fetcher.mirror_from_sources_list(sample_uri, archive_uri)
    print("got mirror: '%s'" % mirror)