import ipaddress import os import socket import struct import sys """ Структура IP пакета: Заголовок состоит из 5 слов (каждое слово 32 бита) Слово 1 (слева направо): Version (4 бита) - версия IP протоколо, в IPv4 = 4 Header size (IHL) (4 бита) - размер заголовка в словах Type of service (8 бит) состоит из полей: DSCP (6 бит) разделяет трафик на классы обслуживания ECN (2 бита) указатель перегрузки Total lenght (16 бит) - полный размер IP пакета (заголовок + данные) Слово 2 ID (16 бит) - используется для сборки фрагментированных пакетов Flags (3 бита) - используюся для фрагментации пакетов Fragment Offset (13 бит) - поле смещения фрагмента Слово 3 TTL - time to live (8 бит) - время жизни пакета (число транзитных узлов которое может пройти пакет перед тем как будет уничтожен). Если пришел пакет с ttl=1, то такой пакет не будет передан следующему узлу. Но если пакет пришел получателю, то такой пакет будет обработан. Protocol (8 бит) - код протокола, помещенного в IP пакет Header Checksum (16 бит) - контрольная сумма заголовка (меняется от узла к узлу, т.к. меняется TTL). Пакеты с неверной CRC отбрасываются. Слово 4 Source address (32 бита) - IP-адрес отпровителя Слово 5 Distination address (32 бита) - IP-адрес назначения Поле данных. Таким образом, максимальная длика IP макета 65535, а максимальная длина поля данных 65515. 20 байт уходит на заголовок. """ # class IP: class IP: def __init__(self, buff): header = struct.unpack('> 4 # Вер self.ihl = header[0] & 0xF self.tos = header[1] self.len = header[2] self.id = header[3] self.offset = header[4] self.ttl = header[5] self.protocol_num = header[6] self.sum = header[7] self.src = header[8] self.dst = header[9] # IP-адруса, понятные человек self.src_address = ipaddress.ip_address(self.src) self.dst_address = ipaddress.ip_address(self.dst) # сопоставляем константы протоколов с их назначением self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"} try: self.protocol = self.protocol_map[self.protocol_num] except Exception as e: print('%s No protocol for %s' % (e, self.protocol_num)) self.protocol = str(self.protocol_num) def sniff(host): # создаем сырой сокет и привязываем к общедоступному интерфейсу if os.name == 'nt': socket_protocol = socket.IPPROTO_IP else: socket_protokol = socket.IPPROTO_ICMP sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) sniffer.bind((host, 0)) # делаем так, чтобы захватывался IP-заголовок sniffer.setsockopt(socket.IPPROTO_IP, socket.RCVALL_ON, 1) if os.name == 'nt': sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) try: while True: # читаем пакет raw_buffer = sniffer.recvfrom(65535)[0] # создаем IP-заголовок из первый 20 байтов ip_header = IP(raw_buffer[0:20]) # выводим обнаруженный протокол и адреса print("Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address)) except KeyboardInterrupt: # если мы в Windows, выключаем неизбирательный режим if os.name == 'nt': sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF) sys.exit() if __name__ == '__main__': if len(sys.argv) == 2: host = sys.argv[1] sniff(host)