This file is indexed.

/usr/lib/python2.7/dist-packages/twisted/conch/test/test_conch.py is in python-twisted-conch 1:13.2.0-1ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
# -*- test-case-name: twisted.conch.test.test_conch -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

import os, sys, socket
from itertools import count

from zope.interface import implements

from twisted.cred import portal
from twisted.internet import reactor, defer, protocol
from twisted.internet.error import ProcessExitedAlready
from twisted.internet.task import LoopingCall
from twisted.python import log, runtime
from twisted.trial import unittest
from twisted.conch.error import ConchError
from twisted.conch.avatar import ConchUser
from twisted.conch.ssh.session import ISession, SSHSession, wrapProtocol

try:
    from twisted.conch.scripts.conch import SSHSession as StdioInteractingSession
except ImportError, e:
    StdioInteractingSession = None
    _reason = str(e)
    del e

from twisted.conch.test.test_ssh import ConchTestRealm
from twisted.python.procutils import which

from twisted.conch.test.keydata import publicRSA_openssh, privateRSA_openssh
from twisted.conch.test.keydata import publicDSA_openssh, privateDSA_openssh

from twisted.conch.test.test_ssh import Crypto, pyasn1
try:
    from twisted.conch.test.test_ssh import ConchTestServerFactory, \
        ConchTestPublicKeyChecker
except ImportError:
    pass



class StdioInteractingSessionTests(unittest.TestCase):
    """
    Tests for L{twisted.conch.scripts.conch.SSHSession}.
    """
    if StdioInteractingSession is None:
        skip = _reason

    def test_eofReceived(self):
        """
        L{twisted.conch.scripts.conch.SSHSession.eofReceived} loses the
        write half of its stdio connection.
        """
        class FakeStdio:
            writeConnLost = False

            def loseWriteConnection(self):
                self.writeConnLost = True

        stdio = FakeStdio()
        channel = StdioInteractingSession()
        channel.stdio = stdio
        channel.eofReceived()
        self.assertTrue(stdio.writeConnLost)



class Echo(protocol.Protocol):
    def connectionMade(self):
        log.msg('ECHO CONNECTION MADE')


    def connectionLost(self, reason):
        log.msg('ECHO CONNECTION DONE')


    def dataReceived(self, data):
        self.transport.write(data)
        if '\n' in data:
            self.transport.loseConnection()



class EchoFactory(protocol.Factory):
    protocol = Echo



class ConchTestOpenSSHProcess(protocol.ProcessProtocol):
    """
    Test protocol for launching an OpenSSH client process.

    @ivar deferred: Set by whatever uses this object. Accessed using
    L{_getDeferred}, which destroys the value so the Deferred is not
    fired twice. Fires when the process is terminated.
    """

    deferred = None
    buf = ''

    def _getDeferred(self):
        d, self.deferred = self.deferred, None
        return d


    def outReceived(self, data):
        self.buf += data


    def processEnded(self, reason):
        """
        Called when the process has ended.

        @param reason: a Failure giving the reason for the process' end.
        """
        if reason.value.exitCode != 0:
            self._getDeferred().errback(
                ConchError("exit code was not 0: %s" %
                                 reason.value.exitCode))
        else:
            buf = self.buf.replace('\r\n', '\n')
            self._getDeferred().callback(buf)



class ConchTestForwardingProcess(protocol.ProcessProtocol):
    """
    Manages a third-party process which launches a server.

    Uses L{ConchTestForwardingPort} to connect to the third-party server.
    Once L{ConchTestForwardingPort} has disconnected, kill the process and fire
    a Deferred with the data received by the L{ConchTestForwardingPort}.

    @ivar deferred: Set by whatever uses this object. Accessed using
    L{_getDeferred}, which destroys the value so the Deferred is not
    fired twice. Fires when the process is terminated.
    """

    deferred = None

    def __init__(self, port, data):
        """
        @type port: C{int}
        @param port: The port on which the third-party server is listening.
        (it is assumed that the server is running on localhost).

        @type data: C{str}
        @param data: This is sent to the third-party server. Must end with '\n'
        in order to trigger a disconnect.
        """
        self.port = port
        self.buffer = None
        self.data = data


    def _getDeferred(self):
        d, self.deferred = self.deferred, None
        return d


    def connectionMade(self):
        self._connect()


    def _connect(self):
        """
        Connect to the server, which is often a third-party process.
        Tries to reconnect if it fails because we have no way of determining
        exactly when the port becomes available for listening -- we can only
        know when the process starts.
        """
        cc = protocol.ClientCreator(reactor, ConchTestForwardingPort, self,
                                    self.data)
        d = cc.connectTCP('127.0.0.1', self.port)
        d.addErrback(self._ebConnect)
        return d


    def _ebConnect(self, f):
        reactor.callLater(.1, self._connect)


    def forwardingPortDisconnected(self, buffer):
        """
        The network connection has died; save the buffer of output
        from the network and attempt to quit the process gracefully,
        and then (after the reactor has spun) send it a KILL signal.
        """
        self.buffer = buffer
        self.transport.write('\x03')
        self.transport.loseConnection()
        reactor.callLater(0, self._reallyDie)


    def _reallyDie(self):
        try:
            self.transport.signalProcess('KILL')
        except ProcessExitedAlready:
            pass


    def processEnded(self, reason):
        """
        Fire the Deferred at self.deferred with the data collected
        from the L{ConchTestForwardingPort} connection, if any.
        """
        self._getDeferred().callback(self.buffer)



class ConchTestForwardingPort(protocol.Protocol):
    """
    Connects to server launched by a third-party process (managed by
    L{ConchTestForwardingProcess}) sends data, then reports whatever it
    received back to the L{ConchTestForwardingProcess} once the connection
    is ended.
    """


    def __init__(self, protocol, data):
        """
        @type protocol: L{ConchTestForwardingProcess}
        @param protocol: The L{ProcessProtocol} which made this connection.

        @type data: str
        @param data: The data to be sent to the third-party server.
        """
        self.protocol = protocol
        self.data = data


    def connectionMade(self):
        self.buffer = ''
        self.transport.write(self.data)


    def dataReceived(self, data):
        self.buffer += data


    def connectionLost(self, reason):
        self.protocol.forwardingPortDisconnected(self.buffer)



def _makeArgs(args, mod="conch"):
    start = [sys.executable, '-c'
"""
### Twisted Preamble
import sys, os
path = os.path.abspath(sys.argv[0])
while os.path.dirname(path) != path:
    if os.path.basename(path).startswith('Twisted'):
        sys.path.insert(0, path)
        break
    path = os.path.dirname(path)

from twisted.conch.scripts.%s import run
run()""" % mod]
    return start + list(args)



class ConchServerSetupMixin:
    if not Crypto:
        skip = "can't run w/o PyCrypto"

    if not pyasn1:
        skip = "Cannot run without PyASN1"

    realmFactory = staticmethod(lambda: ConchTestRealm('testuser'))

    def _createFiles(self):
        for f in ['rsa_test','rsa_test.pub','dsa_test','dsa_test.pub',
                  'kh_test']:
            if os.path.exists(f):
                os.remove(f)
        open('rsa_test','w').write(privateRSA_openssh)
        open('rsa_test.pub','w').write(publicRSA_openssh)
        open('dsa_test.pub','w').write(publicDSA_openssh)
        open('dsa_test','w').write(privateDSA_openssh)
        os.chmod('dsa_test', 33152)
        os.chmod('rsa_test', 33152)
        open('kh_test','w').write('127.0.0.1 '+publicRSA_openssh)


    def _getFreePort(self):
        s = socket.socket()
        s.bind(('', 0))
        port = s.getsockname()[1]
        s.close()
        return port


    def _makeConchFactory(self):
        """
        Make a L{ConchTestServerFactory}, which allows us to start a
        L{ConchTestServer} -- i.e. an actually listening conch.
        """
        realm = self.realmFactory()
        p = portal.Portal(realm)
        p.registerChecker(ConchTestPublicKeyChecker())
        factory = ConchTestServerFactory()
        factory.portal = p
        return factory


    def setUp(self):
        self._createFiles()
        self.conchFactory = self._makeConchFactory()
        self.conchFactory.expectedLoseConnection = 1
        self.conchServer = reactor.listenTCP(0, self.conchFactory,
                                             interface="127.0.0.1")
        self.echoServer = reactor.listenTCP(0, EchoFactory())
        self.echoPort = self.echoServer.getHost().port


    def tearDown(self):
        try:
            self.conchFactory.proto.done = 1
        except AttributeError:
            pass
        else:
            self.conchFactory.proto.transport.loseConnection()
        return defer.gatherResults([
                defer.maybeDeferred(self.conchServer.stopListening),
                defer.maybeDeferred(self.echoServer.stopListening)])



class ForwardingMixin(ConchServerSetupMixin):
    """
    Template class for tests of the Conch server's ability to forward arbitrary
    protocols over SSH.

    These tests are integration tests, not unit tests. They launch a Conch
    server, a custom TCP server (just an L{EchoProtocol}) and then call
    L{execute}.

    L{execute} is implemented by subclasses of L{ForwardingMixin}. It should
    cause an SSH client to connect to the Conch server, asking it to forward
    data to the custom TCP server.
    """

    def test_exec(self):
        """
        Test that we can use whatever client to send the command "echo goodbye"
        to the Conch server. Make sure we receive "goodbye" back from the
        server.
        """
        d = self.execute('echo goodbye', ConchTestOpenSSHProcess())
        return d.addCallback(self.assertEqual, 'goodbye\n')


    def test_localToRemoteForwarding(self):
        """
        Test that we can use whatever client to forward a local port to a
        specified port on the server.
        """
        localPort = self._getFreePort()
        process = ConchTestForwardingProcess(localPort, 'test\n')
        d = self.execute('', process,
                         sshArgs='-N -L%i:127.0.0.1:%i'
                         % (localPort, self.echoPort))
        d.addCallback(self.assertEqual, 'test\n')
        return d


    def test_remoteToLocalForwarding(self):
        """
        Test that we can use whatever client to forward a port from the server
        to a port locally.
        """
        localPort = self._getFreePort()
        process = ConchTestForwardingProcess(localPort, 'test\n')
        d = self.execute('', process,
                         sshArgs='-N -R %i:127.0.0.1:%i'
                         % (localPort, self.echoPort))
        d.addCallback(self.assertEqual, 'test\n')
        return d



class RekeyAvatar(ConchUser):
    """
    This avatar implements a shell which sends 60 numbered lines to whatever
    connects to it, then closes the session with a 0 exit status.

    60 lines is selected as being enough to send more than 2kB of traffic, the
    amount the client is configured to initiate a rekey after.
    """
    # Conventionally there is a separate adapter object which provides ISession
    # for the user, but making the user provide ISession directly works too.
    # This isn't a full implementation of ISession though, just enough to make
    # these tests pass.
    implements(ISession)

    def __init__(self):
        ConchUser.__init__(self)
        self.channelLookup['session'] = SSHSession


    def openShell(self, transport):
        """
        Write 60 lines of data to the transport, then exit.
        """
        proto = protocol.Protocol()
        proto.makeConnection(transport)
        transport.makeConnection(wrapProtocol(proto))

        # Send enough bytes to the connection so that a rekey is triggered in
        # the client.
        def write(counter):
            i = counter()
            if i == 60:
                call.stop()
                transport.session.conn.sendRequest(
                    transport.session, 'exit-status', '\x00\x00\x00\x00')
                transport.loseConnection()
            else:
                transport.write("line #%02d\n" % (i,))

        # The timing for this loop is an educated guess (and/or the result of
        # experimentation) to exercise the case where a packet is generated
        # mid-rekey.  Since the other side of the connection is (so far) the
        # OpenSSH command line client, there's no easy way to determine when the
        # rekey has been initiated.  If there were, then generating a packet
        # immediately at that time would be a better way to test the
        # functionality being tested here.
        call = LoopingCall(write, count().next)
        call.start(0.01)


    def closed(self):
        """
        Ignore the close of the session.
        """



class RekeyRealm:
    """
    This realm gives out new L{RekeyAvatar} instances for any avatar request.
    """
    def requestAvatar(self, avatarID, mind, *interfaces):
        return interfaces[0], RekeyAvatar(), lambda: None



class RekeyTestsMixin(ConchServerSetupMixin):
    """
    TestCase mixin which defines tests exercising L{SSHTransportBase}'s handling
    of rekeying messages.
    """
    realmFactory = RekeyRealm

    def test_clientRekey(self):
        """
        After a client-initiated rekey is completed, application data continues
        to be passed over the SSH connection.
        """
        process = ConchTestOpenSSHProcess()
        d = self.execute("", process, '-o RekeyLimit=2K')
        def finished(result):
            self.assertEqual(
                result,
                '\n'.join(['line #%02d' % (i,) for i in range(60)]) + '\n')
        d.addCallback(finished)
        return d



class OpenSSHClientMixin:
    if not which('ssh'):
        skip = "no ssh command-line client available"

    def execute(self, remoteCommand, process, sshArgs=''):
        """
        Connects to the SSH server started in L{ConchServerSetupMixin.setUp} by
        running the 'ssh' command line tool.

        @type remoteCommand: str
        @param remoteCommand: The command (with arguments) to run on the
        remote end.

        @type process: L{ConchTestOpenSSHProcess}

        @type sshArgs: str
        @param sshArgs: Arguments to pass to the 'ssh' process.

        @return: L{defer.Deferred}
        """
        process.deferred = defer.Deferred()
        cmdline = ('ssh -2 -l testuser -p %i '
                   '-oUserKnownHostsFile=kh_test '
                   '-oPasswordAuthentication=no '
                   # Always use the RSA key, since that's the one in kh_test.
                   '-oHostKeyAlgorithms=ssh-rsa '
                   '-a '
                   '-i dsa_test ') + sshArgs + \
                   ' 127.0.0.1 ' + remoteCommand
        port = self.conchServer.getHost().port
        cmds = (cmdline % port).split()
        reactor.spawnProcess(process, "ssh", cmds)
        return process.deferred



class OpenSSHClientForwardingTestCase(ForwardingMixin, OpenSSHClientMixin,
                                      unittest.TestCase):
    """
    Connection forwarding tests run against the OpenSSL command line client.
    """



class OpenSSHClientRekeyTestCase(RekeyTestsMixin, OpenSSHClientMixin,
                                 unittest.TestCase):
    """
    Rekeying tests run against the OpenSSL command line client.
    """



class CmdLineClientTestCase(ForwardingMixin, unittest.TestCase):
    """
    Connection forwarding tests run against the Conch command line client.
    """
    if runtime.platformType == 'win32':
        skip = "can't run cmdline client on win32"

    def execute(self, remoteCommand, process, sshArgs=''):
        """
        As for L{OpenSSHClientTestCase.execute}, except it runs the 'conch'
        command line tool, not 'ssh'.
        """
        process.deferred = defer.Deferred()
        port = self.conchServer.getHost().port
        cmd = ('-p %i -l testuser '
               '--known-hosts kh_test '
               '--user-authentications publickey '
               '--host-key-algorithms ssh-rsa '
               '-a '
               '-i dsa_test '
               '-v ') % port + sshArgs + \
               ' 127.0.0.1 ' + remoteCommand
        cmds = _makeArgs(cmd.split())
        log.msg(str(cmds))
        env = os.environ.copy()
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
        reactor.spawnProcess(process, sys.executable, cmds, env=env)
        return process.deferred