This file is indexed.

/usr/share/pyshared/sleekxmpp/test/mocksocket.py is in python-sleekxmpp 1.0~beta5-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""
    SleekXMPP: The Sleek XMPP Library
    Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
    This file is part of SleekXMPP.

    See the file LICENSE for copying permission.
"""

import socket
try:
    import queue
except ImportError:
    import Queue as queue


class TestSocket(object):

    """
    A dummy socket that reads and writes to queues instead
    of an actual networking socket.

    Methods:
        next_sent -- Return the next sent stanza.
        recv_data -- Make a stanza available to read next.
        recv      -- Read the next stanza from the socket.
        send      -- Write a stanza to the socket.
        makefile  -- Dummy call, returns self.
        read      -- Read the next stanza from the socket.
    """

    def __init__(self, *args, **kwargs):
        """
        Create a new test socket.

        Arguments:
            Same as arguments for socket.socket
        """
        self.socket = socket.socket(*args, **kwargs)
        self.recv_queue = queue.Queue()
        self.send_queue = queue.Queue()
        self.is_live = False
        self.disconnected = False

    def __getattr__(self, name):
        """
        Return attribute values of internal, dummy socket.

        Some attributes and methods are disabled to prevent the
        socket from connecting to the network.

        Arguments:
            name -- Name of the attribute requested.
        """

        def dummy(*args):
            """Method to do nothing and prevent actual socket connections."""
            return None

        overrides = {'connect': dummy,
                     'close': dummy,
                     'shutdown': dummy}

        return overrides.get(name, getattr(self.socket, name))

    # ------------------------------------------------------------------
    # Testing Interface

    def next_sent(self, timeout=None):
        """
        Get the next stanza that has been 'sent'.

        Arguments:
            timeout -- Optional timeout for waiting for a new value.
        """
        args = {'block': False}
        if timeout is not None:
            args = {'block': True, 'timeout': timeout}
        try:
            return self.send_queue.get(**args)
        except:
            return None

    def recv_data(self, data):
        """
        Add data to the receiving queue.

        Arguments:
            data -- String data to 'write' to the socket to be received
                    by the XMPP client.
        """
        self.recv_queue.put(data)

    def disconnect_error(self):
        """
        Simulate a disconnect error by raising a socket.error exception
        for any current or further socket operations.
        """
        self.disconnected = True

    # ------------------------------------------------------------------
    # Socket Interface

    def recv(self, *args, **kwargs):
        """
        Read a value from the received queue.

        Arguments:
            Placeholders. Same as for socket.Socket.recv.
        """
        if self.disconnected:
            raise socket.error
        return self.read(block=True)

    def send(self, data):
        """
        Send data by placing it in the send queue.

        Arguments:
            data -- String value to write.
        """
        if self.disconnected:
            raise socket.error
        self.send_queue.put(data)

    # ------------------------------------------------------------------
    # File Socket

    def makefile(self, *args, **kwargs):
        """
        File socket version to use with ElementTree.

        Arguments:
            Placeholders, same as socket.Socket.makefile()
        """
        return self

    def read(self, block=True, timeout=None, **kwargs):
        """
        Implement the file socket interface.

        Arguments:
            block   -- Indicate if the read should block until a
                       value is ready.
            timeout -- Time in seconds a block should last before
                       returning None.
        """
        if self.disconnected:
            raise socket.error
        if timeout is not None:
            block = True
        try:
            return self.recv_queue.get(block, timeout)
        except:
            return None