import subprocess
import os
import socket
import time
from datetime import datetime


class AnnounceM3():
    """Container and parser for a Metrolog UDP announce datagram.

    The datagram is a ';'-separated text record with a trailing ';':

    '{model};{serial};{mac};{sw_ver};{button_set};{button_mode};{stm32_id};{uptime};{production_date};{test_state};{button_box};'

    All fields are stored as the raw strings taken from the datagram;
    no type conversion is performed.
    """

    # Number of leading fields consumed from a datagram.
    FIELD_COUNT = 11

    def __init__(self, model, serial, mac, sw_ver, button_set, button_mod,
                 stm32_id, uptime, prod_date, test_state, button_box):
        self.model = model
        self.serial = serial
        self.mac = mac
        self.sw_ver = sw_ver
        self.button_set = button_set
        self.button_mod = button_mod
        self.stm32_id = stm32_id
        self.uptime = uptime
        self.prod_date = prod_date
        self.test_state = test_state
        self.button_box = button_box

    @classmethod
    def parse(cls, raw):
        """Parse the payload of a received datagram.

        Args:
            raw: Datagram payload, either ``str`` or a bytes-like object
                that has a ``decode()`` method.

        Returns:
            An ``AnnounceM3`` instance, or ``None`` when the payload cannot
            be decoded or holds fewer than ``FIELD_COUNT`` fields.
        """
        if not isinstance(raw, str):
            try:
                raw = raw.decode()
            except (UnicodeDecodeError, AttributeError):
                # Undecodable bytes or an object without decode(): not an
                # announce we can read.
                return None
        fields = raw.split(';')
        # Any host may send arbitrary data to the announce port, so a short
        # record is reported as "not an announce" instead of raising
        # IndexError.
        if len(fields) < cls.FIELD_COUNT:
            return None
        return cls(*fields[:cls.FIELD_COUNT])

    def print_members(self):
        """Print every announce field to stdout, one labelled line each."""
        print(f'Модель: {self.model}')
        print(f'Серийный номер: {self.serial}')
        print(f'MAC: {self.mac}')
        print(f'Версия сновной прошивки: {self.sw_ver}')
        print(f'Кнопка "УСТАНОВКА": {self.button_set}')
        print(f'Кнопка "РЕЖИМ": {self.button_mod}')
        print(f'Тампер: {self.button_box}')
        print(f'STM32_ID: {self.stm32_id}')
        print(f'Время работы (uptime): {self.uptime}')
        print(f'Дата производства: {self.prod_date}')
        print(f'Статус тестирвоания: {self.test_state}')


class Metrolog_M3():
    """Metrolog-M3 board service client."""

    # Default time to wait for an announce, in seconds.
    ANNOUNCE_TIMEOUT = 5
    # UDP port on which announce datagrams are received.
    ANNOUNCE_PORT = 49049

    def __init__(self, ip):
        self.ip = ip

    def ping(self):
        """Test the connection with the device using the system ``ping``.

        Sends a single echo request and waits at most 10 seconds for the
        command to finish.

        Returns:
            ``True`` (after printing "Ping - OK") when ``ping`` exits with
            status 0, ``False`` when it fails or times out.
        """
        # The "number of requests" option is -n on Windows and -c elsewhere.
        count_flag = '-n' if os.name == 'nt' else '-c'
        try:
            subprocess.check_call(
                ['ping', count_flag, '1', self.ip],
                timeout=10,
                # DEVNULL discards the output without leaving a file
                # handle open in this process.
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            return False
        print("Ping - OK")
        return True

    @staticmethod
    def announce_socket():
        """Create a UDP socket bound to the announce port.

        The socket has ``SO_REUSEADDR`` set, a 0.5 second receive timeout
        and is bound to all interfaces on ``ANNOUNCE_PORT``. The caller is
        responsible for closing it.

        Raises:
            OSError: If the socket cannot be configured or bound.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.settimeout(.5)
            sock.bind(('', Metrolog_M3.ANNOUNCE_PORT))
        except OSError:
            # Do not leak the descriptor when configuration fails.
            sock.close()
            raise
        return sock

    def wait_announce(self, timeout=None, get_ip=False):
        """Wait for the next UDP announce sent from ``self.ip``.

        Datagrams from other addresses and datagrams that do not parse as
        an announce are ignored. At least one receive attempt is always
        made.

        Args:
            timeout: Maximum time to wait, in seconds. ``None`` selects
                ``ANNOUNCE_TIMEOUT``.
            get_ip: Accepted for interface compatibility; it is not used by
                this method.

        Returns:
            The parsed ``AnnounceM3``, or ``None`` when no matching announce
            arrived before the timeout.
        """
        if timeout is None:
            timeout = self.ANNOUNCE_TIMEOUT
        # Keep the effective timeout on the instance, as earlier versions
        # stored it there.
        self.timeout = timeout
        # A monotonic clock is not affected by wall-clock adjustments.
        deadline = time.monotonic() + timeout

        with self.announce_socket() as sock:
            while True:
                try:
                    raw, address = sock.recvfrom(4096)
                except socket.timeout:
                    # Nothing received within the socket timeout; fall
                    # through to the deadline check.
                    pass
                else:
                    if address[0] == self.ip:
                        announce = AnnounceM3.parse(raw)
                        if announce is not None:
                            print("Контроллер найден")
                            return announce
                if time.monotonic() >= deadline:
                    break

        print("Превышено время ожидания запроса")
        return None


# Sample announce datagram:
# b'\xd0\x9c\xd0\xb5\xd1\x82\xd1\x80\xd0\xbe\xd0\xbb\xd0\xbe\xd0\xb3 M3.4.3;7030000;B6-C1-EF-9C-FB-00;1.016t;false;false;5A0037001451313136393333;1221;00.00.00;;false;'

if __name__ == "__main__":
    metrolog = Metrolog_M3("192.168.25.6")
    ann = metrolog.wait_announce()
    # wait_announce() returns None (and reports it) when the wait timed out.
    if ann is not None:
        ann.print_members()