/usr/lib/python2.7/dist-packages/stestr/scheduler.py is in python-stestr 1.1.0-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import itertools
import multiprocessing
import operator
import random
import yaml
from stestr import selection
def partition_tests(test_ids, concurrency, repository, group_callback,
                    randomize=False):
    """Partition test_ids by concurrency.

    Test durations from the repository are used to get partitions which
    have roughly the same expected runtime. New tests - those with no
    recorded duration - are allocated in round-robin fashion to the
    partitions created using test durations.

    :param list test_ids: The list of test_ids to be partitioned
    :param int concurrency: The concurrency that will be used for running
        the tests. This is the number of partitions that test_ids will be
        split into. It must be at least 1.
    :param repository: A repository object that is queried for timing data
        through ``get_test_times(test_ids)``; the result is read through its
        ``'known'`` (test_id -> duration) and ``'unknown'`` (set of
        test_ids) entries. If it is falsy every test is treated as having
        no recorded duration.
    :param group_callback: A callback function that is used as a scheduler
        hint to group test_ids together and treat them as a single unit for
        scheduling. This function expects a single test_id parameter and it
        will return a group identifier. Tests_ids that have the same group
        identifier will be kept on the same worker. If it is None, or it
        returns a falsy identifier, the test forms a group of its own.
    :param bool randomize: If true each partition's test order will be
        randomized
    :return: A list of ``concurrency`` lists where each element is a
        distinct subset of test_ids, and the union of all the elements is
        equal to set(test_ids). Some elements may be empty.
    :raises ValueError: If concurrency is less than 1.
    """
    # Without at least one partition there is nowhere to put the tests: the
    # timed path would fail on timed_partitions[0] and the round-robin path
    # would silently drop every test.
    if concurrency < 1:
        raise ValueError(
            'concurrency must be at least 1, got %r' % (concurrency,))

    partitions = [[] for _ in range(concurrency)]
    # Each entry is [accumulated duration, partition]. The inner lists are
    # the very same objects held by ``partitions``, so filling them here
    # fills the result while ``partitions`` keeps its original order.
    timed_partitions = [[0.0, partition] for partition in partitions]

    if repository:
        time_data = repository.get_test_times(test_ids)
        timed_tests = time_data['known']
        unknown_tests = time_data['unknown']
    else:
        timed_tests = {}
        unknown_tests = set(test_ids)

    # Group tests: generate group_id -> test_ids.
    group_ids = collections.defaultdict(list)
    for test_id in test_ids:
        group_id = None
        if group_callback is not None:
            group_id = group_callback(test_id)
        # A falsy group id means "no group": the test is scheduled alone.
        group_ids[group_id or test_id].append(test_id)

    # Time groups: generate three sets of groups:
    # - fully timed dict(group_id -> time),
    # - partially timed dict(group_id -> time) and
    # - unknown (list of group_id)
    # We may in future treat partially timed different for scheduling, but
    # at least today we just schedule them after the fully timed groups.
    timed = {}
    partial = {}
    unknown = []
    for group_id, group_tests in group_ids.items():
        untimed_ids = unknown_tests.intersection(group_tests)
        # untimed_ids is a subset of group_tests, so the symmetric
        # difference is exactly the group's tests that do have a duration.
        group_time = sum(
            timed_tests[test_id]
            for test_id in untimed_ids.symmetric_difference(group_tests))
        if not untimed_ids:
            timed[group_id] = group_time
        elif group_time:
            partial[group_id] = group_time
        else:
            unknown.append(group_id)

    # Scheduling is NP complete in general, so we avoid aiming for
    # perfection. A quick approximation that is sufficient for our general
    # needs:
    # sort the groups by time
    # allocate to partitions by putting each group in to the partition with
    # the current (lowest time, shortest length[in tests])
    def consume_queue(groups):
        queue = sorted(
            groups.items(), key=operator.itemgetter(1), reverse=True)
        for group_id, duration in queue:
            # Index 0 is always the least loaded partition because the list
            # is re-sorted after every allocation.
            timed_partitions[0][0] += duration
            timed_partitions[0][1].extend(group_ids[group_id])
            timed_partitions.sort(key=lambda item: (item[0], len(item[1])))

    consume_queue(timed)
    consume_queue(partial)

    # Assign groups with entirely unknown times in round robin fashion to
    # the partitions.
    for partition, group_id in zip(itertools.cycle(partitions), unknown):
        partition.extend(group_ids[group_id])

    if not randomize:
        return partitions

    # Shuffle copies so the lists built above are left untouched.
    shuffled_partitions = []
    for partition in partitions:
        shuffled = list(partition)
        random.shuffle(shuffled)
        shuffled_partitions.append(shuffled)
    return shuffled_partitions
def local_concurrency():
    """Report how many CPUs ``multiprocessing`` detects on this system.

    :return: The CPU count as an int, or None when the platform gives
        ``multiprocessing.cpu_count()`` no way to determine it.
    """
    try:
        cpu_total = multiprocessing.cpu_count()
    except NotImplementedError:
        # The count cannot be determined here; let the caller decide.
        cpu_total = None
    return cpu_total
def generate_worker_partitions(ids, worker_path, repository=None,
                               group_callback=None, randomize=False):
    """Parse a worker yaml file and generate test groups

    The file is expected to hold a list of mappings. Each mapping must have
    a ``worker`` key whose value is a list used to filter ``ids``, and may
    have a ``concurrency`` key. When ``concurrency`` is greater than 1 the
    worker's tests are split further with ``partition_tests``.

    :param list ids: A list of test ids to be partitioned
    :param path worker_path: The path to a worker file
    :param repository: A repository object that will be used for looking up
        timing data. This is optional, and also will only be used for
        scheduling if there is a concurrency field greater than 1 on a
        worker.
    :param group_callback: A callback function that is used as a scheduler
        hint to group test_ids together and treat them as a single unit for
        scheduling. This function expects a single test_id parameter and it
        will return a group identifier. Tests_ids that have the same group
        identifier will be kept on the same worker. This is optional and
        also will only be used for scheduling if there is a concurrency
        field greater than 1 on a worker.
    :param bool randomize: If true each partition's test order will be
        randomized. This is optional and also will only be used for
        scheduling if there is a concurrency field greater than 1 on a
        worker.
    :returns: A list where each element is a distinct subset of test_ids.
    :raises TypeError: If the yaml document is not a list of mappings that
        each carry a ``worker`` list.
    """
    bad_format = 'The input yaml is the incorrect format'

    with open(worker_path, 'r') as worker_file:
        # NOTE: this was yaml.load() without a Loader, which can construct
        # arbitrary Python objects from the file and is rejected outright by
        # newer PyYAML releases. A worker file only needs plain lists,
        # mappings and scalars, so the safe loader is sufficient.
        workers_desc = yaml.safe_load(worker_file.read())

    # An empty file loads as None; report it like any other malformed input
    # instead of failing with an unrelated "not iterable" error.
    if not isinstance(workers_desc, list):
        raise TypeError(bad_format)

    worker_groups = []
    for worker in workers_desc:
        if not isinstance(worker, dict) or 'worker' not in worker:
            raise TypeError(bad_format)
        if not isinstance(worker['worker'], list):
            raise TypeError(bad_format)

        local_worker_list = selection.filter_tests(worker['worker'], ids)
        if 'concurrency' in worker and worker['concurrency'] > 1:
            # Every partition returned is kept, including empty ones.
            worker_groups.extend(partition_tests(
                local_worker_list, worker['concurrency'], repository,
                group_callback, randomize))
        elif local_worker_list:
            # If a worker partition is empty don't add it to the output
            worker_groups.append(local_worker_list)
    return worker_groups
|