This file is indexed.

/usr/share/pyshared/insanity/gstreamertest.py is in python-insanity 0.0+git20110920.4750a8e8-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# GStreamer QA system
#
#       gstreamertest.py
#
# Copyright (c) 2007, Edward Hervey <bilboed@bilboed.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.

import sys
import os

from insanity.log import error, warning, debug, info, exception
from insanity.dbustest import PythonDBusTest

import dbus
import gobject
gobject.threads_init()

import gst

if gst.pygst_version < (0, 10, 9):
    # pygst before 0.10.9 has atexit(gst_deinit), causing segfaults.  Let's
    # replace sys.exit with something that overrides atexit processing:
    def exi(status=0):
        """
        Replacement for sys.exit() that terminates through os._exit().

        os._exit() ends the process immediately, so no atexit handler gets
        a chance to run.  The accepted values mirror those of sys.exit():
        None is treated as 0, an integer is used as the exit code, and any
        other object is written to stderr and results in exit code 1.
        """
        def _flush():
            # os._exit() does not flush stdio buffers, so do it by hand.
            # Flushing is best effort only: a stream may be closed or
            # replaced by None, and that must not prevent the exit.
            for stream in (sys.stdout, sys.stderr):
                try:
                    stream.flush()
                except Exception:
                    pass

        if status is None:
            status = 0
        _flush()
        try:
            os._exit(status)
        except TypeError:
            # Not an integer : behave like sys.exit() and report the object
            # on stderr before exiting with a failure code.
            try:
                sys.stderr.write("%s\n" % (status,))
            except Exception:
                pass
            _flush()
            os._exit(1)
    sys.exit = exi

class GStreamerTestBase(PythonDBusTest):
    """
    Tests that specifically run a GStreamer pipeline

    The pipeline is obtained from createPipeline() in remoteSetUp(), set to
    __pipeline_initial_state__ in remoteTest() and set back to
    gst.STATE_NULL in remoteTearDown().  While it runs, the messages posted
    on its bus are used to validate the checklist items below and to
    collect the errors, tags and elements reported as extra information.

    Subclasses normally implement getPipelineString() or createPipeline(),
    and may override pipelineReachedInitialState() and handleBusMessage().
    """
    __test_name__ = """gstreamer-test-base"""
    __test_description__ = """Base class for GStreamer tests"""
    __test_checklist__ = {
        "valid-pipeline" : "The test pipeline was properly created",
        "pipeline-change-state" : "The initial state_change happened succesfully",
        "reached-initial-state" : "The pipeline reached the initial GstElementState",
        "no-errors-seen" : "No errors were emitted from the pipeline"
        }

    __test_extra_infos__ = {
        "errors" : "List of errors emitted by the pipeline",
        "tags" : "List of tags emitted by the pipeline",
        "elements-used" : "List of elements used as (name,factoryname,parentname)"
        }
    # Initial pipeline state, subclasses can override this
    __pipeline_initial_state__ = gst.STATE_PLAYING

    def __init__(self, env=None, *args, **kwargs):
        """
        Initialise the bookkeeping attributes and chain up to PythonDBusTest.

        env is the environment dictionary forwarded to PythonDBusTest; a
        new one is created if None is given.  Note that a dictionary given
        by the caller is modified in place.  All other arguments are
        forwarded untouched.
        """
        # We don't want the tests to update the registry because:
        # * it will make the tests start up faster
        # * the tests across testruns should be using the same registry/plugins
        #
        # This feature is only available since 0.10.19.1 (24th April 2008) in
        # GStreamer core
        if env is None:
            env = {}
        env["GST_REGISTRY_UPDATE"] = "no"
        self.pipeline = None
        self.bus = None
        # list of (code, domain, message, debug) tuples, one per bus error
        self._errors = []
        # tag name => last value seen for that tag
        self._tags = {}
        # list of "name factoryname" strings
        self._elements = []
        # id of the pending "EOS before initial state" timeout, or None
        self._waitcb = None
        self._reachedInitialState = False
        PythonDBusTest.__init__(self, env=env, *args, **kwargs)

    def setUp(self):
        """
        Set GST_DEBUG to "0" in the test environment, then chain up.

        Returns whatever PythonDBusTest.setUp() returns.
        """
        # default gst debug output to NOTHING
        self._environ["GST_DEBUG"] = "0"
        return PythonDBusTest.setUp(self)

    def remoteSetUp(self):
        """
        Create the pipeline and start watching its elements and its bus.

        Validates the "valid-pipeline" step.  If no pipeline could be
        created, remoteStop() is called and nothing else is set up.
        """
        debug("%s", self.uuid)
        gst.log("%s" % self.uuid)

        # create the pipeline
        try:
            self.pipeline = self.createPipeline()
        except Exception:
            exception("Error while creating pipeline")
            self.pipeline = None

        self.validateStep("valid-pipeline", self.pipeline is not None)
        if self.pipeline is None:
            self.remoteStop()
            return

        # the pipeline itself is the first element of the list
        self._elements = ["%s %s" % (self.pipeline.get_name(),
                                     self._getFactoryName(self.pipeline))]

        self._watchContainer(self.pipeline)

        # connect to bus
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message", self._busMessageHandlerCb)
        PythonDBusTest.remoteSetUp(self)

    def remoteTearDown(self):
        """
        Stop the pipeline and report everything that was collected.

        Validates the "no-errors-seen" step and stores the errors, tags and
        elements as extra information.

        Returns False, without doing anything else, if
        PythonDBusTest.remoteTearDown() returned a false value, and True
        otherwise.
        """
        if not PythonDBusTest.remoteTearDown(self):
            return False
        gst.log("Tearing Down")
        # unref pipeline and so forth
        if self._waitcb:
            gobject.source_remove(self._waitcb)
            self._waitcb = None
        if self.pipeline is not None:
            self.pipeline.set_state(gst.STATE_NULL)

        self.validateStep("no-errors-seen", not self._errors)
        # the error code is collected but only domain, message and debug
        # string are reported
        for idx, (code, domain, message, dbg) in enumerate(self._errors):
            self.extraInfo("errors.%d.domain" % idx, domain)
            self.extraInfo("errors.%d.message" % idx, message)
            self.extraInfo("errors.%d.debug" % idx, dbg)

        if self._tags:
            debug("Got tags %r", self._tags)
            # iterate over a snapshot since values are replaced on the way
            for key, val in list(self._tags.items()):
                # make sure that only values < 2**31 (MAX_INT32) are ints
                # TODO : this is gonna screw up MASSIVELY with values > 2**63
                if isinstance(val, int) and val >= 2**31:
                    val = long(val)
                    self._tags[key] = val
                self.extraInfo("tags.%s" % key, val)
        if self._elements:
            self.extraInfo("elements-used", self._elements)
        return True

    def remoteTest(self):
        """
        Chain up, then set the pipeline to __pipeline_initial_state__.

        Validates the "pipeline-change-state" step; the test is stopped
        straight away if set_state() returned gst.STATE_CHANGE_FAILURE.
        """
        # kickstart pipeline to initial state
        PythonDBusTest.remoteTest(self)
        debug("Setting pipeline to initial state %r", self.__pipeline_initial_state__)
        gst.log("Setting pipeline to initial state %r" % self.__pipeline_initial_state__)
        res = self.pipeline.set_state(self.__pipeline_initial_state__)
        debug("set_state returned %r", res)
        gst.log("set_state() returned %r" % res)
        failed = (res == gst.STATE_CHANGE_FAILURE)
        self.validateStep("pipeline-change-state", not failed)
        if failed:
            warning("Setting pipeline to initial state failed, stopping test")
            gst.warning("State change failed, stopping")
            self.stop()

    def _busMessageHandlerCb(self, bus, message):
        """
        Handler for the "message" signal of the pipeline bus.

        The message is first offered to handleBusMessage(); unless that
        returns False, the default handling below is applied:
        * ERROR : the error is recorded and the test is stopped
        * TAG : the tags are recorded
        * EOS from the pipeline : the test is stopped, after an extra delay
          of at most 3s if the initial state was not reached yet
        * STATE_CHANGED from the pipeline : when the initial state is
          reached, "reached-initial-state" is validated and the test is
          stopped if pipelineReachedInitialState() says so
        """
        debug("%s from %r message:%r", self.uuid, message.src, message)
        gst.log("%s from %r message:%r" % (self.uuid, message.src, message))
        # let's pass it on to subclass to see if they want us to ignore that
        # message. Only an explicit False makes us ignore it, a handler
        # returning None gets the default handling.
        if self.handleBusMessage(message) == False:
            debug("ignoring message")
            return
        # handle common types
        if message.type == gst.MESSAGE_ERROR:
            gerror, dbg = message.parse_error()
            self._errors.append((gerror.code, gerror.domain, gerror.message, dbg))
            debug("Got an error on the bus, stopping")
            self.stop()
        elif message.type == gst.MESSAGE_TAG:
            self._gotTags(message.parse_tag())
        elif message.src == self.pipeline:
            if message.type == gst.MESSAGE_EOS:
                # it's not 100% sure we want to stop here, because of the
                # race between the final state-change message and the eos message
                # arriving on the bus.
                debug("Saw EOS, stopping")
                # we give an extra 3s for the state change to complete all the same
                if self._reachedInitialState:
                    self.stop()
                else:
                    gst.debug("Saw EOS but not yet in initial state, allowing an extra 3s to see it")
                    # don't arm a second timeout if one is already pending,
                    # we would lose track of the first one
                    if not self._waitcb:
                        self._waitcb = gobject.timeout_add(3000, self._waitingForStateChange)
            elif message.type == gst.MESSAGE_STATE_CHANGED:
                prev, cur, pending = message.parse_state_changed()
                if cur == self.__pipeline_initial_state__ and pending == gst.STATE_VOID_PENDING:
                    gst.log("Reached initial state")
                    self.validateStep("reached-initial-state")
                    self._reachedInitialState = True
                    if self.pipelineReachedInitialState():
                        debug("Stopping test because we reached initial state")
                        gst.log("Stopping test because we reached initial state")
                        self.stop()

    def _waitingForStateChange(self):
        """
        Timeout callback armed when EOS was seen before the initial state.

        Stops the test.  Returns False so that the timeout is not
        rescheduled.
        """
        gst.debug("timeout waiting for initial state to be reached")
        # Returning False removes the timeout source, so forget its id :
        # remoteTearDown() must not try to remove it a second time.
        self._waitcb = None
        self.stop()
        return False

    def _gotTags(self, tags):
        """
        Store the contents of a tag list in self._tags.

        gobject.GBoxed and gst.MiniObject values are stored as their
        repr(), all other values are stored as they are.  A tag seen
        several times keeps the last value.
        """
        for key in tags.keys():
            value = tags[key]
            if isinstance(value, (gobject.GBoxed, gst.MiniObject)):
                value = repr(value)
            self._tags[key] = value

    def _getFactoryName(self, element):
        """
        Return the name of the factory of the given element, or
        "(no factory)" if element.get_factory() returns None.
        """
        factory = element.get_factory()
        if factory is None:
            return "(no factory)"
        return factory.get_name()

    def _watchContainer(self, container):
        """
        Record all elements of the container, recursing into the bins it
        contains, and get notified of the elements added to it later on.
        """
        # add all elements currently present
        for elt in container:
            self._elements.append("%s %s" % (elt.get_name(),
                                             self._getFactoryName(elt)))
            if isinstance(elt, gst.Bin):
                self._watchContainer(elt)
        # connect to signal
        container.connect("element-added", self._elementAddedCb)

    def _elementAddedCb(self, container, element):
        """
        Handler for the "element-added" signal of the watched containers.

        Records the new element, and starts watching it if it is a bin.
        """
        debug("New element %r in container %r", element, container)
        # add the element itself, in the same format as _watchContainer()
        self._elements.append("%s %s" % (element.get_name(),
                                         self._getFactoryName(element)))
        # if bin, add current and connect signal
        if isinstance(element, gst.Bin):
            self._watchContainer(element)

    def stop(self):
        """
        Log that the test is stopping and chain up to PythonDBusTest.stop().
        """
        gst.log("Stopping...")
        PythonDBusTest.stop(self)

    ## Methods that can be overridden by subclasses

    def pipelineReachedInitialState(self):
        """
        Override this method to implement some behaviour once your pipeline
        has reached the initial state.

        Return True if you want the test to stop (default behaviour).
        Return False if you want the test to carry on (most likely because you
        wish to do other actions/testing).
        """
        return True

    def handleBusMessage(self, message):
        """
        Override this method if you want to be able to handle messages from the
        bus.

        Return False if you don't want the base class to handle it (because you
        have been handling the Error messages or EOS messages and you don't
        want the base class to do the default handling).
        Else return True.
        """
        return True

    def getPipelineString(self):
        """
        Return the pipeline string for the given test.
        This method should be implemented in tests that don't create the
        pipeline manually, but instead can just return a parse-launch syntax
        string representation of the pipeline.
        """
        raise NotImplementedError

    def createPipeline(self):
        """
        Construct and return the pipeline for the given test

        Return a gst.Pipeline if creation was successful.
        Return None if an error occurred.

        The default implementation hands the string returned by
        getPipelineString() to gst.parse_launch().  An exception raised by
        getPipelineString() itself is not caught here.
        """
        # default implementation : ask for parse-launch syntax
        pipestring = self.getPipelineString()
        debug("%s Got pipeline string %s", self.uuid, pipestring)
        try:
            return gst.parse_launch(pipestring)
        except Exception:
            exception("error while creating pipeline")
            return None