This file is indexed.

/usr/lib/python3/dist-packages/wormhole/test/common.py is in magic-wormhole 0.10.3-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
# no unicode_literals until twisted update
from twisted.application import service
from twisted.internet import defer, task, reactor
from twisted.python import log
from click.testing import CliRunner
import mock
from ..cli import cli
from ..transit import allocate_tcp_port
from ..server.server import RelayServer

class ServerBase:
    """Test mixin that runs a local RelayServer around each test.

    ``setUp`` starts the server on two freshly allocated TCP ports bound to
    127.0.0.1, and ``tearDown`` stops it again. The attributes set by
    ``_setup_relay`` (``relayurl``, ``transit``, ``relayport``, ...) give
    tests the addresses to connect to.
    """

    def setUp(self):
        # Default configuration: no signal_error, no advertised version.
        self._setup_relay(None)

    def _setup_relay(self, error, advertise_version=None):
        """Create and start a RelayServer under a fresh MultiService.

        ``error`` is handed to RelayServer as ``signal_error`` and
        ``advertise_version`` is passed through unchanged.

        Sets on ``self``: ``sp`` (the parent MultiService), ``relayport`` /
        ``transitport`` (allocated ports), ``_relay_server``,
        ``_rendezvous``, ``_transit_server``, ``relayurl``, ``rdv_ws_port``
        and ``transit``.
        """
        self.sp = service.MultiService()
        self.sp.startService()
        self.relayport = allocate_tcp_port()
        self.transitport = allocate_tcp_port()
        # need to talk to twisted team about only using unicode in
        # endpoints.serverFromString
        s = RelayServer("tcp:%d:interface=127.0.0.1" % self.relayport,
                        "tcp:%s:interface=127.0.0.1" % self.transitport,
                        advertise_version=advertise_version,
                        signal_error=error)
        s.setServiceParent(self.sp)
        self._relay_server = s
        self._rendezvous = s._rendezvous
        self._transit_server = s._transit
        self.relayurl = u"ws://127.0.0.1:%d/v1" % self.relayport
        self.rdv_ws_port = self.relayport
        # ws://127.0.0.1:%d/wormhole-relay/ws
        self.transit = u"tcp:127.0.0.1:%d" % self.transitport

    def tearDown(self):
        """Stop the relay service and return a Deferred for the cleanup.

        If the reactor's thread pool has no working threads, the service is
        stopped and the returned Deferred fires after a further 0.1s pause.
        Otherwise the service is stopped, and one second later the thread
        pool is checked again: a message is logged either way, and a warning
        is printed if threads are still active. In that case the returned
        Deferred also waits for ``stopService`` to finish.
        """
        # Unit tests that spawn a (blocking) client in a thread might still
        # have threads running at this point, if one is stuck waiting for a
        # message from a companion which has exited with an error. Our
        # relay's .stopService() drops all connections, which ought to
        # encourage those threads to terminate soon. If they don't, print a
        # warning to ease debugging.

        # XXX FIXME there's something in _noclobber test that's not
        # waiting for a close, I think -- was pretty reliably getting
        # unclean-reactor, but adding a slight pause here stops it...
        tp = reactor.getThreadPool()
        if not tp.working:
            d = defer.succeed(None)
            d.addCallback(lambda _: self.sp.stopService())
            # the "slight pause" mentioned in the FIXME above
            d.addCallback(lambda _: task.deferLater(reactor, 0.1, lambda: None))
            return d
        # disconnect all callers
        d = defer.maybeDeferred(self.sp.stopService)
        wait_d = defer.Deferred()
        # wait a second, then check to see if it worked
        reactor.callLater(1.0, wait_d.callback, None)

        def _later(res):
            if tp.working:
                log.msg("wormhole.test.common.ServerBase.tearDown:"
                        " I was unable to convince all threads to exit.")
                tp.dumpStats()
                print("tearDown warning: threads are still active")
                print("This test will probably hang until one of the"
                      " clients gives up of their own accord.")
            else:
                log.msg("wormhole.test.common.ServerBase.tearDown:"
                        " I convinced all threads to exit.")
            # Returning d chains wait_d onto the stopService() result.
            return d
        wait_d.addCallback(_later)
        return wait_d

def config(*argv):
    """Run the wormhole CLI with ``argv`` and return the config it built.

    ``wormhole.cli.cli.go`` is replaced by a mock while the CLI runs, so
    the command itself is never executed. The returned value is the second
    positional argument the CLI passed to ``go``.

    Raises AssertionError (after printing the exit code, output and result
    object) if the CLI exits with a non-zero code.
    """
    r = CliRunner()
    with mock.patch("wormhole.cli.cli.go") as go:
        res = r.invoke(cli.wormhole, argv, catch_exceptions=False)
        if res.exit_code != 0:
            print(res.exit_code)
            print(res.output)
            print(res)
            # Raise explicitly instead of "assert 0": a bare assert is
            # stripped under "python -O", which would let a failed run fall
            # through to the call_args lookup below.
            raise AssertionError(
                "wormhole CLI exited with code %r" % (res.exit_code,))
        cfg = go.call_args[0][1]
    return cfg

@defer.inlineCallbacks
def poll_until(predicate):
    """Return a Deferred that fires only once ``predicate()`` is true.

    The predicate is re-evaluated after each 0.001s reactor delay until it
    returns a true value.
    """
    while True:
        if predicate():
            break
        # Not satisfied yet: yield to the reactor briefly, then re-check.
        retry = defer.Deferred()
        reactor.callLater(0.001, retry.callback, None)
        yield retry

@defer.inlineCallbacks
def pause_one_tick():
    """Return a Deferred that fires no earlier than the next reactor tick.

    Implemented as a single 0.001s delayed call on the reactor.
    """
    next_tick = defer.Deferred()
    reactor.callLater(0.001, next_tick.callback, None)
    yield next_tick