This file is indexed.

/usr/lib/python3/dist-packages/pex/testing.py is in python3-pex 1.1.14-2ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import contextlib
import os
import random
import sys
import tempfile
import zipfile
from collections import namedtuple
from textwrap import dedent

from .bin.pex import log, main
from .common import safe_mkdir, safe_rmtree
from .compatibility import nested
from .executor import Executor
from .installer import EggInstaller, Packager
from .pex_builder import PEXBuilder
from .util import DistributionHelper, named_temporary_file


@contextlib.contextmanager
def temporary_dir():
  td = tempfile.mkdtemp()
  try:
    yield td
  finally:
    safe_rmtree(td)


@contextlib.contextmanager
def temporary_filename():
  """Creates a temporary filename.

  This is useful when you need to pass a filename to an API. Windows requires all
  handles to a file be closed before deleting/renaming it, so this makes it a bit
  simpler."""
  with named_temporary_file() as fp:
    fp.write(b'')
    fp.close()
    yield fp.name


def random_bytes(length):
  return ''.join(
      map(chr, (random.randint(ord('a'), ord('z')) for _ in range(length)))).encode('utf-8')


@contextlib.contextmanager
def temporary_content(content_map, interp=None, seed=31337):
  """Write content to disk where content is map from string => (int, string).

     If target is int, write int random bytes.  Otherwise write contents of string."""
  random.seed(seed)
  interp = interp or {}
  with temporary_dir() as td:
    for filename, size_or_content in content_map.items():
      safe_mkdir(os.path.dirname(os.path.join(td, filename)))
      with open(os.path.join(td, filename), 'wb') as fp:
        if isinstance(size_or_content, int):
          fp.write(random_bytes(size_or_content))
        else:
          fp.write((size_or_content % interp).encode('utf-8'))
    yield td


def yield_files(directory):
  for root, _, files in os.walk(directory):
    for f in files:
      filename = os.path.join(root, f)
      rel_filename = os.path.relpath(filename, directory)
      yield filename, rel_filename


def write_zipfile(directory, dest, reverse=False):
  with contextlib.closing(zipfile.ZipFile(dest, 'w')) as zf:
    for filename, rel_filename in sorted(yield_files(directory), reverse=reverse):
      zf.write(filename, arcname=rel_filename)
  return dest


PROJECT_CONTENT = {
  'setup.py': dedent('''
      from setuptools import setup

      setup(
          name=%(project_name)r,
          version='0.0.0',
          zip_safe=%(zip_safe)r,
          packages=['my_package'],
          scripts=[
              'scripts/hello_world',
              'scripts/shell_script',
          ],
          package_data={'my_package': ['package_data/*.dat']},
          install_requires=%(install_requires)r,
      )
  '''),
  'scripts/hello_world': '#!/usr/bin/env python\nprint("hello world!")\n',
  'scripts/shell_script': '#!/usr/bin/env bash\necho hello world\n',
  'my_package/__init__.py': 0,
  'my_package/my_module.py': 'def do_something():\n  print("hello world!")\n',
  'my_package/package_data/resource1.dat': 1000,
  'my_package/package_data/resource2.dat': 1000,
}


@contextlib.contextmanager
def make_installer(name='my_project', installer_impl=EggInstaller, zip_safe=True,
                   install_reqs=None):
  interp = {'project_name': name, 'zip_safe': zip_safe, 'install_requires': install_reqs or []}
  with temporary_content(PROJECT_CONTENT, interp=interp) as td:
    yield installer_impl(td)


@contextlib.contextmanager
def make_source_dir(name='my_project', install_reqs=None):
  interp = {'project_name': name, 'zip_safe': True, 'install_requires': install_reqs or []}
  with temporary_content(PROJECT_CONTENT, interp=interp) as td:
    yield td


def make_sdist(name='my_project', zip_safe=True, install_reqs=None):
  with make_installer(name=name, installer_impl=Packager, zip_safe=zip_safe,
                      install_reqs=install_reqs) as packager:
    return packager.sdist()


@contextlib.contextmanager
def make_bdist(name='my_project', installer_impl=EggInstaller, zipped=False, zip_safe=True):
  with make_installer(name=name, installer_impl=installer_impl, zip_safe=zip_safe) as installer:
    dist_location = installer.bdist()
    if zipped:
      yield DistributionHelper.distribution_from_path(dist_location)
    else:
      with temporary_dir() as td:
        extract_path = os.path.join(td, os.path.basename(dist_location))
        with contextlib.closing(zipfile.ZipFile(dist_location)) as zf:
          zf.extractall(extract_path)
        yield DistributionHelper.distribution_from_path(extract_path)


COVERAGE_PREAMBLE = """
try:
  from coverage import coverage
  cov = coverage(auto_data=True, data_suffix=True)
  cov.start()
except ImportError:
  pass
"""


def write_simple_pex(td, exe_contents, dists=None, coverage=False):
  """Write a pex file that contains an executable entry point

  :param td: temporary directory path
  :param exe_contents: entry point python file
  :type exe_contents: string
  :param dists: distributions to include, typically sdists or bdists
  :param coverage: include coverage header
  """
  dists = dists or []

  with open(os.path.join(td, 'exe.py'), 'w') as fp:
    fp.write(exe_contents)

  pb = PEXBuilder(path=td, preamble=COVERAGE_PREAMBLE if coverage else None)

  for dist in dists:
    pb.add_egg(dist.location)

  pb.set_executable(os.path.join(td, 'exe.py'))
  pb.freeze()

  return pb


class IntegResults(namedtuple('results', 'output return_code exception')):
  """Convenience object to return integration run results."""

  def assert_success(self):
    assert self.exception is None and self.return_code is None

  def assert_failure(self):
    assert self.exception or self.return_code


def run_pex_command(args, env=None):
  """Simulate running pex command for integration testing.

  This is different from run_simple_pex in that it calls the pex command rather
  than running a generated pex.  This is useful for testing end to end runs
  with specific command line arguments or env options.
  """
  def logger_callback(_output):
    def mock_logger(msg, v=None):
      _output.append(msg)

    return mock_logger

  exception = None
  error_code = None
  output = []
  log.set_logger(logger_callback(output))
  try:
    main(args=args)
  except SystemExit as e:
    error_code = e.code
  except Exception as e:
    exception = e
  return IntegResults(output, error_code, exception)


# TODO(wickman) Why not PEX.run?
def run_simple_pex(pex, args=(), env=None, stdin=None):
  process = Executor.open_process([sys.executable, pex] + list(args), env=env, combined=True)
  stdout, _ = process.communicate(input=stdin)
  return stdout.replace(b'\r', b''), process.returncode


def run_simple_pex_test(body, args=(), env=None, dists=None, coverage=False):
  with nested(temporary_dir(), temporary_dir()) as (td1, td2):
    pb = write_simple_pex(td1, body, dists=dists, coverage=coverage)
    pex = os.path.join(td2, 'app.pex')
    pb.build(pex)
    return run_simple_pex(pex, args=args, env=env)


def _iter_filter(data_dict):
  fragment = '/%s/_pex/' % PEXBuilder.BOOTSTRAP_DIR
  for filename, records in data_dict.items():
    try:
      bi = filename.index(fragment)
    except ValueError:
      continue
    # rewrite to look like root source
    yield ('pex/' + filename[bi + len():], records)


def combine_pex_coverage(coverage_file_iter):
  from coverage.data import CoverageData

  combined = CoverageData(basename='.coverage_combined')

  for filename in coverage_file_iter:
    cov = CoverageData(basename=filename)
    cov.read()
    combined.add_line_data(dict(_iter_filter(cov.line_data())))
    combined.add_arc_data(dict(_iter_filter(cov.arc_data())))

  combined.write()
  return combined.filename