/usr/lib/python3/dist-packages/tftp/backend.py is in python3-txtftp 0.1~bzr47-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
'''
@author: shylent
'''
from os import fstat
from tftp.errors import Unsupported, FileExists, AccessViolation, FileNotFound
from tftp.util import deferred
from twisted.python.filepath import FilePath, InsecurePath
import shutil
import tempfile
from zope import interface
class IBackend(interface.Interface):
    """Mediator between the TFTP network protocol and a place where files
    can be fetched from or stored to (for example, a filesystem).
    """

    def get_reader(file_name):
        """Produce an L{IReader} provider initialized with C{file_name}.

        @param file_name: the file name carried by a TFTP read request (RRQ)
        @type file_name: C{bytes}

        @raise Unsupported: if this backend instance does not support reading
        @raise AccessViolation: if C{file_name} is rejected for security or
            access control reasons
        @raise FileNotFound: if no file corresponding to C{file_name} could
            be found
        @raise BackendError: for any other error encountered while trying to
            construct a reader

        @return: a L{Deferred} that will fire with an L{IReader}
        """

    def get_writer(file_name):
        """Produce an L{IWriter} provider initialized with C{file_name}.

        @param file_name: the file name carried by a TFTP write request (WRQ)
        @type file_name: C{bytes}

        @raise Unsupported: if this backend instance does not support writing
        @raise AccessViolation: if C{file_name} is rejected for security or
            access control reasons
        @raise FileExists: if the file corresponding to C{file_name} already
            exists and overwriting it is not desirable
        @raise BackendError: for any other error encountered while trying to
            construct a writer

        @return: a L{Deferred} that will fire with an L{IWriter}
        """
class IReader(interface.Interface):
    """Performs reads on behalf of the TFTP protocol."""

    size = interface.Attribute(
        "The size of the file to be read, or C{None} if it's not known.")

    def read(size):
        """Try to read C{size} bytes.

        @note: Returning fewer than C{size} bytes signals that there is no
            more data and terminates the TFTP transfer. A short result must
            therefore be returned if and only if this is the last read for
            this reader object.

        @param size: the number of bytes to hand back to the protocol
        @type size: C{int}

        @return: the data that was read, or a L{Deferred} that will be fired
            with that data
        @rtype: C{bytes} or L{Deferred}
        """

    def finish():
        """Release the resources acquired by this reader and guarantee that
        no further data will be returned.
        """
class IWriter(interface.Interface):
    """Performs writes on behalf of the TFTP protocol."""

    def write(data):
        """Try to write C{data}.

        @return: C{None}, or a L{Deferred} that will fire with C{None}; any
            error raised during the write is delivered through the errback
        @rtype: C{NoneType} or L{Deferred}
        """

    def finish():
        """Signal that no more data will arrive and that the transfer
        completed successfully.
        """

    def cancel():
        """Signal that the transfer ended unsuccessfully."""
@interface.implementer(IReader)
class FilesystemReader(object):
    """Reader counterpart of L{FilesystemSynchronousBackend}.

    The instance tracks its lifecycle in C{state}: C{'active'} while data may
    still be read, C{'eof'} once a read came back empty, and C{'finished'}
    after L{finish} has been called.

    @see: L{IReader}

    @param file_path: location of the file to read from
    @type file_path: L{FilePath<twisted.python.filepath.FilePath>}

    @raise FileNotFound: if opening the file fails with C{IOError}
    """

    def __init__(self, file_path):
        self.file_path = file_path
        try:
            handle = file_path.open('r')
        except IOError:
            raise FileNotFound(file_path)
        self.file_obj = handle
        self.state = 'active'

    @property
    def size(self):
        """Size of the open file as reported by C{fstat}, or C{None} once the
        underlying file object has been closed.

        @see: L{IReader.size}
        """
        handle = self.file_obj
        return None if handle.closed else fstat(handle.fileno()).st_size

    def read(self, size):
        """Read up to C{size} bytes from the file.

        Once the reader has hit end-of-file or has been finished, every call
        returns an empty byte string without touching the file.

        @see: L{IReader.read}

        @return: data, that was read
        @rtype: C{bytes}
        """
        exhausted = self.state in ('eof', 'finished')
        if exhausted:
            return b''
        chunk = self.file_obj.read(size)
        if chunk:
            return chunk
        # An empty read means there is nothing left: remember that and
        # release the file right away.
        self.state = 'eof'
        self.file_obj.close()
        return chunk

    def finish(self):
        """Close the file (unless it was already closed on end-of-file or by
        an earlier call) and mark the reader as finished.

        @see: L{IReader.finish}
        """
        already_closed = self.state in ('eof', 'finished')
        if not already_closed:
            self.file_obj.close()
        self.state = 'finished'
@interface.implementer(IWriter)
class FilesystemWriter(object):
    """A writer to go with L{FilesystemSynchronousBackend}.

    This particular implementation actually writes to a temporary file. If the
    transfer is completed successfully, contents of the target file are
    replaced with the contents of the temporary file and the temporary file is
    removed. If L{cancel} is called, both files are discarded.

    The instance tracks its lifecycle in C{state}: C{'active'} while the
    transfer is in progress, then C{'finished'} or C{'cancelled'}.

    @see: L{IWriter}

    @param file_path: a path to file, that will be created and written to
    @type file_path: L{FilePath<twisted.python.filepath.FilePath>}

    @raise FileExists: if the file already exists
    """

    def __init__(self, file_path):
        # NOTE(review): the existence check and the open below are two
        # separate steps, so a file created in between would be truncated.
        if file_path.exists():
            raise FileExists(file_path)
        file_dir = file_path.parent()
        if not file_dir.exists():
            file_dir.makedirs()
        self.file_path = file_path
        self.destination_file = self.file_path.open('w')
        try:
            self.temp_destination = tempfile.TemporaryFile()
        except (IOError, OSError):
            # Without a buffer the writer is unusable: do not leak the open
            # handle, and do not leave behind the empty target file that was
            # just created (it would make later writes fail with FileExists).
            self.destination_file.close()
            self.file_path.remove()
            raise
        self.state = 'active'

    def _close_files(self):
        """Close both file objects, even if closing the first one fails."""
        try:
            self.temp_destination.close()
        finally:
            self.destination_file.close()

    def write(self, data):
        """Append C{data} to the temporary buffer file.

        @see: L{IWriter.write}
        """
        self.temp_destination.write(data)

    def finish(self):
        """Copy the buffered data into the target file and close both files.

        Calling this on a writer that is already finished or cancelled does
        nothing. If the copy fails, both files are still closed, the exception
        propagates and C{state} is left unchanged, so that L{cancel} can
        still remove the partially written target file.

        @see: L{IWriter.finish}
        """
        if self.state not in ('finished', 'cancelled'):
            try:
                self.temp_destination.seek(0)
                shutil.copyfileobj(self.temp_destination,
                                   self.destination_file)
            finally:
                self._close_files()
            self.state = 'finished'

    def cancel(self):
        """Close both files and remove the target file.

        Calling this on a writer that is already finished or cancelled does
        nothing.

        @see: L{IWriter.cancel}
        """
        if self.state not in ('finished', 'cancelled'):
            self._close_files()
            self.file_path.remove()
            self.state = 'cancelled'
@interface.implementer(IBackend)
class FilesystemSynchronousBackend(object):
    """Filesystem backend whose readers and writers work synchronously.

    @see: L{IBackend}

    @param base_path: root directory served by this backend; requests that
        try to reach 'above' it are refused
    @type base_path: C{bytes} or L{FilePath<twisted.python.filepath.FilePath>}

    @param can_read: whether read requests are supported by this backend
    @type can_read: C{bool}

    @param can_write: whether write requests are supported by this backend
    @type can_write: C{bool}
    """

    def __init__(self, base_path, can_read=True, can_write=True):
        # Accept either an object exposing ``.path`` (such as a FilePath) or
        # a plain path value.
        try:
            self.base = FilePath(base_path.path)
        except AttributeError:
            self.base = FilePath(base_path)
        self.can_read = can_read
        self.can_write = can_write

    def _locate(self, file_name):
        """Map a requested file name onto a path below the base directory.

        @raise AccessViolation: if the name is rejected as an insecure path
        """
        segments = file_name.split(b"/")
        try:
            return self.base.descendant(segments)
        except InsecurePath as e:
            raise AccessViolation("Insecure path: %s" % e)

    @deferred
    def get_reader(self, file_name):
        """
        @see: L{IBackend.get_reader}

        @raise Unsupported: if this backend was created with reading disabled
        @raise AccessViolation: if C{file_name} is rejected as an insecure path

        @rtype: L{Deferred}, yielding a L{FilesystemReader}
        """
        if not self.can_read:
            raise Unsupported("Reading not supported")
        return FilesystemReader(self._locate(file_name))

    @deferred
    def get_writer(self, file_name):
        """
        @see: L{IBackend.get_writer}

        @raise Unsupported: if this backend was created with writing disabled
        @raise AccessViolation: if C{file_name} is rejected as an insecure path

        @rtype: L{Deferred}, yielding a L{FilesystemWriter}
        """
        if not self.can_write:
            raise Unsupported("Writing not supported")
        return FilesystemWriter(self._locate(file_name))
|