/usr/lib/telepathy-gabble-tests/twisted/tubes/test-get-available-tubes.py is in telepathy-gabble-tests 0.18.4-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 | """Test GetAvailableStreamTubeTypes and GetAvailableTubeTypes"""
import os
import sys
import dbus
from servicetest import call_async, EventPattern, tp_name_prefix,\
assertContains, assertEquals, assertLength
from gabbletest import (
exec_test, make_result_iq, acknowledge_iq, make_muc_presence)
import constants as cs
sample_parameters = dbus.Dictionary({
's': 'hello',
'ay': dbus.ByteArray('hello'),
'u': dbus.UInt32(123),
'i': dbus.Int32(-123),
}, signature='sv')
def test(q, bus, conn, stream):
iq_event = q.expect('stream-iq', to=None, query_ns='vcard-temp',
query_name='vCard')
acknowledge_iq(stream, iq_event.stanza)
# muc stream tube
call_async(q, conn.Requests, 'CreateChannel', {
cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
cs.TARGET_ID: 'chat@conf.localhost',
cs.STREAM_TUBE_SERVICE: 'test'})
q.expect('stream-presence', to='chat@conf.localhost/test')
# Send presence for other member of room.
stream.send(make_muc_presence('owner', 'moderator', 'chat@conf.localhost', 'bob'))
# Send presence for own membership of room.
stream.send(make_muc_presence('owner', 'moderator', 'chat@conf.localhost', 'test'))
members, event = q.expect_many(
EventPattern('dbus-signal', signal='MembersChanged',
args=[u'', [2, 3], [], [], [], 0, 0]),
EventPattern('dbus-return', method='CreateChannel'))
path = event.value[0]
props = event.value[1]
tube = bus.get_object(conn.bus_name, path)
stream_tubes_types = tube.Get(cs.CHANNEL_TYPE_STREAM_TUBE, 'SupportedSocketTypes',
dbus_interface=cs.PROPERTIES_IFACE)
if os.name == 'posix':
assert cs.SOCKET_ACCESS_CONTROL_LOCALHOST in \
stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_UNIX], \
stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_UNIX]
# so far we only guarantee to support credentials-passing on Linux
if sys.platform == 'linux2':
assert cs.SOCKET_ACCESS_CONTROL_CREDENTIALS in \
stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_UNIX], \
stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_UNIX]
assertEquals([cs.SOCKET_ACCESS_CONTROL_LOCALHOST, cs.SOCKET_ACCESS_CONTROL_PORT],
stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_IPV4])
assertEquals([cs.SOCKET_ACCESS_CONTROL_LOCALHOST, cs.SOCKET_ACCESS_CONTROL_PORT],
stream_tubes_types[cs.SOCKET_ADDRESS_TYPE_IPV6])
if __name__ == '__main__':
exec_test(test)
|