This file is indexed.

/usr/lib/python3/dist-packages/molotov/runner.py is in python3-molotov 1.4-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
from contextlib import suppress
import signal
import multiprocessing
import asyncio
import os

from molotov.api import get_fixture
from molotov.listeners import EventSender
from molotov.stats import get_statsd_client
from molotov.sharedcounter import SharedCounters
from molotov.util import cancellable_sleep, stop, is_stopped, set_timer
from molotov.worker import Worker


class Runner(object):
    """Manages processes & workers and grabs results.
    """
    def __init__(self, args, loop=None):
        self.args = args
        self.console = self.args.shared_console
        if loop is None:
            loop = asyncio.get_event_loop()
        self.loop = loop
        if self.args.statsd:
            self.statsd = get_statsd_client(args.statsd_address,
                                            loop=self.loop)
        else:
            self.statsd = None
        self._tasks = []
        self._procs = []
        self._results = SharedCounters('WORKER', 'REACHED', 'RATIO', 'OK',
                                       'FAILED', 'MINUTE_OK', 'MINUTE_FAILED')
        self.eventer = EventSender(self.console)

    def gather(self, *futures):
        return asyncio.gather(*futures, loop=self.loop, return_exceptions=True)

    def ensure_future(self, coro):
        return asyncio.ensure_future(coro, loop=self.loop)

    def __call__(self):
        global_setup = get_fixture('global_setup')
        if global_setup is not None:
            try:
                global_setup(self.args)
            except Exception as e:
                self.console.print("The global_setup() fixture failed")
                self.console.print_error(e)
                raise

        try:
            return self._launch_processes()
        finally:
            global_teardown = get_fixture('global_teardown')
            if global_teardown is not None:
                try:
                    global_teardown()
                except Exception as e:
                    # we can't stop the teardown process
                    self.console.print_error(e)

    def _launch_processes(self):
        args = self.args
        signal.signal(signal.SIGINT, self._shutdown)
        signal.signal(signal.SIGTERM, self._shutdown)
        args.original_pid = os.getpid()

        if args.processes > 1:
            if not args.quiet:
                self.console.print('Forking %d processes' % args.processes)
            jobs = []
            for i in range(args.processes):
                p = multiprocessing.Process(target=self._process)
                jobs.append(p)
                p.start()

            for job in jobs:
                self._procs.append(job)

            async def run(quiet, console):
                while len(self._procs) > 0:
                    if not quiet:
                        console.print(self.display_results(), end='\r')
                    for job in jobs:
                        if job.exitcode is not None and job in self._procs:
                            self._procs.remove(job)
                    await cancellable_sleep(args.console_update)
                await self.console.stop()
                await self.eventer.stop()

            tasks = [self.ensure_future(self.console.display()),
                     self.ensure_future(self._send_workers_event(1)),
                     self.ensure_future(run(args.quiet, self.console))]
            self.loop.run_until_complete(self.gather(*tasks))
        else:
            self._process()

        return self._results

    def _shutdown(self, signal, frame):
        stop()
        self._kill_tasks()
        # send sigterms
        for proc in self._procs:
            proc.terminate()

    def _runner(self):
        args = self.args

        def _prepare():
            tasks = []
            delay = 0
            if args.ramp_up > 0.:
                step = args.ramp_up / args.workers
            else:
                step = 0.
            for i in range(self.args.workers):
                worker = Worker(i, self._results, self.console, self.args,
                                self.statsd, delay, self.loop)
                f = self.ensure_future(worker.run())
                tasks.append(f)
                delay += step
            return tasks

        if self.args.quiet:
            return _prepare()
        else:
            msg = 'Preparing {} worker{}'
            msg = msg.format(args.workers, 's' if args.workers > 1 else '')
            return self.console.print_block(msg, _prepare)

    def _process(self):
        set_timer()
        if self.args.processes > 1:
            signal.signal(signal.SIGINT, self._shutdown)
            signal.signal(signal.SIGTERM, self._shutdown)
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)

        if self.args.debug:
            self.console.print('**** RUNNING IN DEBUG MODE == SLOW ****')
            self.loop.set_debug(True)

        if self.args.original_pid == os.getpid():
            self._tasks.append(self.ensure_future(self._send_workers_event(1)))
            if not self.args.quiet:
                fut = self._display_results(self.args.console_update)
                update = self.ensure_future(fut)
                display = self.ensure_future(self.console.display())
                display = self.gather(update, display)
                self._tasks.append(display)

        workers = self.gather(*self._runner())
        workers.add_done_callback(lambda fut: stop())
        self._tasks.append(workers)

        try:
            self.loop.run_until_complete(self.gather(*self._tasks))
        finally:
            self._kill_tasks()
            if self.statsd is not None:
                self.statsd.close()
            self.loop.close()

    def _kill_tasks(self):
        cancellable_sleep.cancel_all()
        for task in reversed(self._tasks):
            with suppress(asyncio.CancelledError):
                task.cancel()
        for task in self._tasks:
            del task
        self._tasks[:] = []

    def display_results(self):
        ok, fail = self._results['OK'].value, self._results['FAILED'].value
        workers = self._results['WORKER'].value
        pat = 'SUCCESSES: %s | FAILURES: %s | WORKERS: %s'
        return pat % (ok, fail, workers)

    async def _display_results(self, update_interval):
        while not is_stopped():
            self.console.print(self.display_results(), end='\r')
            await cancellable_sleep(update_interval)
        await self.console.stop()

    async def _send_workers_event(self, update_interval):
        while not self.eventer.stopped() and not is_stopped():
            workers = self._results['WORKER'].value
            await self.eventer.send_event('current_workers', workers=workers)
            await cancellable_sleep(update_interval)