This file is indexed.

/usr/lib/python3/dist-packages/pex/util.py is in python3-pex 1.1.4-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import absolute_import

import contextlib
import errno
import os
import shutil
import tempfile
import uuid
from hashlib import sha1
from threading import Lock

from pkg_resources import find_distributions, resource_isdir, resource_listdir, resource_string

from .common import safe_mkdir, safe_mkdtemp, safe_open, safe_rmtree
from .finders import register_finders


class DistributionHelper(object):
  @classmethod
  def walk_data(cls, dist, path='/'):
    """Yields filename, stream for files identified as data in the distribution"""
    for rel_fn in filter(None, dist.resource_listdir(path)):
      full_fn = os.path.join(path, rel_fn)
      if dist.resource_isdir(full_fn):
        for fn, stream in cls.walk_data(dist, full_fn):
          yield fn, stream
      else:
        yield full_fn[1:], dist.get_resource_stream(dist._provider, full_fn)

  @staticmethod
  def zipsafe(dist):
    """Returns whether or not we determine a distribution is zip-safe."""
    # zip-safety is only an attribute of eggs.  wheels are considered never
    # zip safe per implications of PEP 427.
    if hasattr(dist, 'egg_info') and dist.egg_info.endswith('EGG-INFO'):
      egg_metadata = dist.metadata_listdir('')
      return 'zip-safe' in egg_metadata and 'native_libs.txt' not in egg_metadata
    else:
      return False

  @classmethod
  def access_zipped_assets(cls, static_module_name, static_path, dir_location=None):
    """
    Create a copy of static resource files as we can't serve them from within the pex file.

    :param static_module_name: Module name containing module to cache in a tempdir
    :type static_module_name: string, for example 'twitter.common.zookeeper' or similar
    :param static_path: Module name, for example 'serverset'
    :param dir_location: create a new temporary directory inside, or None to have one created
    :returns temp_dir: Temporary directory with the zipped assets inside
    :rtype: str
    """

    # asset_path is initially a module name that's the same as the static_path, but will be
    # changed to walk the directory tree
    def walk_zipped_assets(static_module_name, static_path, asset_path, temp_dir):
      for asset in resource_listdir(static_module_name, asset_path):
        asset_target = os.path.normpath(
            os.path.join(os.path.relpath(asset_path, static_path), asset))
        if resource_isdir(static_module_name, os.path.join(asset_path, asset)):
          safe_mkdir(os.path.join(temp_dir, asset_target))
          walk_zipped_assets(static_module_name, static_path, os.path.join(asset_path, asset),
            temp_dir)
        else:
          with open(os.path.join(temp_dir, asset_target), 'wb') as fp:
            path = os.path.join(static_path, asset_target)
            file_data = resource_string(static_module_name, path)
            fp.write(file_data)

    if dir_location is None:
      temp_dir = safe_mkdtemp()
    else:
      temp_dir = dir_location

    walk_zipped_assets(static_module_name, static_path, static_path, temp_dir)

    return temp_dir

  @classmethod
  def distribution_from_path(cls, path, name=None):
    """Return a distribution from a path.

    If name is provided, find the distribution.  If none is found matching the name,
    return None.  If name is not provided and there is unambiguously a single
    distribution, return that distribution otherwise None.
    """
    # Monkeypatch pkg_resources finders should it not already be so.
    register_finders()
    if name is None:
      distributions = list(find_distributions(path))
      if len(distributions) == 1:
        return distributions[0]
    else:
      for dist in find_distributions(path):
        if dist.project_name == name:
          return dist


class CacheHelper(object):
  @classmethod
  def update_hash(cls, filelike, digest):
    """Update the digest of a single file in a memory-efficient manner."""
    block_size = digest.block_size * 1024
    for chunk in iter(lambda: filelike.read(block_size), b''):
      digest.update(chunk)

  @classmethod
  def hash(cls, path, digest=None, hasher=sha1):
    """Return the digest of a single file in a memory-efficient manner."""
    if digest is None:
      digest = hasher()
    with open(path, 'rb') as fh:
      cls.update_hash(fh, digest)
    return digest.hexdigest()

  @classmethod
  def _compute_hash(cls, names, stream_factory):
    digest = sha1()
    # Always use / as the path separator, since that's what zip uses.
    hashed_names = [n.replace(os.sep, '/') for n in names]
    digest.update(''.join(hashed_names).encode('utf-8'))
    for name in names:
      with contextlib.closing(stream_factory(name)) as fp:
        cls.update_hash(fp, digest)
    return digest.hexdigest()

  @classmethod
  def zip_hash(cls, zf, prefix=''):
    """Return the hash of the contents of a zipfile, comparable with a cls.dir_hash."""
    prefix_length = len(prefix)
    names = sorted(name[prefix_length:] for name in zf.namelist()
        if name.startswith(prefix) and not name.endswith('.pyc') and not name.endswith('/'))
    def stream_factory(name):
      return zf.open(prefix + name)
    return cls._compute_hash(names, stream_factory)

  @classmethod
  def _iter_files(cls, directory):
    normpath = os.path.realpath(os.path.normpath(directory))
    for root, _, files in os.walk(normpath):
      for f in files:
        yield os.path.relpath(os.path.join(root, f), normpath)

  @classmethod
  def pex_hash(cls, d):
    """Return a reproducible hash of the contents of a directory."""
    names = sorted(f for f in cls._iter_files(d) if not (f.endswith('.pyc') or f.startswith('.')))
    def stream_factory(name):
      return open(os.path.join(d, name), 'rb')  # noqa: T802
    return cls._compute_hash(names, stream_factory)

  @classmethod
  def dir_hash(cls, d):
    """Return a reproducible hash of the contents of a directory."""
    names = sorted(f for f in cls._iter_files(d) if not f.endswith('.pyc'))
    def stream_factory(name):
      return open(os.path.join(d, name), 'rb')  # noqa: T802
    return cls._compute_hash(names, stream_factory)

  @classmethod
  def cache_distribution(cls, zf, source, target_dir):
    """Possibly cache an egg from within a zipfile into target_cache.

       Given a zipfile handle and a filename corresponding to an egg distribution within
       that zip, maybe write to the target cache and return a Distribution."""
    dependency_basename = os.path.basename(source)
    if not os.path.exists(target_dir):
      target_dir_tmp = target_dir + '.' + uuid.uuid4().hex
      for name in zf.namelist():
        if name.startswith(source) and not name.endswith('/'):
          # strip off prefix + '/'
          target_name = os.path.join(dependency_basename, name[len(source) + 1:])
          with contextlib.closing(zf.open(name)) as zi:
            with safe_open(os.path.join(target_dir_tmp, target_name), 'wb') as fp:
              shutil.copyfileobj(zi, fp)
      try:
        os.rename(target_dir_tmp, target_dir)
      except OSError as e:
        if e.errno == errno.ENOTEMPTY:
          safe_rmtree(target_dir_tmp)
        else:
          raise

    dist = DistributionHelper.distribution_from_path(target_dir)
    assert dist is not None, 'Failed to cache distribution %s' % source
    return dist


class Memoizer(object):
  """A thread safe class for memoizing the results of a computation."""

  def __init__(self):
    self._data = {}
    self._lock = Lock()

  def get(self, key, default=None):
    with self._lock:
      return self._data.get(key, default)

  def store(self, key, value):
    with self._lock:
      self._data[key] = value


@contextlib.contextmanager
def named_temporary_file(*args, **kwargs):
  """
  Due to a bug in python (https://bugs.python.org/issue14243), we need
  this to be able to use the temporary file without deleting it.
  """
  assert 'delete' not in kwargs
  kwargs['delete'] = False
  fp = tempfile.NamedTemporaryFile(*args, **kwargs)
  try:
    with fp:
      yield fp
  finally:
    os.remove(fp.name)