debug version
[svn/Prometheus-QoS/.git] / optional-tools / ping.py
CommitLineData
00902283 1#!/usr/bin/env python
2# coding: utf-8
3
4"""
5 A pure python ping implementation using raw sockets.
6
7 Note that ICMP messages can only be sent from processes running as root
8 (on Windows, you must run this script as 'Administrator').
9
10 Bugs are naturally mine. I'd be glad to hear about them. There are
11 certainly word-size dependencies here.
12
13 :homepage: https://github.com/jedie/python-ping/
14 :copyleft: 1989-2011 by the python-ping team, see AUTHORS for more details.
15 :license: GNU GPL v2, see LICENSE for more details.
16"""
17
18
19import array
20import os
21import select
22import signal
23import socket
24import struct
25import sys
26import time
27
28
# Choose the clock used to time round trips.  Within this file only the
# difference between two readings is ever used, so the clock's epoch is
# irrelevant.
if hasattr(time, "perf_counter"):
    # Python 3.3+: high resolution and monotonic on every platform.
    # time.clock() was removed in Python 3.8, so it must not be assumed.
    default_timer = time.perf_counter
elif sys.platform.startswith("win32"):
    # Older Pythons on Windows: time.clock() has the best resolution.
    default_timer = time.clock
else:
    # Older Pythons elsewhere: time.time() is the best available timer.
    default_timer = time.time
35
36
# ICMP message types (RFC 792) and the receive buffer size
ICMP_ECHOREPLY = 0          # "type" field of an echo reply
ICMP_ECHO = 8               # "type" field of an echo request
ICMP_MAX_RECV = 2 * 1024    # byte count handed to recvfrom()

# Length of one ping period in milliseconds: Ping.run() sleeps for whatever
# part of it the round trip did not use up.
MAX_SLEEP = 1000
43
44
def calculate_checksum(source_string):
    """
    Compute the 16-bit Internet checksum (RFC 1071) of a packet.

    The data is read as a sequence of big-endian (network order) 16-bit
    words, independent of the host's byte order and native word size.  An
    odd-length input is padded with one trailing zero byte.

    :param source_string: the packet as bytes / bytearray (a text string is
        accepted too and is converted byte-for-byte via latin-1)
    :return: the checksum as an int in the range 0..0xffff, ready to be
        packed with ``struct.pack("!H", ...)``
    """
    data = source_string
    if isinstance(data, str) and not isinstance(data, bytes):
        # Python 3 text string: map code points 0-255 straight to bytes.
        data = data.encode("latin-1")
    data = bytes(data)

    if len(data) % 2:
        data += b"\x00"

    words = struct.unpack("!%dH" % (len(data) // 2), data)
    total = sum(words)

    # Fold the carries back into the low 16 bits (one's-complement addition)
    # until nothing is left above bit 15.
    while total >> 16:
        total = (total >> 16) + (total & 0xffff)

    return ~total & 0xffff
68
69
def is_valid_ip4_address(addr):
    """
    Tell whether *addr* is a dotted-quad IPv4 address.

    The string must consist of exactly four dot-separated fields, each made
    up of ASCII decimal digits only and with a value of at most 255.  Signs,
    whitespace, underscores and empty fields are rejected.

    :param addr: the candidate address string
    :return: True if *addr* looks like an IPv4 address, otherwise False
    """
    parts = addr.split(".")
    if len(parts) != 4:
        return False
    for part in parts:
        # int() alone would let "-1", "+1", " 1" and "1_0" through, so the
        # characters are checked explicitly before converting.
        if not part or any(ch not in "0123456789" for ch in part):
            return False
        if int(part) > 255:
            return False
    return True
82
def to_ip(addr):
    """
    Return *addr* as an IPv4 address string.

    A string that already is a dotted-quad address is handed back untouched;
    anything else is resolved with socket.gethostbyname().
    """
    return addr if is_valid_ip4_address(addr) else socket.gethostbyname(addr)
87
88
class Ping(object):
    """
    Send ICMP echo requests to a single destination and keep statistics.

    All durations handled by this class (timeout, delays, min/max/total
    time) are in milliseconds.  A new raw ICMP socket is opened for every
    ping, which requires root / Administrator privileges.
    """

    def __init__(self, destination, timeout=1000, packet_size=55, own_id=None):
        """
        :param destination: host name or dotted IPv4 address to ping
        :param timeout: how long to wait for each reply, in milliseconds
        :param packet_size: number of payload bytes per echo request
        :param own_id: ICMP identifier to use; defaults to the low 16 bits
            of the process id.  Only the low 16 bits are used because the
            identifier field of the ICMP header is 16 bits wide.
        """
        self.destination = destination
        self.timeout = timeout
        self.packet_size = packet_size
        if own_id is None:
            self.own_id = os.getpid() & 0xFFFF
        else:
            self.own_id = own_id & 0xFFFF

        try:
            # FIXME: Use destination only for display this line here? see: https://github.com/jedie/python-ping/issues/3
            self.dest_ip = to_ip(self.destination)
        except socket.gaierror as e:
            self.print_unknown_host(e)

        self.seq_number = 0
        self.send_count = 0
        self.receive_count = 0
        self.min_time = 999999999
        self.max_time = 0.0
        self.total_time = 0.0

    #--------------------------------------------------------------------------

    def print_start(self):
        """Print the banner naming the destination and the payload size."""
        print("\nPYTHON-PING %s (%s): %d data bytes" % (self.destination, self.dest_ip, self.packet_size))

    def print_unknown_host(self, e):
        """Report an unresolvable destination and exit with status -1."""
        # Not every socket error carries (errno, message); fall back to the
        # exception itself when there is no second argument.
        reason = e.args[1] if len(e.args) > 1 else e
        print("\nPYTHON-PING: Unknown host: %s (%s)\n" % (self.destination, reason))
        sys.exit(-1)

    def print_success(self, delay, ip, packet_size, ip_header, icmp_header):
        """Print the one-line report for a received echo reply."""
        if ip == self.destination:
            from_info = ip
        else:
            from_info = "%s (%s)" % (self.destination, ip)

        print("%d bytes from %s: icmp_seq=%d ttl=%d time=%.1f ms" % (
            packet_size, from_info, icmp_header["seq_number"], ip_header["ttl"], delay)
        )

    def print_failed(self):
        """Print the report for a ping that got no reply in time."""
        print("Request timed out.")

    def print_exit(self):
        """Print the summary statistics gathered so far."""
        print("\n----%s PYTHON PING Statistics----" % (self.destination))

        lost_count = self.send_count - self.receive_count
        # Nothing sent yet (e.g. interrupted before the first ping): report
        # 0% loss instead of dividing by zero.
        if self.send_count:
            lost_rate = float(lost_count) / self.send_count * 100.0
        else:
            lost_rate = 0.0

        print("%d packets transmitted, %d packets received, %0.1f%% packet loss" % (
            self.send_count, self.receive_count, lost_rate
        ))

        if self.receive_count > 0:
            print("round-trip (ms) min/avg/max = %0.3f/%0.3f/%0.3f" % (
                self.min_time, self.total_time / self.receive_count, self.max_time
            ))

        print("")

    #--------------------------------------------------------------------------

    def signal_handler(self, signum, frame):
        """
        Handle print_exit via signals
        """
        self.print_exit()
        print("\n(Terminated with signal %d)\n" % (signum))
        sys.exit(0)

    def setup_signal_handler(self):
        """Install signal_handler for Ctrl-C and, where available, Ctrl-Break."""
        signal.signal(signal.SIGINT, self.signal_handler)  # Handle Ctrl-C
        if hasattr(signal, "SIGBREAK"):
            # Handle Ctrl-Break e.g. under Windows
            signal.signal(signal.SIGBREAK, self.signal_handler)

    #--------------------------------------------------------------------------

    def header2dict(self, names, struct_format, data):
        """ unpack the raw received IP and ICMP header informations to a dict """
        unpacked_data = struct.unpack(struct_format, data)
        return dict(zip(names, unpacked_data))

    #--------------------------------------------------------------------------

    def run(self, count=None, deadline=None):
        """
        Send and receive pings in a loop, then print the statistics.

        :param count: stop once this many pings have been attempted
            (None or 0: no limit)
        :param deadline: stop once the accumulated round-trip time reaches
            this value (None or 0: no limit)
        """
        self.setup_signal_handler()

        while True:
            delay = self.do()

            self.seq_number += 1
            if count and self.seq_number >= count:
                break
            # NOTE(review): total_time is the sum of measured round trips in
            # ms, not elapsed wall-clock time, so lost pings never advance
            # it -- confirm that this is the intended meaning of "deadline".
            if deadline and self.total_time >= deadline:
                break

            if delay is None:
                delay = 0

            # Pause for the remainder of the MAX_SLEEP period (if applicable)
            if MAX_SLEEP > delay:
                time.sleep((MAX_SLEEP - delay) / 1000.0)

        self.print_exit()

    def do(self):
        """
        Send one ICMP ECHO_REQUEST and wait up to self.timeout for the reply.

        :return: the round-trip time in milliseconds, or None if the request
            could not be sent or no reply arrived in time
        :raises socket.error: if the raw socket cannot be created
        """
        try:  # One could use UDP here, but it's obscure
            current_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
        except socket.error as e:
            if e.errno == 1:
                # Operation not permitted - add a hint to the message while
                # keeping the exception type and errno.
                raise type(e)(
                    e.errno,
                    "%s - Note that ICMP messages can only be send from processes running as root." % e.strerror
                )
            raise  # raise the original error

        try:
            send_time = self.send_one_ping(current_socket)
            if send_time is None:
                return None
            self.send_count += 1

            # Only receive_time is used below; the other values are what
            # print_success() would need if per-packet output were enabled.
            receive_time, packet_size, ip, ip_header, icmp_header = self.receive_one_ping(current_socket)
        finally:
            # Always release the socket, even if receiving raised.
            current_socket.close()

        if receive_time is None:
            return None

        self.receive_count += 1
        delay = (receive_time - send_time) * 1000.0
        self.total_time += delay
        if self.min_time > delay:
            self.min_time = delay
        if self.max_time < delay:
            self.max_time = delay
        return delay

    def send_one_ping(self, current_socket):
        """
        Send one ICMP ECHO_REQUEST

        :param current_socket: the raw ICMP socket to send on
        :return: the default_timer() reading taken just before sending, or
            None if sending failed (the socket is closed in that case)
        """
        # Header is type (8), code (8), checksum (16), id (16), sequence (16)
        # The sequence field is 16 bits wide, so the counter wraps around.
        sequence = self.seq_number & 0xFFFF

        # Make a dummy header with a 0 checksum.
        header = struct.pack("!BBHHH", ICMP_ECHO, 0, 0, self.own_id, sequence)

        # Payload: consecutive byte values starting at 0x42, wrapping at 255.
        # bytearray() yields real bytes on both Python 2 and Python 3.
        start_value = 0x42
        data = bytes(bytearray(
            (start_value + offset) & 0xff for offset in range(self.packet_size)
        ))

        # Calculate the checksum on the data and the dummy header.
        checksum = calculate_checksum(header + data)

        # Now that we have the right checksum, we put that in. It's just easier
        # to make up a new header than to stuff it into the dummy.
        header = struct.pack("!BBHHH", ICMP_ECHO, 0, checksum, self.own_id, sequence)

        packet = header + data

        send_time = default_timer()

        try:
            # Send to the address resolved in __init__ rather than resolving
            # the host name again.  Port number is irrelevant for ICMP.
            current_socket.sendto(packet, (self.dest_ip, 1))
        except socket.error as e:
            reason = e.args[1] if len(e.args) > 1 else e
            print("General failure (%s)" % (reason,))
            current_socket.close()
            return None

        return send_time

    def receive_one_ping(self, current_socket):
        """
        Wait for the echo reply that belongs to this Ping instance.

        Packets that are too short, are not echo replies, or carry another
        identifier are skipped; the time spent waiting for them is deducted
        from the remaining timeout (self.timeout, in ms).

        :param current_socket: the raw ICMP socket to read from
        :return: a tuple (receive_time, packet_size, ip, ip_header,
            icmp_header), or (None, 0, 0, 0, 0) on timeout
        """
        timeout = self.timeout / 1000.0

        while True:  # Loop while waiting for packet or timeout
            select_start = default_timer()
            inputready, outputready, exceptready = select.select([current_socket], [], [], timeout)
            select_duration = (default_timer() - select_start)
            if not inputready:  # timeout
                return None, 0, 0, 0, 0

            receive_time = default_timer()

            packet_data, address = current_socket.recvfrom(ICMP_MAX_RECV)

            # The parsing below assumes a 20 byte IP header (no options)
            # followed by the 8 byte ICMP header; anything shorter cannot be
            # unpacked and is ignored.
            if len(packet_data) >= 28:
                icmp_header = self.header2dict(
                    names=[
                        "type", "code", "checksum",
                        "packet_id", "seq_number"
                    ],
                    struct_format="!BBHHH",
                    data=packet_data[20:28]
                )

                # Require an echo *reply*: matching on the id alone would
                # also accept a copy of our own echo request.
                if (icmp_header["type"] == ICMP_ECHOREPLY
                        and icmp_header["packet_id"] == self.own_id):
                    ip_header = self.header2dict(
                        names=[
                            "version", "type", "length",
                            "id", "flags", "ttl", "protocol",
                            "checksum", "src_ip", "dest_ip"
                        ],
                        struct_format="!BBHHHBBHII",
                        data=packet_data[:20]
                    )
                    packet_size = len(packet_data) - 28
                    ip = socket.inet_ntoa(struct.pack("!I", ip_header["src_ip"]))
                    # XXX: Why not ip = address[0] ???
                    return receive_time, packet_size, ip, ip_header, icmp_header

            timeout = timeout - select_duration
            if timeout <= 0:
                return None, 0, 0, 0, 0
327
328
def verbose_ping(hostname, timeout=1000, count=3, packet_size=55):
    """
    Ping *hostname* with a freshly created Ping object.

    timeout (ms) and packet_size (payload bytes) are handed to the Ping
    constructor; count is handed to Ping.run().
    """
    pinger = Ping(hostname, timeout=timeout, packet_size=packet_size)
    pinger.run(count)
332
333
if __name__ == '__main__':
    # FIXME: Add a real CLI
    # Without arguments a fixed demo runs; with exactly one argument that
    # host is pinged; anything else prints a usage hint.  print() is called
    # as a function, like everywhere else in this file, so the script also
    # parses on Python 3.
    if len(sys.argv) == 1:
        print("DEMO")

        # These should work:
        verbose_ping("heise.de")
        verbose_ping("google.com")

        # Inconsistent on Windows w/ ActivePython (Python 3.2 resolves correctly
        # to the local host, but 2.7 tries to resolve to the local *gateway*)
        verbose_ping("localhost")

        # Should fail with 'getaddrinfo print_failed':
        verbose_ping("foobar_url.foobar")

        # Should fail (timeout), but it depends on the local network:
        verbose_ping("192.168.255.254")

        # Should fails with 'The requested address is not valid in its context':
        verbose_ping("0.0.0.0")
    elif len(sys.argv) == 2:
        verbose_ping(sys.argv[1])
    else:
        print("Error: call ./ping.py domain.tld")
This page took 0.337408 seconds and 4 git commands to generate.