Browse Source

add packer

dtelenkov 2 days ago
parent
commit
7802284dc6

+ 39 - 0
python_net/packer/misc/ethernet_frame.py

@@ -0,0 +1,39 @@
+import socket
+import sys
+import net_params
+
+
+net = net_params.LocalNetwork()
+interface = net.get_default_interface()
+
+
+ethernet_frame = bytearray([
+    0xff, 0xff, 0xff, 0xff, 0xff, 0xff,  # Destination MAC (broadcast)
+    0x00, 0x11, 0x22, 0x33, 0x44, 0x55,  # Source MAC
+    0x12, 0x34,                          # Custom EtherType (0x1234)
+    # Payload (just some test data)
+    0xde, 0xad, 0xbe, 0xef                # Example payload
+])
+
+
+#  00:11:22:33:44:55
+
+def send_raw_frame():
+
+    with socket.socket(socket.AF_PACKET, socket.SOCK_RAW) as raw_socket:
+        print(raw_socket)
+        print(f"Binding to interface: {interface}")
+        raw_socket.bind((interface, socket.ETH_P_ALL))
+        print("Sending raw packet")
+        bytes_send = raw_socket.send(ethernet_frame)
+        print(f"Send {bytes_send} bytes to the interface")
+
+
+
+
+def main():
+    
+    send_raw_frame()
+
+if __name__ == '__main__':
+    main()

+ 69 - 0
python_net/packer/misc/icmp.py

@@ -0,0 +1,69 @@
+import struct
+import socket
+import select
+import random
+
+ICMP_CODE = socket.getprotobyname('icmp')
+
+
+def checksum(source_string):
+    sum = 0
+    count_to = (len(source_string) / 2) * 2
+    count = 0
+    while count < count_to:
+        this_val = source_string[count + 1]*256+source_string[count]
+        sum = sum + this_val
+        sum = sum & 0xffffffff
+        count = count + 2
+    if count_to < len(source_string):
+        sum = sum + source_string[len(source_string) - 1]
+        sum = sum & 0xffffffff
+    sum = (sum >> 16) + (sum & 0xffff)
+    sum = sum + (sum >> 16)
+    answer = ~sum
+    answer = answer & 0xffff
+    answer = answer >> 8 | (answer << 8 & 0xff00)
+    return answer
+
+def create_packet(id, data):
+    ICMP_ECHO_REQUEST = 8 #Код типа ICMP - в нашем случае ECHO
+    header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0, 0, id, 1)
+    data = data
+    my_checksum = checksum(header + data)
+    header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), id, 1)
+    return header + data
+
+
+def send(dest_addr,data):
+    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE)
+    host = socket.gethostbyname(dest_addr)
+
+    packet_id = random.randint(0,65535)
+    packet = create_packet(packet_id,data)
+    
+    while packet:
+        sent = my_socket.sendto(packet, (dest_addr, 1))
+        packet = packet[sent:]
+
+    return my_socket,packet_id
+
+
+def recv(my_socket, packet_id):
+    ready = select.select([my_socket], [], [], 2) #таймаут 2с
+    rec_packet, addr = my_socket.recvfrom(1024)
+    icmp_header = rec_packet[20:28]    # Байты с 20 по 28 - заголовок ICMP
+    type, code, checksum, p_id, sequence = struct.unpack(
+            'bbHHh', icmp_header)
+
+    data = rec_packet[28:] # Наш hello будет лежать после заголовка ICMP
+    return data
+
+
+
+def main():
+    sock, id=send("8.8.8.8",b"hohoho")
+    print(recv(sock,id))
+
+
+if __name__ == '__main__':
+    main()

+ 106 - 0
python_net/packer/misc/net_params.py

@@ -0,0 +1,106 @@
+import ipaddress
+import socket
+import subprocess
+import typing
+import getmac
+
+
+__all__ = (
+    'LocalNetwork',
+    'is_valid_ipv4',
+    'enforce_mac',
+    'enforce_mac',
+    'parse_ip',
+)
+
+
+OUR_MAC = None
+OUR_IP = None
+OUR_INTERFACE = None
+
+
+class LocalNetwork:
+    # def get_mac(self, interface: typing.Optional[str]) -> str:
+    #     if interface is None:
+    #         interface = self.
+
+    def get_mac(self, interface: typing.Optional[str] = None) -> str:
+        if interface is None:
+            interface = self.get_default_interface()
+        global OUR_MAC
+        if OUR_MAC:
+            return OUR_MAC
+
+        OUR_MAC = getmac.get_mac_address(interface)
+        return typing.cast(str, OUR_MAC)
+
+
+    def get_ip(self) -> str:
+        global OUR_IP
+        if OUR_IP:
+            return OUR_IP
+        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
+            s.connect(("1.1.1.1", 80))
+            ip = typing.cast(str, s.getsockname()[0])
+            OUR_IP = ip
+            return OUR_IP
+
+    def get_default_interface(self) -> str:
+        global OUR_INTERFACE
+        if OUR_INTERFACE:
+            return OUR_INTERFACE
+        output = subprocess.check_output(['ip', 'route', 'list']).decode().split()
+
+        for ind, word in enumerate(output):
+            if word == 'dev':
+                OUR_INTERFACE = output[ind + 1]
+                return OUR_INTERFACE
+        raise RuntimeError('Could not find default interface')
+
+
+def is_valid_ipv4(ip: str) -> bool:
+    try:
+        ipaddress.IPv4Address(ip)
+        return True
+    except ipaddress.AddressValueError:
+        return False
+
+
+def enforce_mac(mac: str) -> bytes:
+    mac_bytes = []
+    for b in mac.split(':'):
+        mac_bytes.append(int(b, 16))
+    return bytes(mac_bytes)
+
+
+def enforce_ip(ip: str) -> bytes:
+    ip_bytes = []
+    for b in ip.split('.'):
+        ip_bytes.append(int(b))
+    return bytes(ip_bytes)
+
+
+def parse_mac(mac: bytes) -> str:
+    mac_parts = []
+    for b in mac:
+        hex = f'{b:x}'
+        if len(hex) == 1:
+            hex = '0' + hex
+        mac_parts.append(hex)
+    return ':'.join(mac_parts)
+
+
+def parse_ip(ip: bytes) -> str:
+    ip_parts = []
+    for b in ip:
+        ip_parts.append(str(b))
+    return '.'.join(ip_parts)
+
+
+
+if __name__ == '__main__':
+    foo = LocalNetwork()
+    print(foo.get_default_interface())
+    print(foo.get_mac())
+    print(foo.get_ip())
+    # print(is_valid_ipv4('1.1.1.1'))