This file is indexed.

/usr/lib/python3/dist-packages/DistUpgrade/DistUpgradeView.py is in python3-distupgrade 1:16.04.12.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
# DistUpgradeView.py 
#  
#  Copyright (c) 2004,2005 Canonical
#  
#  Author: Michael Vogt <michael.vogt@ubuntu.com>
# 
#  This program is free software; you can redistribute it and/or 
#  modify it under the terms of the GNU General Public License as 
#  published by the Free Software Foundation; either version 2 of the
#  License, or (at your option) any later version.
# 
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
# 
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
#  USA

from .DistUpgradeGettext import gettext as _
from .DistUpgradeGettext import ngettext
import apt
import errno
import os
import apt_pkg 
import locale
import logging
import signal
import select

from .DistUpgradeAufs import doAufsChroot, doAufsChrootRsync
from .DistUpgradeApport import apport_pkgfailure


try:
    locale.setlocale(locale.LC_ALL, "")
    (code, ENCODING) = locale.getdefaultlocale()
except:
    logging.exception("getting the encoding failed")
    ENCODING = "utf-8"   #pyflakes

# if there is no encoding, setup UTF-8
if not ENCODING:
    ENCODING = "utf-8"
    os.putenv("LC_CTYPE", "C.UTF-8")
    try:
        locale.setlocale(locale.LC_CTYPE, "C.UTF-8")
    except locale.error:
        pass


# log locale information
logging.info("locale: '%s' '%s'" % locale.getlocale())


def FuzzyTimeToStr(sec):
  " return the time a bit fuzzy (no seconds if time > 60 secs "
  #print("FuzzyTimeToStr: ", sec)
  sec = int(sec)

  days = sec//(60*60*24)
  hours = sec//(60*60) % 24
  minutes = (sec//60) % 60
  seconds = sec % 60
  # 0 seonds remaining looks wrong and its "fuzzy" anyway
  if seconds == 0:
    seconds = 1

  # string map to make the re-ordering possible
  map = { "str_days" : "",
          "str_hours" : "",
          "str_minutes" : "",
          "str_seconds" : ""
        }
  
  # get the fragments, this is not ideal i18n wise, but its
  # difficult to do it differently
  if days > 0:
    map["str_days"] = ngettext("%li day","%li days", days) % days
  if hours > 0:
    map["str_hours"] = ngettext("%li hour","%li hours", hours) % hours
  if minutes > 0:
    map["str_minutes"] = ngettext("%li minute","%li minutes", minutes) % minutes
  map["str_seconds"] = ngettext("%li second","%li seconds", seconds) % seconds

  # now assemble the string
  if days > 0:
    # Don't print str_hours if it's an empty string, see LP: #288912
    if map["str_hours"] == '':
        return map["str_days"]
    # TRANSLATORS: you can alter the ordering of the remaining time
    # information here if you shuffle %(str_days)s %(str_hours)s %(str_minutes)s
    # around. Make sure to keep all '$(str_*)s' in the translated string
    # and do NOT change anything appart from the ordering.
    #
    # %(str_hours)s will be either "1 hour" or "2 hours" depending on the
    # plural form
    # 
    # Note: most western languages will not need to change this
    return _("%(str_days)s %(str_hours)s") % map
  # display no minutes for time > 3h, see LP: #144455
  elif hours > 3:
    return map["str_hours"]
  # when we are near the end, become more precise again
  elif hours > 0:
    # Don't print str_minutes if it's an empty string, see LP: #288912
    if map["str_minutes"] == '':
        return map["str_hours"]
    # TRANSLATORS: you can alter the ordering of the remaining time
    # information here if you shuffle %(str_hours)s %(str_minutes)s
    # around. Make sure to keep all '$(str_*)s' in the translated string
    # and do NOT change anything appart from the ordering.
    #
    # %(str_hours)s will be either "1 hour" or "2 hours" depending on the
    # plural form
    # 
    # Note: most western languages will not need to change this
    return _("%(str_hours)s %(str_minutes)s") % map
  elif minutes > 0:
    return map["str_minutes"]
  return map["str_seconds"]


class AcquireProgress(apt.progress.base.AcquireProgress):

  def __init__(self):
    super(AcquireProgress, self).__init__()
    self.est_speed = 0.0
  def start(self):
    super(AcquireProgress, self).start()
    self.est_speed = 0.0
    self.eta = 0.0
    self.percent = 0.0
    self.release_file_download_error = False
  def update_status(self, uri, descr, shortDescr, status):
    super(AcquireProgress, self).update_status(uri, descr, shortDescr, status)
    # FIXME: workaround issue in libapt/python-apt that does not 
    #        raise a exception if *all* files fails to download
    if status == apt_pkg.STAT_FAILED:
      logging.warning("update_status: dlFailed on '%s' " % uri)
      if uri.endswith("Release.gpg") or uri.endswith("Release"):
        # only care about failures from network, not gpg, bzip, those
        # are different issues
        for net in ["http","ftp","mirror"]:
          if uri.startswith(net):
            self.release_file_download_error = True
            break
  # required, otherwise the lucid version of python-apt gets really
  # unhappy, its expecting this function for apt.progress.base.AcquireProgress
  def pulse_items(self, arg):
    return True
  def pulse(self, owner=None):
    super(AcquireProgress, self).pulse(owner)
    self.percent = (((self.current_bytes + self.current_items) * 100.0) /
                    float(self.total_bytes + self.total_items))
    if self.current_cps > self.est_speed:
      self.est_speed = (self.est_speed+self.current_cps)/2.0
    if self.current_cps > 0:
      self.eta = ((self.total_bytes - self.current_bytes) /
                  float(self.current_cps))
    return True
  def isDownloadSpeedEstimated(self):
    return (self.est_speed != 0)
  def estimatedDownloadTime(self, required_download):
    """ get the estimated download time """
    if self.est_speed == 0:
      timeModem = required_download/(56*1024/8)  # 56 kbit 
      timeDSL = required_download/(1024*1024/8)  # 1Mbit = 1024 kbit
      s= _("This download will take about %s with a 1Mbit DSL connection "
           "and about %s with a 56k modem.") % (FuzzyTimeToStr(timeDSL), FuzzyTimeToStr(timeModem))
      return s
    # if we have a estimated speed, use it
    s = _("This download will take about %s with your connection. ") % FuzzyTimeToStr(required_download/self.est_speed)
    return s
    


class InstallProgress(apt.progress.base.InstallProgress):
  """ Base class for InstallProgress that supports some fancy
      stuff like apport integration
  """
  def __init__(self):
    apt.progress.base.InstallProgress.__init__(self)
    self.master_fd = None

  def wait_child(self):
      """Wait for child progress to exit.

      The return values is the full status returned from os.waitpid()
      (not only the return code).
      """
      while True:
          try:
              select.select([self.statusfd], [], [], self.select_timeout)
          except select.error as e:
              if e.args[0] != errno.EINTR:
                  raise
          self.update_interface()
          try:
              (pid, res) = os.waitpid(self.child_pid, os.WNOHANG)
              if pid == self.child_pid:
                  break
          except OSError as e:
              if e.errno != errno.EINTR:
                  raise
              if e.errno == errno.ECHILD:
                  break
      return res

  def run(self, pm):
    pid = self.fork()
    if pid == 0:
      # check if we need to setup/enable the aufs chroot stuff
      if "RELEASE_UPGRADE_USE_AUFS_CHROOT" in os.environ:
        if not doAufsChroot(os.environ["RELEASE_UPGRADE_AUFS_RWDIR"],
                            os.environ["RELEASE_UPGRADE_USE_AUFS_CHROOT"]):
          print("ERROR: failed to setup aufs chroot overlay")
          os._exit(1)
      # child, ignore sigpipe, there are broken scripts out there
      # like etckeeper (LP: #283642)
      signal.signal(signal.SIGPIPE,signal.SIG_IGN) 
      try:
        res = pm.do_install(self.writefd)
      except Exception as e:
        print("Exception during pm.DoInstall(): ", e)
        logging.exception("Exception during pm.DoInstall()")
        with open("/var/run/ubuntu-release-upgrader-apt-exception","w") as f:
            f.write(str(e))
        os._exit(pm.RESULT_FAILED)
      os._exit(res)
    self.child_pid = pid
    res = os.WEXITSTATUS(self.wait_child())
    # check if we want to sync the changes back, *only* do that
    # if res is positive
    if (res == 0 and
        "RELEASE_UPGRADE_RSYNC_AUFS_CHROOT" in os.environ):
      logging.info("doing rsync commit of the update")
      if not doAufsChrootRsync(os.environ["RELEASE_UPGRADE_USE_AUFS_CHROOT"]):
        logging.error("FATAL ERROR: doAufsChrootRsync() returned FALSE")
        return pm.RESULT_FAILED
    return res
  
  def error(self, pkg, errormsg):
    " install error from a package "
    apt.progress.base.InstallProgress.error(self, pkg, errormsg)
    logging.error("got an error from dpkg for pkg: '%s': '%s'" % (pkg, errormsg))
    if "/" in pkg:
      pkg = os.path.basename(pkg)
    if "_" in pkg:
      pkg = pkg.split("_")[0]
    # now run apport
    apport_pkgfailure(pkg, errormsg)

class DumbTerminal(object):
    def call(self, cmd, hidden=False):
        " expects a command in the subprocess style (as a list) "
        import subprocess
        subprocess.call(cmd)

class DummyHtmlView(object):
    def open(self, url):
        pass
    def show(self):
      pass
    def hide(self):
      pass

(STEP_PREPARE,
 STEP_MODIFY_SOURCES,
 STEP_FETCH,
 STEP_INSTALL,
 STEP_CLEANUP,
 STEP_REBOOT,
 STEP_N) = range(1,8)

# Declare these translatable strings from the .ui files here so that
# xgettext picks them up.
( _("Preparing to upgrade"),
  _("Getting new software channels"),
  _("Getting new packages"),
  _("Installing the upgrades"),
  _("Cleaning up"),
)

class DistUpgradeView(object):
    " abstraction for the upgrade view "
    def __init__(self):
        self.needs_screen = False
        pass
    def getOpCacheProgress(self):
        " return a OpProgress() subclass for the given graphic"
        return apt.progress.base.OpProgress()
    def getAcquireProgress(self):
        " return an acquire progress object "
        return AcquireProgress()
    def getInstallProgress(self, cache=None):
        " return a install progress object "
        return InstallProgress()
    def getTerminal(self):
        return DumbTerminal()
    def getHtmlView(self):
        return DummyHtmlView()
    def updateStatus(self, msg):
        """ update the current status of the distUpgrade based
            on the current view
        """
        pass
    def abort(self):
        """ provide a visual feedback that the upgrade was aborted """
        pass
    def setStep(self, step):
        """ we have 6 steps current for a upgrade:
        1. Analyzing the system
        2. Updating repository information
        3. fetch packages
        3. Performing the upgrade
        4. Post upgrade stuff
        5. Complete
        """
        pass
    def hideStep(self, step):
        " hide a certain step from the GUI "
        pass
    def showStep(self, step):
        " show a certain step from the GUI "
        pass
    def confirmChanges(self, summary, changes, demotions, downloadSize,
                       actions=None, removal_bold=True):
        """ display the list of changed packages (apt.Package) and
            return if the user confirms them
        """
        self.confirmChangesMessage = ""
        self.demotions = demotions
        self.toInstall = []
        self.toReinstall = []
        self.toUpgrade = []
        self.toRemove = []
        self.toRemoveAuto = []
        self.toDowngrade = []
        for pkg in changes:
            if pkg.marked_install: 
              self.toInstall.append(pkg)
            elif pkg.marked_upgrade: 
              self.toUpgrade.append(pkg)
            elif pkg.marked_reinstall:
              self.toReinstall.append(pkg)
            elif pkg.marked_delete:
              if pkg._pcache._depcache.is_auto_installed(pkg._pkg):
                self.toRemoveAuto.append(pkg)
              else:
                self.toRemove.append(pkg)
            elif pkg.marked_downgrade: 
              self.toDowngrade.append(pkg)
        # do not bother the user with a different treeview
        self.toInstall = self.toInstall + self.toReinstall
        # sort it
        self.toInstall.sort()
        self.toUpgrade.sort()
        self.toRemove.sort()
        self.toRemoveAuto.sort()
        self.toDowngrade.sort()
        # now build the message (the same for all frontends)
        msg = "\n"
        pkgs_remove = len(self.toRemove) + len(self.toRemoveAuto)
        pkgs_inst = len(self.toInstall) + len(self.toReinstall)
        pkgs_upgrade = len(self.toUpgrade)
        # FIXME: show detailed packages
        if len(self.demotions) > 0:
          msg += ngettext(
            "%(amount)d installed package is no longer supported by Canonical. "
            "You can still get support from the community.",
            "%(amount)d installed packages are no longer supported by "
            "Canonical. You can still get support from the community.",
            len(self.demotions)) % { 'amount' : len(self.demotions) }
          msg += "\n\n"
        if pkgs_remove > 0:
          # FIXME: make those two separate lines to make it clear
          #        that the "%" applies to the result of ngettext
          msg += ngettext("%d package is going to be removed.",
                          "%d packages are going to be removed.",
                          pkgs_remove) % pkgs_remove
          msg += " "
        if pkgs_inst > 0:
          msg += ngettext("%d new package is going to be "
                          "installed.",
                          "%d new packages are going to be "
                          "installed.",pkgs_inst) % pkgs_inst
          msg += " "
        if pkgs_upgrade > 0:
          msg += ngettext("%d package is going to be upgraded.",
                          "%d packages are going to be upgraded.",
                          pkgs_upgrade) % pkgs_upgrade
          msg +=" "
        if downloadSize > 0:
          downloadSizeStr = apt_pkg.size_to_str(downloadSize)
          if isinstance(downloadSizeStr, bytes):
              downloadSizeStr = downloadSizeStr.decode(ENCODING)
          msg += _("\n\nYou have to download a total of %s. ") % (
              downloadSizeStr)
          msg += self.getAcquireProgress().estimatedDownloadTime(downloadSize)
        if ((pkgs_upgrade + pkgs_inst) > 0) and ((pkgs_upgrade + pkgs_inst + pkgs_remove) > 100):
          if self.getAcquireProgress().isDownloadSpeedEstimated():
            msg += "\n\n%s" % _( "Installing the upgrade "
                                 "can take several hours. Once the download "
                                 "has finished, the process cannot be canceled.")
          else:
            msg += "\n\n%s" % _( "Fetching and installing the upgrade "
                                 "can take several hours. Once the download "
                                 "has finished, the process cannot be canceled.")
        else:
          if pkgs_remove > 100:
            msg += "\n\n%s" % _( "Removing the packages "
                                 "can take several hours. ")
        # Show an error if no actions are planned
        if (pkgs_upgrade + pkgs_inst + pkgs_remove) < 1:
          # FIXME: this should go into DistUpgradeController
          summary = _("The software on this computer is up to date.")
          msg = _("There are no upgrades available for your system. "
                  "The upgrade will now be canceled.")
          self.error(summary, msg)
          return False
        # set the message
        self.confirmChangesMessage = msg
        return True

    def askYesNoQuestion(self, summary, msg, default='No'):
        " ask a Yes/No question and return True on 'Yes' "
        pass
    def confirmRestart(self):
        " generic ask about the restart, can be overridden "
        summary = _("Reboot required")
        msg =  _("The upgrade is finished and "
                 "a reboot is required. "
                 "Do you want to do this "
                 "now?")
        return self.askYesNoQuestion(summary, msg)
    def error(self, summary, msg, extended_msg=None):
        " display a error "
        pass
    def information(self, summary, msg, extended_msg=None):
        " display a information msg"
        pass
    def processEvents(self):
        """ process gui events (to keep the gui alive during a long
            computation """
        pass
    def pulseProgress(self, finished=False):
      """ do a progress pulse (e.g. bounce a bar back and forth, show
          a spinner)
      """
      pass
    def showDemotions(self, summary, msg, demotions):
      """
      show demoted packages to the user, default implementation
      is to just show a information dialog
      """
      self.information(summary, msg, "\n".join(demotions))

if __name__ == "__main__":
  fp = AcquireProgress()
  fp.pulse()