This file is indexed.

/usr/share/pyshared/insanity/dbustools.py is in python-insanity 0.0+git20110920.4750a8e8-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# GStreamer QA system
#
#       dbus.py
#
# Copyright (c) 2007, Edward Hervey <bilboed@bilboed.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.

"""
D-Bus convenience methods and classes
"""

#
# Service accesible from test instances
#
# domain : net.gstreamer.Insanity
#
# objects
#   /TestRun/<TestRunID>
#   /TestRun/<TestRunID>/<TestName>/<TestID>
#

from dbus.bus import BusConnection
from dbus.mainloop.glib import DBusGMainLoop
import tempfile
import subprocess
import os
import signal
from insanity.log import debug, info

private_bus = None
private_bus_address = None
private_bus_pid = None

def spawn_session_dbus():
    """
    Spawns a session dbus daemon.

    Returns a tuple of (DBUS_SESSION_BUS_ADDRESS, DBUS_SESSION_BUS_PID) if the
    daemon could be started properly
    """
    debug("Spawning private DBus daemon")
    logfilefd, logfilename = tempfile.mkstemp()

    # spawn dbus-launch
    subprocess.call(["dbus-launch"],
                    stdout = logfilefd,
                    stderr = subprocess.STDOUT)

    # parse the returned values
    afile = file(logfilename)

    res = [x.strip().split('=', 1)[-1] for x in afile.readlines()]
    afile.close()
    os.remove(logfilename)
    info("%r" % res)
    # return the tuple result
    return tuple(res)

def kill_private_dbus():
    """
    Kill the private dbus daemon used by the client
    """
    global private_bus_pid, private_bus, private_bus_address
    if private_bus_pid:
        info("Killing private dbus daemon [pid:%d]" % int(private_bus_pid))
        os.kill(int(private_bus_pid), signal.SIGKILL)
        private_bus = None
        private_bus_address = None
        private_bus_pid = None

def get_private_session_bus():
    """
    Get the private dbus BusConnection to use in the client.
    Tests should NOT use this method
    """
    global private_bus, private_bus_pid, private_bus_address
    if private_bus == None:
        if private_bus_pid:
            # cleanup
            kill_private_dbus()
        private_bus_address, private_bus_pid = spawn_session_dbus()[:2]
        debug("Creating BusConnection for address %s" % private_bus_address)
        gml = DBusGMainLoop()
        private_bus = BusConnection(private_bus_address, mainloop=gml)
    return private_bus

def get_private_bus_address():
    """
    Get the address of the private dbus daemon used in the client.
    This is the address that test instances can connect to in order
    to communicate with the Test Client.
    """
    global private_bus, private_bus_pid, private_bus_address
    if private_bus == None:
        if private_bus_pid:
            # cleanup
            kill_private_dbus()
        private_bus_address, private_bus_pid = spawn_session_dbus()[:2]
        print "Creating BusConnection for address", private_bus_address
        gml = DBusGMainLoop()
        private_bus = BusConnection(private_bus_address, mainloop=gml)
    return private_bus_address

def unwrap(x):
    """Hack to unwrap D-Bus values, so that they're easier to read when
    printed."""

    if isinstance(x, list):
        return map(unwrap, x)

    if isinstance(x, tuple):
        return tuple(map(unwrap, x))

    if isinstance(x, dict):
        return dict([(unwrap(k), unwrap(v)) for k, v in x.iteritems()])

    for t in [unicode, str, long, int, float, bool]:
        if isinstance(x, t):
            return t(x)

    return x