/usr/share/pyshared/tftp/datagram.py is in python-txtftp 0.1~bzr38-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
'''
@author: shylent
'''
from itertools import chain
from tftp.errors import (WireProtocolError, InvalidOpcodeError,
PayloadDecodeError, InvalidErrorcodeError, OptionsDecodeError)
from twisted.python.util import OrderedDict
import struct
# TFTP opcodes. The values are consecutive, starting at 1.
(
    OP_RRQ,    # 1
    OP_WRQ,    # 2
    OP_DATA,   # 3
    OP_ACK,    # 4
    OP_ERROR,  # 5
    OP_OACK,   # 6
) = range(1, 7)

# TFTP error codes. The values are consecutive, starting at 0.
(
    ERR_NOT_DEFINED,       # 0
    ERR_FILE_NOT_FOUND,    # 1
    ERR_ACCESS_VIOLATION,  # 2
    ERR_DISK_FULL,         # 3
    ERR_ILLEGAL_OP,        # 4
    ERR_TID_UNKNOWN,       # 5
    ERR_FILE_EXISTS,       # 6
    ERR_NO_SUCH_USER,      # 7
) = range(8)

# Default human-readable message for every known error code. The set of keys
# of this mapping also serves as the list of valid error codes.
errors = {
    ERR_NOT_DEFINED: "",
    ERR_FILE_NOT_FOUND: "File not found",
    ERR_ACCESS_VIOLATION: "Access violation",
    ERR_DISK_FULL: "Disk full or allocation exceeded",
    ERR_ILLEGAL_OP: "Illegal TFTP operation",
    ERR_TID_UNKNOWN: "Unknown transfer ID",
    ERR_FILE_EXISTS: "File already exists",
    ERR_NO_SUCH_USER: "No such user",
}
def split_opcode(datagram):
    """Separate a raw datagram into its opcode and the remaining payload.

    The opcode is read from the first two bytes of the datagram as an
    unsigned 16-bit integer in network byte order.

    @param datagram: raw datagram
    @type datagram: C{str}

    @return: a 2-tuple of the opcode and everything that follows it
    @rtype: (C{int}, C{str})

    @raise WireProtocolError: if the datagram is too short to hold an opcode
    """
    header, payload = datagram[:2], datagram[2:]
    try:
        # Fewer than two bytes in the header makes struct refuse to unpack.
        (opcode,) = struct.unpack("!H", header)
    except struct.error:
        raise WireProtocolError("Failed to extract the opcode")
    return opcode, payload
class TFTPDatagram(object):
    """Common ancestor of all datagram classes.

    Subclasses are expected to set L{opcode} and to provide both conversion
    directions: L{from_wire} for parsing and L{to_wire} for serialization.

    @cvar opcode: The opcode, corresponding to this datagram
    @type opcode: C{int}
    """

    # Left unset here; every concrete datagram class supplies its own value.
    opcode = None

    @classmethod
    def from_wire(cls, payload):
        """Build a datagram object out of the given payload.

        @param payload: Binary representation of the payload (without the
            opcode)
        @type payload: C{str}

        @raise NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError("Subclasses must override this")

    def to_wire(self):
        """Serialize the datagram into its wire representation.

        @rtype: C{str}

        @raise NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError("Subclasses must override this")
class RQDatagram(TFTPDatagram):
    """Shared implementation of the "RQ" (request) datagrams.

    @ivar filename: File name, that corresponds to this request.
    @type filename: C{str}

    @ivar mode: Transfer mode. Valid values are C{netascii} and C{octet}.
        Case-insensitive (stored lowercased).
    @type mode: C{str}

    @ivar options: Any options, that were requested by the client (as per
        U{RFC2347<http://tools.ietf.org/html/rfc2347>})
    @type options: C{dict}
    """

    def __init__(self, filename, mode, options):
        self.filename = filename
        # The mode is case-insensitive, so it is kept in one canonical form.
        self.mode = mode.lower()
        self.options = options

    @classmethod
    def from_wire(cls, payload):
        """Parse the payload and return a RRQ/WRQ datagram object.

        The payload is a sequence of NUL-terminated fields: the file name,
        the transfer mode and then any number of option name/value pairs.

        @param payload: Binary representation of the payload (without the
            opcode)
        @type payload: C{str}

        @return: datagram object
        @rtype: L{RRQDatagram} or L{WRQDatagram}

        @raise OptionsDecodeError: if an option has no value or is specified
            more than once
        @raise PayloadDecodeError: if there were not enough fields in the
            payload
        """
        fields = payload.split('\x00')
        if len(fields) < 2:
            raise PayloadDecodeError("Not enough fields in the payload")
        filename, mode = fields[0], fields[1]

        option_fields = fields[2:]
        # The terminator of the last field leaves one empty trailing item.
        if option_fields and not option_fields[-1]:
            option_fields = option_fields[:-1]
        if len(option_fields) % 2:
            raise OptionsDecodeError("No value for option %s" % option_fields[-1])

        # The order of options is not important as per RFC2347; it is kept
        # only to maintain consistency during testing.
        options = OrderedDict()
        for opt_name, opt_value in zip(option_fields[::2], option_fields[1::2]):
            if opt_name in options:
                raise OptionsDecodeError("Duplicate option specified: %s" % opt_name)
            options[opt_name] = opt_value
        return cls(filename, mode, options)

    def __repr__(self):
        class_name = self.__class__.__name__
        if not self.options:
            return "<%s(filename=%s, mode=%s)>" % (class_name, self.filename,
                                                   self.mode)
        return ("<%s(filename=%s, mode=%s, options=%s)>" %
                (class_name, self.filename, self.mode, self.options))

    def to_wire(self):
        """Serialize the request into its wire representation.

        @rtype: C{str}
        """
        pieces = [struct.pack("!H", self.opcode),
                  self.filename, '\x00', self.mode, '\x00']
        if self.options:
            # Flatten (name, value) pairs into name, value, name, value, ...
            flattened = chain.from_iterable(self.options.iteritems())
            pieces.extend(('\x00'.join(flattened), '\x00'))
        return ''.join(pieces)
class RRQDatagram(RQDatagram):
    """A request datagram carrying the C{OP_RRQ} opcode."""

    opcode = OP_RRQ
class WRQDatagram(RQDatagram):
    """A request datagram carrying the C{OP_WRQ} opcode."""

    opcode = OP_WRQ
class OACKDatagram(TFTPDatagram):
    """An OACK datagram

    @ivar options: Options carried by this datagram (as per
        U{RFC2347<http://tools.ietf.org/html/rfc2347>})
    @type options: C{dict}
    """

    opcode = OP_OACK

    def __init__(self, options):
        self.options = options

    @classmethod
    def from_wire(cls, payload):
        """Parse the payload and return an OACK datagram object.

        The payload is a sequence of NUL-terminated option name/value pairs.

        @param payload: Binary representation of the payload (without the
            opcode)
        @type payload: C{str}

        @return: datagram object
        @rtype: L{OACKDatagram}

        @raise OptionsDecodeError: if an option has no value or is specified
            more than once
        """
        option_fields = payload.split('\x00')
        # The terminator of the last field leaves one empty trailing item.
        if option_fields and not option_fields[-1]:
            option_fields = option_fields[:-1]
        if len(option_fields) % 2:
            raise OptionsDecodeError("No value for option %s" % option_fields[-1])

        options = OrderedDict()
        for opt_name, opt_value in zip(option_fields[::2], option_fields[1::2]):
            if opt_name in options:
                raise OptionsDecodeError("Duplicate option specified: %s" % opt_name)
            options[opt_name] = opt_value
        return cls(options)

    def __repr__(self):
        class_name = self.__class__.__name__
        return "<%s(options=%s)>" % (class_name, self.options)

    def to_wire(self):
        """Serialize the datagram into its wire representation.

        Without any options only the opcode is emitted.

        @rtype: C{str}
        """
        opcode = struct.pack("!H", self.opcode)
        if not self.options:
            return opcode
        # Flatten (name, value) pairs into name, value, name, value, ...
        flattened = chain.from_iterable(self.options.iteritems())
        return ''.join((opcode, '\x00'.join(flattened), '\x00'))
class DATADatagram(TFTPDatagram):
    """A DATA datagram

    @ivar blocknum: A block number, that this chunk of data is associated with
    @type blocknum: C{int}

    @ivar data: binary data
    @type data: C{str}
    """

    opcode = OP_DATA

    @classmethod
    def from_wire(cls, payload):
        """Parse the payload and return a L{DATADatagram} object.

        The first two bytes of the payload are the block number (unsigned
        16-bit integer, network byte order); everything after them is data.

        @param payload: Binary representation of the payload (without the
            opcode)
        @type payload: C{str}

        @return: A L{DATADatagram} object
        @rtype: L{DATADatagram}

        @raise PayloadDecodeError: if the payload is too short to hold the
            block number
        """
        try:
            (blocknum,) = struct.unpack('!H', payload[:2])
        except struct.error:
            # Same message as ACKDatagram uses for the same failure, so the
            # error is not raised blank.
            raise PayloadDecodeError("Unable to extract the block number")
        return cls(blocknum, payload[2:])

    def __init__(self, blocknum, data):
        self.blocknum = blocknum
        self.data = data

    def __repr__(self):
        return "<%s(blocknum=%s, %s bytes of data)>" % (
            self.__class__.__name__, self.blocknum, len(self.data))

    def to_wire(self):
        """Serialize the datagram into its wire representation.

        @rtype: C{str}
        """
        header = struct.pack('!HH', self.opcode, self.blocknum)
        return ''.join((header, self.data))
class ACKDatagram(TFTPDatagram):
    """An ACK datagram.

    @ivar blocknum: Block number of the data chunk, which this datagram is
        supposed to acknowledge
    @type blocknum: C{int}
    """

    opcode = OP_ACK

    def __init__(self, blocknum):
        self.blocknum = blocknum

    @classmethod
    def from_wire(cls, payload):
        """Parse the payload and return a L{ACKDatagram} object.

        The whole payload must be exactly the two-byte block number
        (unsigned 16-bit integer, network byte order).

        @param payload: Binary representation of the payload (without the
            opcode)
        @type payload: C{str}

        @return: An L{ACKDatagram} object
        @rtype: L{ACKDatagram}

        @raise PayloadDecodeError: if the format of payload is incorrect
        """
        try:
            (blocknum,) = struct.unpack('!H', payload)
        except struct.error:
            raise PayloadDecodeError("Unable to extract the block number")
        return cls(blocknum)

    def __repr__(self):
        class_name = self.__class__.__name__
        return "<%s(blocknum=%s)>" % (class_name, self.blocknum)

    def to_wire(self):
        """Serialize the datagram into its wire representation.

        @rtype: C{str}
        """
        wire = struct.pack('!HH', self.opcode, self.blocknum)
        return wire
class ERRORDatagram(TFTPDatagram):
    """An ERROR datagram.

    @ivar errorcode: A valid TFTP error code
    @type errorcode: C{int}

    @ivar errmsg: An error message, describing the error condition in which
        this datagram was produced
    @type errmsg: C{str}
    """

    opcode = OP_ERROR

    @classmethod
    def from_wire(cls, payload):
        """Parse the payload and return a L{ERRORDatagram} object.

        This method violates the standard a bit - if the error string was not
        extracted, a default error string is generated, based on the error
        code.

        @param payload: Binary representation of the payload (without the
            opcode)
        @type payload: C{str}

        @return: An L{ERRORDatagram} object
        @rtype: L{ERRORDatagram}

        @raise PayloadDecodeError: if the format of payload is incorrect
        @raise InvalidErrorcodeError: a more specific exception, that is
            raised if the error code was successfully extracted, but it does
            not correspond to any known/standardized error code values.
        """
        try:
            errorcode = struct.unpack('!H', payload[:2])[0]
        except struct.error:
            raise PayloadDecodeError("Unable to extract the error code")
        if errorcode not in errors:
            raise InvalidErrorcodeError(errorcode)
        # The message runs up to the first NUL; anything after it is ignored.
        errmsg = payload[2:].split('\x00')[0]
        if not errmsg:
            errmsg = errors[errorcode]
        return cls(errorcode, errmsg)

    @classmethod
    def from_code(cls, errorcode, errmsg=None):
        """Create an L{ERRORDatagram}, given an error code and, optionally, an
        error message to go with it. If not provided, default error message
        for the given error code is used.

        @param errorcode: An error code (one of L{errors})
        @type errorcode: C{int}

        @param errmsg: An error message (optional)
        @type errmsg: C{str} or C{NoneType}

        @raise InvalidErrorcodeError: if the error code is not known

        @return: an L{ERRORDatagram}
        @rtype: L{ERRORDatagram}
        """
        if errorcode not in errors:
            raise InvalidErrorcodeError(errorcode)
        # Only None selects the default; an explicitly empty message is kept.
        if errmsg is None:
            errmsg = errors[errorcode]
        return cls(errorcode, errmsg)

    def __init__(self, errorcode, errmsg):
        self.errorcode = errorcode
        self.errmsg = errmsg

    def __repr__(self):
        # Same "<ClassName(field=value, ...)>" shape as the other datagrams.
        return "<%s(errorcode=%s, errmsg=%s)>" % (self.__class__.__name__,
                                                  self.errorcode, self.errmsg)

    def to_wire(self):
        """Serialize the datagram into its wire representation.

        @rtype: C{str}
        """
        return ''.join((struct.pack('!HH', self.opcode, self.errorcode),
                        self.errmsg, '\x00'))
class _TFTPDatagramFactory(object):
    """Creates datagram objects, picking the class by opcode."""

    # Maps every supported opcode to the class that parses its payload.
    _dgram_classes = {
        OP_RRQ: RRQDatagram,
        OP_WRQ: WRQDatagram,
        OP_DATA: DATADatagram,
        OP_ACK: ACKDatagram,
        OP_ERROR: ERRORDatagram,
        OP_OACK: OACKDatagram,
    }

    def __call__(self, opcode, payload):
        """Create a datagram, given an opcode and payload.

        Errors, that occur during datagram creation are propagated as-is.

        @param opcode: opcode
        @type opcode: C{int}

        @param payload: payload
        @type payload: C{str}

        @return: datagram object
        @rtype: L{TFTPDatagram}

        @raise InvalidOpcodeError: if the opcode is not recognized
        """
        datagram_class = self._dgram_classes.get(opcode)
        if datagram_class is None:
            raise InvalidOpcodeError(opcode)
        return datagram_class.from_wire(payload)


# Shared module-level instance, called as TFTPDatagramFactory(opcode, payload).
TFTPDatagramFactory = _TFTPDatagramFactory()
|