This file is indexed.

/usr/lib/python3/dist-packages/pytest_benchmark/utils.py is in python3-pytest-benchmark 3.0.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
from __future__ import division
from __future__ import print_function

import argparse
import json
import os
import platform
import re
import subprocess
import sys
import types
from datetime import datetime
from decimal import Decimal

from .compat import PY3

try:
    from subprocess import check_output
except ImportError:
    def check_output(*popenargs, **kwargs):
        if 'stdout' in kwargs:
            raise ValueError('stdout argument not allowed, it will be overridden.')
        process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
        output, unused_err = process.communicate()
        retcode = process.poll()
        if retcode:
            cmd = kwargs.get("args")
            if cmd is None:
                cmd = popenargs[0]
            raise subprocess.CalledProcessError(retcode, cmd)
        return output


class SecondsDecimal(Decimal):
    def __float__(self):
        return float(super(SecondsDecimal, self).__str__())

    def __str__(self):
        return "{0}s".format(format_time(float(super(SecondsDecimal, self).__str__())))

    @property
    def as_string(self):
        return super(SecondsDecimal, self).__str__()


class NameWrapper(object):
    def __init__(self, target):
        self.target = target

    def __str__(self):
        name = self.target.__module__ + "." if hasattr(self.target, '__module__') else ""
        name += self.target.__name__ if hasattr(self.target, '__name__') else repr(self.target)
        return name

    def __repr__(self):
        return "NameWrapper(%s)" % repr(self.target)


def get_tag():
    info = get_commit_info()
    return '%s_%s%s' % (info['id'], get_current_time(), '_uncommitted-changes' if info['dirty'] else '')


def get_commit_info():
    dirty = False
    commit = 'unversioned'
    try:
        if os.path.exists('.git'):
            desc = check_output('git describe --dirty --always --long --abbrev=40'.split(),
                                universal_newlines=True).strip()
            desc = desc.split('-')
            if desc[-1].strip() == 'dirty':
                dirty = True
                desc.pop()
            commit = desc[-1].strip('g')
        elif os.path.exists('.hg'):
            desc = check_output('hg id --id --debug'.split(), universal_newlines=True).strip()
            if desc[-1] == '+':
                dirty = True
            commit = desc.strip('+')
        return {
            'id': commit,
            'dirty': dirty
        }
    except Exception as exc:
        return {
            'id': 'unknown',
            'dirty': dirty,
            'error': repr(exc),
        }


def get_current_time():
    return datetime.now().strftime("%Y%m%d_%H%M%S")


def first_or_value(obj, value):
    if obj:
        value, = obj

    return value


def load_timer(string):
    if "." not in string:
        raise argparse.ArgumentTypeError("Value for --benchmark-timer must be in dotted form. Eg: 'module.attr'.")
    mod, attr = string.rsplit(".", 1)
    if mod == 'pep418':
        if PY3:
            import time
            return NameWrapper(getattr(time, attr))
        else:
            from . import pep418
            return NameWrapper(getattr(pep418, attr))
    else:
        __import__(mod)
        mod = sys.modules[mod]
        return NameWrapper(getattr(mod, attr))


class RegressionCheck(object):
    def __init__(self, field, threshold):
        self.field = field
        self.threshold = threshold

    def fails(self, current, compared):
        val = self.compute(current, compared)
        if val > self.threshold:
            return "Field %s has failed %s: %.9f > %.9f" % (
                self.field, self.__class__.__name__, val, self.threshold
            )


class PercentageRegressionCheck(RegressionCheck):
    def compute(self, current, compared):
        val = compared[self.field]
        if not val:
            return float("inf")
        return current[self.field] / val * 100 - 100


class DifferenceRegressionCheck(RegressionCheck):
    def compute(self, current, compared):
        return current[self.field] - compared[self.field]


def parse_compare_fail(string,
                       rex=re.compile('^(?P<field>min|max|mean|median|stddev|iqr):'
                                      '((?P<percentage>[0-9]?[0-9])%|(?P<difference>[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?))$')):
    m = rex.match(string)
    if m:
        g = m.groupdict()
        if g['percentage']:
            return PercentageRegressionCheck(g['field'], int(g['percentage']))
        elif g['difference']:
            return DifferenceRegressionCheck(g['field'], float(g['difference']))

    raise argparse.ArgumentTypeError("Could not parse value: %r." % string)


def parse_warmup(string):
    string = string.lower().strip()
    if string == "auto":
        return platform.python_implementation() == "PyPy"
    elif string in ["off", "false", "no"]:
        return False
    elif string in ["on", "true", "yes", ""]:
        return True
    else:
        raise argparse.ArgumentTypeError("Could not parse value: %r." % string)


def parse_timer(string):
    return str(load_timer(string))


def parse_sort(string):
    string = string.lower().strip()
    if string not in ("min", "max", "mean", "stddev"):
        raise argparse.ArgumentTypeError(
            "Unacceptable value: %r. "
            "Value for --benchmark-sort must be one of: 'min', 'max', 'mean' or 'stddev'." % string)
    return string


def parse_rounds(string):
    try:
        value = int(string)
    except ValueError as exc:
        raise argparse.ArgumentTypeError(exc)
    else:
        if value < 1:
            raise argparse.ArgumentTypeError("Value for --benchmark-rounds must be at least 1.")
        return value


def parse_seconds(string):
    try:
        return SecondsDecimal(string).as_string
    except Exception as exc:
        raise argparse.ArgumentTypeError("Invalid decimal value %r: %r" % (string, exc))


def parse_save(string):
    if not string:
        raise argparse.ArgumentTypeError("Can't be empty.")
    illegal = ''.join(c for c in r"\/:*?<>|" if c in string)
    if illegal:
        raise argparse.ArgumentTypeError("Must not contain any of these characters: /:*?<>|\\ (it has %r)" % illegal)
    return string


def time_unit(value):
    if value < 1e-6:
        return "n", 1e9
    elif value < 1e-3:
        return "u", 1e6
    elif value < 1:
        return "m", 1e3
    else:
        return "", 1.


def format_time(value):
    unit, adjustment = time_unit(value)
    return "{0:.2f}{1:s}".format(value * adjustment, unit)


class cached_property(object):
    def __init__(self, func):
        self.__doc__ = getattr(func, '__doc__')
        self.func = func

    def __get__(self, obj, cls):
        if obj is None:
            return self
        value = obj.__dict__[self.func.__name__] = self.func(obj)
        return value


def clonefunc(f):
    """Deep clone the given function to create a new one.

    By default, the PyPy JIT specializes the assembler based on f.__code__:
    clonefunc makes sure that you will get a new function with a **different**
    __code__, so that PyPy will produce independent assembler. This is useful
    e.g. for benchmarks and microbenchmarks, so you can make sure to compare
    apples to apples.

    Use it with caution: if abused, this might easily produce an explosion of
    produced assembler.

    from: https://bitbucket.org/antocuni/pypytools/src/tip/pypytools/util.py?at=default
    """

    # first of all, we clone the code object
    try:
        co = f.__code__
        if PY3:
            co2 = types.CodeType(co.co_argcount, co.co_kwonlyargcount,
                                 co.co_nlocals, co.co_stacksize, co.co_flags, co.co_code,
                                 co.co_consts, co.co_names, co.co_varnames, co.co_filename, co.co_name,
                                 co.co_firstlineno, co.co_lnotab, co.co_freevars, co.co_cellvars)
        else:
            co2 = types.CodeType(co.co_argcount, co.co_nlocals, co.co_stacksize, co.co_flags, co.co_code,
                                 co.co_consts, co.co_names, co.co_varnames, co.co_filename, co.co_name,
                                 co.co_firstlineno, co.co_lnotab, co.co_freevars, co.co_cellvars)
        #
        # then, we clone the function itself, using the new co2
        return types.FunctionType(co2, f.__globals__, f.__name__, f.__defaults__, f.__closure__)
    except AttributeError:
        return f


def format_dict(obj):
    return "{%s}" % ", ".join("%s: %s" % (k, json.dumps(v)) for k, v in sorted(obj.items()))


def report_progress(iterable, terminal_reporter, format_string, **kwargs):
    total = len(iterable)

    def progress_reporting_wrapper():
        for pos, item in enumerate(iterable):
            string = format_string.format(pos=pos + 1, total=total, value=item, **kwargs)
            terminal_reporter.rewrite(string, black=True, bold=True)
            yield string, item
    return progress_reporting_wrapper()