This file is indexed.

/usr/lib/python3/dist-packages/stestr/scheduler.py is in python3-stestr 1.1.0-0ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections
import itertools
import multiprocessing
import operator
import random

import yaml

from stestr import selection


def partition_tests(test_ids, concurrency, repository, group_callback,
                    randomize=False):
    """Split test_ids into ``concurrency`` partitions of similar runtime.

    Recorded durations from the repository drive the balancing: groups of
    tests that have timing data are placed, longest first, on whichever
    partition currently carries the least load. Groups with no recorded
    duration at all are afterwards dealt out to the partitions round-robin.

    :param list test_ids: The test ids to distribute.
    :param int concurrency: The number of partitions to produce, i.e. the
        concurrency the tests will be run with.
    :param repository: Source of timing data. When truthy, its
        ``get_test_times(test_ids)`` method is called and the result is read
        through the ``'known'`` key (test id -> duration) and the
        ``'unknown'`` key (set of test ids without a duration). When falsy,
        every test is treated as having no recorded duration.
    :param group_callback: Optional scheduler hint used to keep related
        tests together. It is called with a single test id and returns a
        group identifier; test ids sharing an identifier are scheduled as
        one unit on the same partition. ``None``, or a falsy return value,
        makes the test its own group.
    :param bool randomize: If true, the order of the tests inside each
        returned partition is shuffled.

    :return: A list of ``concurrency`` lists of test ids. Every test id is
        placed in exactly one of them.
    """
    buckets = [[] for _ in range(concurrency)]
    # Each entry is [accumulated duration, bucket]. The bucket objects are
    # shared with ``buckets`` so re-sorting ``loads`` never disturbs the
    # order of the returned partitions.
    loads = [[0.0, bucket] for bucket in buckets]

    if repository:
        timing = repository.get_test_times(test_ids)
        known_durations = timing['known']
        untimed_tests = timing['unknown']
    else:
        known_durations = {}
        untimed_tests = set(test_ids)

    def group_of(test_id):
        # A test without a group hint forms a group keyed by its own id.
        hint = None if group_callback is None else group_callback(test_id)
        return hint or test_id

    # group identifier -> member test ids, in the order they were given.
    members_by_group = collections.defaultdict(list)
    for test_id in test_ids:
        members_by_group[group_of(test_id)].append(test_id)

    # Classify every group as:
    # - fully timed: every member has a duration (group -> total time),
    # - partially timed: only some members do (group -> known total time),
    # - untimed: nothing is known about the group.
    # Partially timed groups get no special treatment yet; they are simply
    # scheduled after the fully timed ones.
    fully_timed = {}
    partly_timed = {}
    untimed_groups = []
    for group_id, members in members_by_group.items():
        missing = untimed_tests.intersection(members)
        # ``missing`` is a subset of ``members``, so the symmetric
        # difference is exactly the members that do have a duration.
        total = sum(
            known_durations[test_id]
            for test_id in missing.symmetric_difference(members))
        if not missing:
            fully_timed[group_id] = total
        elif total:
            partly_timed[group_id] = total
        else:
            untimed_groups.append(group_id)

    # Optimal scheduling is NP complete, so settle for a cheap greedy
    # approximation: take the groups longest first and hand each one to the
    # partition that sorts lowest by (total time, number of tests).
    def schedule(durations):
        longest_first = sorted(
            durations.items(), key=operator.itemgetter(1), reverse=True)
        for group_id, duration in longest_first:
            lightest = loads[0]
            lightest[0] += duration
            lightest[1].extend(members_by_group[group_id])
            loads.sort(key=lambda entry: (entry[0], len(entry[1])))

    for durations in (fully_timed, partly_timed):
        schedule(durations)

    # Groups nobody has timing data for are spread round-robin, starting
    # from the first partition.
    for group_id, bucket in zip(untimed_groups, itertools.cycle(buckets)):
        bucket.extend(members_by_group[group_id])

    if not randomize:
        return buckets

    shuffled = []
    for bucket in buckets:
        reordered = list(bucket)
        random.shuffle(reordered)
        shuffled.append(reordered)
    return shuffled


def local_concurrency():
    """Report how many CPUs ``multiprocessing`` detects on this machine.

    :return: The CPU count as an int, or None when
        ``multiprocessing.cpu_count()`` raises NotImplementedError.
    """
    cpu_total = None
    try:
        cpu_total = multiprocessing.cpu_count()
    except NotImplementedError:
        # The count cannot be determined here; leave the result as None.
        pass
    return cpu_total


def generate_worker_partitions(ids, worker_path, repository=None,
                               group_callback=None, randomize=False):
    """Parse a worker yaml file and generate test groups

    The file must contain a yaml sequence of mappings. Each mapping needs a
    ``worker`` key holding a list, which is handed to
    ``selection.filter_tests`` together with ``ids`` to pick that worker's
    tests. A mapping may also carry a ``concurrency`` key; when its value is
    greater than 1 the worker's tests are further split with
    ``partition_tests`` into that many groups.

    :param list ids: A list of test ids to be partitioned
    :param path worker_path: The path to a worker file
    :param repository: A repository object that will be used for looking up
        timing data. This is optional, and also will only be used for
        scheduling if there is a concurrency field on a worker.
    :param group_callback: A callback function that is used as a scheduler
        hint to group test_ids together and treat them as a single unit for
        scheduling. This function expects a single test_id parameter and it
        will return a group identifier. Tests_ids that have the same group
        identifier will be kept on the same worker. This is optional and
        also will only be used for scheduling if there is a concurrency
        field on a worker.
    :param bool randomize: If true each partition's test order will be
        randomized. This is optional and also will only be used for scheduling
        if there is a concurrency field on a worker.

    :returns: A list where each element is a distinct subset of test_ids.
    :raises TypeError: If the yaml document is not a sequence of mappings
        that each have a list under the ``worker`` key.
    """
    with open(worker_path, 'r') as worker_file:
        # safe_load only builds plain yaml types, so a worker file cannot
        # trigger arbitrary object construction. It also avoids calling
        # yaml.load() without a Loader, which newer PyYAML releases reject.
        workers_desc = yaml.safe_load(worker_file)

    # An empty file parses to None and a top-level mapping or scalar is not
    # a worker list either; report those with the same format error as any
    # other malformed entry rather than an unrelated iteration failure.
    if not isinstance(workers_desc, list):
        raise TypeError('The input yaml is the incorrect format')

    worker_groups = []
    for worker in workers_desc:
        if not isinstance(worker, dict) or 'worker' not in worker:
            raise TypeError('The input yaml is the incorrect format')
        if not isinstance(worker['worker'], list):
            raise TypeError('The input yaml is the incorrect format')

        local_worker_list = selection.filter_tests(worker['worker'], ids)

        if 'concurrency' in worker and worker['concurrency'] > 1:
            # Split this worker's tests across the requested number of
            # partitions; each partition becomes its own output group.
            worker_groups.extend(partition_tests(
                local_worker_list, worker['concurrency'], repository,
                group_callback, randomize))
        elif local_worker_list:
            # If a worker partition is empty don't add it to the output
            worker_groups.append(local_worker_list)
    return worker_groups