/usr/share/doc/python-dpkt/examples/ping.py is in python-dpkt 1.8.r98-0.1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/env python
import math, optparse, random, socket, sys, time
import dpkt
class Ping(object):
    """Skeleton of a command-line ping tool.

    The base class owns option parsing, the send/receive loop and the
    closing statistics.  Subclasses provide the protocol-specific parts by
    overriding open_sock(), gen_ping(), print_header() and print_reply().
    """

    def __init__(self):
        """Create the option parser with the options common to all pings."""
        usage = '%prog [OPTIONS] <host>'
        self.op = optparse.OptionParser(usage=usage)
        # sys.maxint only exists on Python 2; fall back to sys.maxsize so
        # the "unlimited" default also works on Python 3.
        try:
            unlimited = sys.maxint
        except AttributeError:
            unlimited = sys.maxsize
        self.op.add_option('-c', dest='count', type='int', default=unlimited,
                           help='Total number of queries to send')
        self.op.add_option('-i', dest='wait', type='float', default=1,
                           help='Specify packet interval timeout in seconds')

    def gen_ping(self, opts):
        """Hook: return an iterable of packets to send, one per query.

        main() iterates over the result, so subclasses must override this.
        """
        pass

    def open_sock(self, opts):
        """Hook: return the object main() calls send() and recv() on.

        Subclasses must override this; main() closes the returned object
        when it is done.
        """
        pass

    def print_header(self, opts):
        """Hook: called once before the first packet is sent."""
        pass

    def print_reply(self, opts, buf, rtt=None):
        """Hook: called for every reply received.

        buf is the raw data returned by recv(); rtt is the measured
        round-trip time in seconds (main() always passes it).
        """
        pass

    def main(self, argv=None):
        """Parse the command line, ping the host and print a summary.

        argv is the argument list without the program name; when it is
        None or empty, sys.argv[1:] is used instead.  Exactly one host
        argument is required, otherwise the option parser reports an error
        and exits.
        """
        if not argv:
            argv = sys.argv[1:]
        opts, args = self.op.parse_args(argv)
        if not args:
            self.op.error('missing host')
        elif len(args) > 1:
            self.op.error('only one host may be specified')
        opts.ip = socket.gethostbyname(args[0])
        sock = self.open_sock(opts)

        sent = rcvd = 0
        rtt_sum = rtt_sumsq = rtt_max = 0
        rtt_min = None  # no reply seen yet
        try:
            self.print_header(opts)
            for ping in self.gen_ping(opts):
                try:
                    start = time.time()
                    sock.send(ping)
                    buf = sock.recv(0xffff)
                    rtt = time.time() - start
                    if rtt_min is None or rtt < rtt_min:
                        rtt_min = rtt
                    if rtt > rtt_max:
                        rtt_max = rtt
                    rtt_sum += rtt
                    rtt_sumsq += rtt * rtt
                    self.print_reply(opts, buf, rtt)
                    rcvd += 1
                except socket.timeout:
                    # No reply within the timeout: counts as a lost packet.
                    pass
                sent += 1
                time.sleep(opts.wait)
        except KeyboardInterrupt:
            # Ctrl-C ends the run early but still prints the summary.
            pass
        finally:
            # Release the socket on every exit path, including errors.
            if sock is not None:
                sock.close()
        self._print_stats(opts, sent, rcvd, rtt_min, rtt_max,
                          rtt_sum, rtt_sumsq)

    def _print_stats(self, opts, sent, rcvd, rtt_min, rtt_max,
                     rtt_sum, rtt_sumsq):
        """Print the packet-loss line and the round-trip summary line."""
        # Guard the divisions: nothing may have been sent (-c 0, or an
        # interrupt before the first packet) or nothing received.
        if sent:
            loss = float(sent - rcvd) / sent * 100
        else:
            loss = 0.0
        if rcvd:
            # Round-trip statistics describe replies only, so average over
            # the packets received rather than the packets sent.
            rtt_avg = rtt_sum / rcvd
            # Rounding can push the variance marginally below zero.
            variance = max(rtt_sumsq / rcvd - rtt_avg * rtt_avg, 0.0)
            rtt_dev = math.sqrt(variance)
        else:
            rtt_min = rtt_avg = rtt_dev = 0.0
        print('\n--- %s ping statistics ---' % opts.ip)
        print('%d packets transmitted, %d packets received, '
              '%.1f%% packet loss' % (sent, rcvd, loss))
        print('round-trip min/avg/max/std-dev = %.3f/%.3f/%.3f/%.3f ms' %
              (rtt_min * 1000, rtt_avg * 1000, rtt_max * 1000,
               rtt_dev * 1000))
class ICMPPing(Ping):
    """Ping implementation that sends ICMP echo requests over a raw socket."""

    def __init__(self):
        """Add the ICMP-specific -p (payload) option to the base parser."""
        Ping.__init__(self)
        self.op.add_option('-p', dest='payload', type='string',
                           default='hello world!',
                           help='Echo payload string')

    def open_sock(self, opts):
        """Return a raw socket connected to opts.ip.

        The socket timeout is set to the ping interval (opts.wait), so a
        recv() that gets no reply in that time raises socket.timeout.
        """
        # Protocol number 1 selects ICMP.
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1)
        sock.connect((opts.ip, 1))
        sock.settimeout(opts.wait)
        return sock

    def gen_ping(self, opts):
        """Yield opts.count packed ICMP echo requests carrying opts.payload."""
        for i in xrange(opts.count):
            # The echo sequence field is 16 bits wide; wrap the counter so
            # long runs (the default count is effectively unlimited) do not
            # fail when the packet is packed.
            echo = dpkt.icmp.ICMP.Echo(id=random.randint(0, 0xffff),
                                       seq=i & 0xffff, data=opts.payload)
            # type 8 = echo request
            icmp = dpkt.icmp.ICMP(type=8, data=echo)
            yield str(icmp)

    def print_header(self, opts):
        """Print the banner line naming the target and the payload size."""
        print('PING %s: %d data bytes' % (opts.ip, len(opts.payload)))

    def print_reply(self, opts, buf, rtt):
        """Parse buf as an IP packet and print one line describing the reply.

        rtt is the round-trip time in seconds; it is reported in
        milliseconds.
        """
        ip = dpkt.ip.IP(buf)
        if sys.platform == 'darwin':
            # XXX - work around raw socket bug on MacOS X
            ip.data = ip.icmp = dpkt.icmp.ICMP(buf[20:])
            ip.len = len(ip.data)
        # NOTE(review): assumes the packet is an echo reply; other ICMP
        # message types presumably lack the .echo attribute - confirm.
        print('%d bytes from %s: icmp_seq=%d ip_id=%d ttl=%d time=%.3f ms' %
              (len(ip.icmp), opts.ip, ip.icmp.echo.seq, ip.id, ip.ttl,
               rtt * 1000))
if __name__ == '__main__':
    # Script entry point: run an ICMP ping using the command-line arguments.
    ICMPPing().main()
|