/usr/share/pyshared/sleekxmpp/test/livesocket.py is in python-sleekxmpp 1.0~beta5-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | """
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import socket
import threading
try:
import queue
except ImportError:
import Queue as queue
class TestLiveSocket(object):
"""
A live test socket that reads and writes to queues in
addition to an actual networking socket.
Methods:
next_sent -- Return the next sent stanza.
next_recv -- Return the next received stanza.
recv_data -- Dummy method to have same interface as TestSocket.
recv -- Read the next stanza from the socket.
send -- Write a stanza to the socket.
makefile -- Dummy call, returns self.
read -- Read the next stanza from the socket.
"""
def __init__(self, *args, **kwargs):
"""
Create a new, live test socket.
Arguments:
Same as arguments for socket.socket
"""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.recv_buffer = []
self.recv_queue = queue.Queue()
self.send_queue = queue.Queue()
self.send_queue_lock = threading.Lock()
self.recv_queue_lock = threading.Lock()
self.is_live = True
def __getattr__(self, name):
"""
Return attribute values of internal, live socket.
Arguments:
name -- Name of the attribute requested.
"""
return getattr(self.socket, name)
# ------------------------------------------------------------------
# Testing Interface
def disconnect_errror(self):
"""
Used to simulate a socket disconnection error.
Not used by live sockets.
"""
try:
self.socket.shutdown()
self.socket.close()
except:
pass
def next_sent(self, timeout=None):
"""
Get the next stanza that has been sent.
Arguments:
timeout -- Optional timeout for waiting for a new value.
"""
args = {'block': False}
if timeout is not None:
args = {'block': True, 'timeout': timeout}
try:
return self.send_queue.get(**args)
except:
return None
def next_recv(self, timeout=None):
"""
Get the next stanza that has been received.
Arguments:
timeout -- Optional timeout for waiting for a new value.
"""
args = {'block': False}
if timeout is not None:
args = {'block': True, 'timeout': timeout}
try:
if self.recv_buffer:
return self.recv_buffer.pop(0)
else:
return self.recv_queue.get(**args)
except:
return None
def recv_data(self, data):
"""
Add data to a receive buffer for cases when more than a single stanza
was received.
"""
self.recv_buffer.append(data)
# ------------------------------------------------------------------
# Socket Interface
def recv(self, *args, **kwargs):
"""
Read data from the socket.
Store a copy in the receive queue.
Arguments:
Placeholders. Same as for socket.recv.
"""
data = self.socket.recv(*args, **kwargs)
with self.recv_queue_lock:
self.recv_queue.put(data)
return data
def send(self, data):
"""
Send data on the socket.
Store a copy in the send queue.
Arguments:
data -- String value to write.
"""
with self.send_queue_lock:
self.send_queue.put(data)
self.socket.send(data)
# ------------------------------------------------------------------
# File Socket
def makefile(self, *args, **kwargs):
"""
File socket version to use with ElementTree.
Arguments:
Placeholders, same as socket.makefile()
"""
return self
def read(self, *args, **kwargs):
"""
Implement the file socket read interface.
Arguments:
Placeholders, same as socket.recv()
"""
return self.recv(*args, **kwargs)
def clear(self):
"""
Empty the send queue, typically done once the session has started to
remove the feature negotiation and log in stanzas.
"""
with self.send_queue_lock:
for i in range(0, self.send_queue.qsize()):
self.send_queue.get(block=False)
with self.recv_queue_lock:
for i in range(0, self.recv_queue.qsize()):
self.recv_queue.get(block=False)
|