This file is indexed.

/usr/share/pyshared/insanity/testrun.py is in python-insanity 0.0+git20110920.4750a8e8-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# GStreamer QA system
#
#       testrun.py
#
# Copyright (c) 2007, Edward Hervey <bilboed@bilboed.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.

"""
Test runs

A TestRun is the execution of one or more tests/scenarios with various
arguments.
It will also collect the state of the environment.

The smallest TestRun is a single test with neither arguments nor
monitors.

Tests have access to the TestRun within which they are being executed
so they can look for results of other tests.

Access to the TestRun from test instances will be possible via DBus IPC.
"""

import gobject
import time
import dbus.gobject_service
import tempfile
import os
from insanity.log import error, warning, debug, info
from insanity.test import Test
from insanity.arguments import Arguments
import insanity.environment as environment
import insanity.dbustools as dbustools

##
## TODO/FIXME
##
## Add the possibility to add/modify/remove environment variables.
##   This will be needed to run tests with different environments
##   WITHOUT having to restart the daemon.

class TestRun(gobject.GObject):
    """
    A TestRun is the execution of one or more tests/scenarios with various
    arguments.
    It will also collect the state of the environment.

    The smallest TestRun is a single test with neither arguments nor
    monitors.

    Tests have access to the TestRun within which they are being executed
    so they can look for results of other tests.

    Access to the TestRun from test instances will be possible via DBus IPC.

    Typical usage: setStorage(), addTest() one or more times, then run().
    Progress is reported through the GObject signals declared below.
    """
    __gsignals__ = {
        "start" : (gobject.SIGNAL_RUN_LAST,
                   gobject.TYPE_NONE,
                   ()),
        "done" : (gobject.SIGNAL_RUN_LAST,
                   gobject.TYPE_NONE,
                   ()),
        "aborted" : (gobject.SIGNAL_RUN_LAST,
                   gobject.TYPE_NONE,
                   ( )),
        # a test started/ended
        # Warning, it is not automatically a SingleTest
        "single-test-done" : (gobject.SIGNAL_RUN_LAST,
                              gobject.TYPE_NONE,
                              (gobject.TYPE_PYOBJECT, )),
        "single-test-start" : (gobject.SIGNAL_RUN_LAST,
                              gobject.TYPE_NONE,
                              (gobject.TYPE_PYOBJECT, )),

        # new-remote-test (uuid)
        #  emitted when a new test has appeared on the private bus
        "new-remote-test" : (gobject.SIGNAL_RUN_LAST,
                             gobject.TYPE_NONE,
                             (gobject.TYPE_STRING, )),
        "removed-remote-test" : (gobject.SIGNAL_RUN_LAST,
                                 gobject.TYPE_NONE,
                                 (gobject.TYPE_STRING, ))
        }

    def __init__(self, maxnbtests=1, workingdir=None, env=None, clientid=None):
        """
        maxnbtests : Maximum number of tests to run simultaneously in each batch.
        workingdir : Working directory (default : getcwd() + /workingdir/)
        env : extra environment variables, layered on top of a copy of
              os.environ
        clientid : identifier handed to the storage's startNewTestRun()
                   when the run starts
        """
        gobject.GObject.__init__(self)
        # dbus
        self._bus = None
        self._bus_address = None
        self._dbusobject = None
        self._dbusiface = None
        self._setupPrivateBus()

        self._tests = [] # list of (test, arguments, monitors)
        self._storage = None
        self._currenttest = None
        self._currentmonitors = None
        self._currentarguments = None
        self._runninginstances = []
        self._maxnbtests = maxnbtests
        self._starttime = None
        self._stoptime = None
        self._clientid = clientid
        # disambiguation
        # _environment are the environment information
        # _env are the environment variables (env)
        self._environment = {}
        self._env = os.environ.copy()
        if env:
            self._env.update(env)
        self._running = False
        # both are filled in by setWorkingDirectory() below
        self._workingdir = None
        self._outputdir = None
        self.setWorkingDirectory(workingdir or os.path.join(os.getcwd(), "workingdir"))

    ## PUBLIC API

    def run(self):
        """
        Start executing the tests.

        Does nothing (apart from logging an error) if the TestRun is
        already running or if no storage was set with setStorage().
        The tests themselves are started from _gotEnvironment(), the
        callback handed to environment.collectEnvironment().
        """
        if self._running:
            error("TestRun is already running")
            return
        if self._storage is None:
            # _gotEnvironment() and the per-test callbacks all call into
            # the storage; fail early instead of inside a callback.
            error("TestRun has no storage, call setStorage() before run()")
            return
        # Mark the run as active so the guard above and the one in
        # setWorkingDirectory() are effective.
        self._running = True
        self._collectEnvironment()

    def abort(self):
        """
        Abort the tests execution.

        Stops every currently running test instance and emits "aborted".
        No further test of this run is started afterwards.
        """
        # TODO : fill
        # Clear the flag first: _runNext() refuses to start anything while
        # it is False, in case stopping a test ends up in _singleTestDone().
        self._running = False
        # Iterate over a copy, _singleTestDone() removes entries from
        # self._runninginstances.
        for test in list(self._runninginstances):
            test.stop()
        self.emit("aborted")

    def setStorage(self, storage):
        """
        Use the given storage for this TestRun
        """
        self._storage = storage

    def addTest(self, test, arguments, monitors=None):
        """
        Adds test with the given arguments (or generator) and monitors
        to the list of tests to be run

        test must be a class deriving from Test (not an instance).
        arguments must be an Arguments object or a dictionary; a
        dictionary is converted with Arguments(**arguments).

        monitors are a list of tuples containing:
        * the monitor class
        * (optional) the arguments to use on that monitor

        Raises TypeError if test or arguments are of the wrong type.
        """
        # Both conditions are required: test has to be a class AND that
        # class has to derive from Test.
        if not (isinstance(test, type) and issubclass(test, Test)):
            raise TypeError("Given test is not a Test object !")
        # arguments NEED to be an Arguments object or a dictionary
        if isinstance(arguments, dict):
            # convert dictionary to Arguments
            info("Creating Arguments for %r", arguments)
            arguments = Arguments(**arguments)
        elif not isinstance(arguments, Arguments):
            raise TypeError("Test arguments need to be of type Arguments or dict")
        self._tests.append((test, arguments, monitors))

    def getEnvironment(self):
        """
        Returns the environment information of this testrun as a
        dictionary.
        """
        return self._environment

    ## PRIVATE API

    def _setupPrivateBus(self):
        """
        Connect to the private session bus and watch NameOwnerChanged so
        that appearing/disappearing remote tests can be signalled.
        """
        self._bus = dbustools.get_private_session_bus()
        self._bus_address = dbustools.get_private_bus_address()
        self._dbusobject = self._bus.get_object("org.freedesktop.DBus",
                                                "/org/freedesktop/DBus")
        self._dbusiface = dbus.Interface(self._dbusobject,
                                         "org.freedesktop.DBus")
        self._dbusiface.connect_to_signal("NameOwnerChanged",
                                          self._dbusNameOwnerChangedSignal)

    def _dbusNameOwnerChangedSignal(self, name, oldowner, newowner):
        """
        Handler for the bus' NameOwnerChanged signal.

        Emits "new-remote-test" / "removed-remote-test" with the uuid
        extracted from the bus name.
        """
        # we only care about connections named
        # net.gstreamer.Insanity.RemotePythonRunner.RemotePythonRunner<uuid>
        info("name:%s , oldowner:%s, newowner:%s" % (name, oldowner, newowner))
        if not name.startswith("net.gstreamer.Insanity.RemotePythonRunner.RemotePythonRunner"):
            return
        # extract uuid : everything after the last ".RemotePythonRunner"
        uuid = name.rsplit('.RemotePythonRunner', 1)[-1]
        if newowner == "":
            self.emit("removed-remote-test", uuid)
        elif oldowner == "":
            self.emit("new-remote-test", uuid)

    def _collectEnvironment(self):
        """
        Collect the environment settings, parameters, variables,...

        The result is delivered to _gotEnvironment().
        """
        # we specify our own registry
        fd, path = self.get_temp_file(nameid="registry", category="testrun")
        # Only the path is needed; close the descriptor opened by
        # mkstemp() so it is not leaked. The file itself stays on disk.
        os.close(fd)
        self._env["GST_REGISTRY"] = path
        environment.collectEnvironment(self._env, self._gotEnvironment)

    def _gotEnvironment(self, resdict):
        """
        Callback of environment.collectEnvironment(): stores the collected
        information, announces the start of the run and launches the first
        batch.
        """
        info("Got environment %r", resdict)
        self._environment = resdict
        if not self._running:
            # abort() was called while the environment was being collected
            info("TestRun was aborted before it started")
            return
        self.emit("start")
        self._starttime = int(time.time())
        self._storage.startNewTestRun(self, self._clientid)
        self._runNextBatch()

    def _singleTestStart(self, test):
        """ Handler for the "start" signal of a test instance """
        info("test %r started", test)
        self.emit("single-test-start", test)
        self._storage.newTestStarted(self, test)

    def _singleTestDone(self, test):
        """
        Handler for the "done" signal of a test instance: forwards the
        signal, records the result in the storage and tries to start the
        next test.
        """
        info("Done with test %r , success rate %02f%%",
             test, test.getSuccessPercentage())
        self.emit("single-test-done", test)
        # FIXME : Improvement : disconnect all signals from that test
        if test in self._runninginstances:
            self._runninginstances.remove(test)
        self._storage.newTestFinished(self, test)
        self._runNext()

    def _singleTestCheck(self, test, check, validate):
        """ Handler for the "check" signal of a test instance (no-op) """
        pass

    def _runNext(self):
        """
        Run the next test+arg+monitor combination

        Always returns False, so it can be used directly as a
        gobject.idle_add() callback without being called again.
        """
        if not self._running:
            # aborted or already finished : do not start anything new
            info("TestRun is not running, not starting a new test")
            return False
        if len(self._runninginstances) >= self._maxnbtests:
            warning("We were already running the max number of tests")
            return False
        info("Getting next test arguments for this batch")
        try:
            kwargs = self._currentarguments.next()
        except StopIteration:
            if self._runninginstances:
                # the batch is only over once its last test is done
                info("No more arguments, but still a test running")
                return False
            info("No more arguments, we're finished with this batch")
            self._runNextBatch()
            return False

        # grab the test class and monitors of the current batch
        testclass = self._currenttest
        monitors = self._currentmonitors

        # create test with arguments
        debug("Creating test %r with arguments %r", testclass, kwargs)
        test = testclass(testrun=self, bus=self._bus,
                         bus_address=self._bus_address,
                         **kwargs)
        if monitors:
            for monitor in monitors:
                test.addMonitor(*monitor)

        test.connect("start", self._singleTestStart)
        test.connect("done", self._singleTestDone)
        test.connect("check", self._singleTestCheck)

        # start test
        allok = test.run()
        if allok:
            # add instance to running tests
            self._runninginstances.append(test)

        warning("Just added a test %d/%d", len(self._runninginstances), self._maxnbtests)
        # if we can still create a new test, call ourself again
        if len(self._runninginstances) < self._maxnbtests:
            warning("still more test to run (current:%d/max:%d)",
                    len(self._runninginstances), self._maxnbtests)
            gobject.idle_add(self._runNext)
        return False

    def _runNextBatch(self):
        """
        Runs the next test batch

        When no batch is left, closes the run in the storage and emits
        "done". Always returns False.
        """
        if not self._tests:
            # if nothing left, stop
            info("No more tests batch to run, we're done")
            self._stoptime = int(time.time())
            self._storage.endTestRun(self)
            self._running = False
            self.emit("done")
            return False

        info("Getting next test batch")
        # pop out the next batch
        test, args, monitors = self._tests.pop(0)
        self._currenttest = test
        self._currentmonitors = monitors
        self._currentarguments = args

        info("Current test : %r", test)
        info("Current monitors : %r", monitors)
        info("Current arguments : %r", args)

        # and run the first one of that batch
        self._runNext()
        return False

    def getCurrentBatchPosition(self):
        """
        Returns the position (index) in the current batch, or 0 if there
        is no current batch.
        """
        if self._currentarguments:
            return self._currentarguments.current()
        return 0

    def getCurrentBatchLength(self):
        """
        Returns the size of the current batch, or 0 if there is no
        current batch.
        """
        if self._currentarguments:
            return len(self._currentarguments)
        return 0

    def getWorkingDirectory(self):
        """
        Returns the currently configured working directory for this
        TestRun.
        """
        return self._workingdir

    def setWorkingDirectory(self, workdir):
        """
        Change the working directory. This can only be called when the
        TestRun isn't running.
        Creates the directories if they are not present.

        Returns True if the working directory was properly changed.
        Returns False if self is running.
        """
        if self._running:
            return False
        debug("Changing workdir to %s", workdir)
        self._workingdir = workdir
        # temporary files from get_temp_file() go into this subdirectory
        self._outputdir = os.path.join(self._workingdir, "outputfiles")
        # ensure directories exist
        if not os.path.exists(self._outputdir):
            os.makedirs(self._outputdir)
        return True

    def get_temp_file(self, nameid='', suffix='', category="insanity-output"):
        """
        Creates a new temporary file in a secure fashion, guaranteeing
        it will be unique and only accessible from this user.

        The file is created in the "outputfiles" subdirectory of the
        working directory and its name starts with "<category>-<nameid>".
        If specified, the suffix will be used

        The returned file will NEVER be removed or closed, the caller
        should take care of that.

        Return (filedescriptor, filepath) for the newly created file, as
        returned by tempfile.mkstemp(). The descriptor is an OS-level
        handle (to be closed with os.close()), not a file object.
        """
        # we create temporary files in a specified directory
        prefix = "%s-%s" % (category, nameid)
        return tempfile.mkstemp(prefix=prefix,
                                suffix=suffix,
                                dir=self._outputdir)


# Register the TestRun class with the GObject type system.
# NOTE(review): presumably required so the __gsignals__ declared on the
# class are installed with the PyGObject version in use -- confirm.
gobject.type_register(TestRun)

class ListTestRun(TestRun):
    """
    TestRun pre-filled with a list of tests which all share the same
    arguments/generator and the same monitor(s).

    tests : iterable of test classes, queued in the given order
    arguments : arguments (or generator) used for every test
    monitors : optional monitors used for every test
    Any further positional/keyword argument is forwarded to TestRun.

    Parameters:
    * fatal-failure : boolean (default:False)
    If set to True, a test will only be run if the previous test for
    the same argument has completed successfully.
    NOTE(review): nothing in this class reads such a parameter -- it
    looks documented but not implemented; confirm.
    """

    def __init__(self, tests, arguments, monitors=None, *args, **kwargs):
        TestRun.__init__(self, *args, **kwargs)
        # queue each test class as its own batch
        enqueue = self.addTest
        for testclass in tests:
            enqueue(testclass, arguments, monitors)

def single_test_run(test, arguments=None, monitors=None):
    """
    Convenience function to create a TestRun for a single test

    test : the Test class to run
    arguments : Arguments object or dictionary. None, or an empty
                list/tuple (the former default), means "no arguments".
    monitors : optional monitors, passed unchanged to TestRun.addTest()

    Returns the new TestRun with the test queued; run() is not called.
    """
    # The former default ([]) was always rejected by addTest(), which only
    # accepts a dict or an Arguments object. Map "nothing given" to an
    # empty dict instead.
    # NOTE(review): assumes Arguments() accepts zero keyword arguments --
    # confirm against insanity.arguments.
    if arguments is None or (isinstance(arguments, (list, tuple))
                             and not arguments):
        arguments = {}
    testrun = TestRun()
    testrun.addTest(test, arguments, monitors)
    return testrun