/usr/share/pyshared/tlslite/FileObject.py is in python-tlslite 0.3.8-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 | """Class returned by TLSConnection.makefile()."""
class FileObject:
"""This class provides a file object interface to a
L{tlslite.TLSConnection.TLSConnection}.
Call makefile() on a TLSConnection to create a FileObject instance.
This class was copied, with minor modifications, from the
_fileobject class in socket.py. Note that fileno() is not
implemented."""
default_bufsize = 16384 #TREV: changed from 8192
def __init__(self, sock, mode='rb', bufsize=-1):
self._sock = sock
self.mode = mode # Not actually used in this version
if bufsize < 0:
bufsize = self.default_bufsize
self.bufsize = bufsize
self.softspace = False
if bufsize == 0:
self._rbufsize = 1
elif bufsize == 1:
self._rbufsize = self.default_bufsize
else:
self._rbufsize = bufsize
self._wbufsize = bufsize
self._rbuf = "" # A string
self._wbuf = [] # A list of strings
def _getclosed(self):
return self._sock is not None
closed = property(_getclosed, doc="True if the file is closed")
def close(self):
try:
if self._sock:
for result in self._sock._decrefAsync(): #TREV
pass
finally:
self._sock = None
def __del__(self):
try:
self.close()
except:
# close() may fail if __init__ didn't complete
pass
def flush(self):
if self._wbuf:
buffer = "".join(self._wbuf)
self._wbuf = []
self._sock.sendall(buffer)
#def fileno(self):
# raise NotImplementedError() #TREV
def write(self, data):
data = str(data) # XXX Should really reject non-string non-buffers
if not data:
return
self._wbuf.append(data)
if (self._wbufsize == 0 or
self._wbufsize == 1 and '\n' in data or
self._get_wbuf_len() >= self._wbufsize):
self.flush()
def writelines(self, list):
# XXX We could do better here for very long lists
# XXX Should really reject non-string non-buffers
self._wbuf.extend(filter(None, map(str, list)))
if (self._wbufsize <= 1 or
self._get_wbuf_len() >= self._wbufsize):
self.flush()
def _get_wbuf_len(self):
buf_len = 0
for x in self._wbuf:
buf_len += len(x)
return buf_len
def read(self, size=-1):
data = self._rbuf
if size < 0:
# Read until EOF
buffers = []
if data:
buffers.append(data)
self._rbuf = ""
if self._rbufsize <= 1:
recv_size = self.default_bufsize
else:
recv_size = self._rbufsize
while True:
data = self._sock.recv(recv_size)
if not data:
break
buffers.append(data)
return "".join(buffers)
else:
# Read until size bytes or EOF seen, whichever comes first
buf_len = len(data)
if buf_len >= size:
self._rbuf = data[size:]
return data[:size]
buffers = []
if data:
buffers.append(data)
self._rbuf = ""
while True:
left = size - buf_len
recv_size = max(self._rbufsize, left)
data = self._sock.recv(recv_size)
if not data:
break
buffers.append(data)
n = len(data)
if n >= left:
self._rbuf = data[left:]
buffers[-1] = data[:left]
break
buf_len += n
return "".join(buffers)
def readline(self, size=-1):
data = self._rbuf
if size < 0:
# Read until \n or EOF, whichever comes first
if self._rbufsize <= 1:
# Speed up unbuffered case
assert data == ""
buffers = []
recv = self._sock.recv
while data != "\n":
data = recv(1)
if not data:
break
buffers.append(data)
return "".join(buffers)
nl = data.find('\n')
if nl >= 0:
nl += 1
self._rbuf = data[nl:]
return data[:nl]
buffers = []
if data:
buffers.append(data)
self._rbuf = ""
while True:
data = self._sock.recv(self._rbufsize)
if not data:
break
buffers.append(data)
nl = data.find('\n')
if nl >= 0:
nl += 1
self._rbuf = data[nl:]
buffers[-1] = data[:nl]
break
return "".join(buffers)
else:
# Read until size bytes or \n or EOF seen, whichever comes first
nl = data.find('\n', 0, size)
if nl >= 0:
nl += 1
self._rbuf = data[nl:]
return data[:nl]
buf_len = len(data)
if buf_len >= size:
self._rbuf = data[size:]
return data[:size]
buffers = []
if data:
buffers.append(data)
self._rbuf = ""
while True:
data = self._sock.recv(self._rbufsize)
if not data:
break
buffers.append(data)
left = size - buf_len
nl = data.find('\n', 0, left)
if nl >= 0:
nl += 1
self._rbuf = data[nl:]
buffers[-1] = data[:nl]
break
n = len(data)
if n >= left:
self._rbuf = data[left:]
buffers[-1] = data[:left]
break
buf_len += n
return "".join(buffers)
def readlines(self, sizehint=0):
total = 0
list = []
while True:
line = self.readline()
if not line:
break
list.append(line)
total += len(line)
if sizehint and total >= sizehint:
break
return list
# Iterator protocols
def __iter__(self):
return self
def next(self):
line = self.readline()
if not line:
raise StopIteration
return line
|