|
@@ -0,0 +1,112 @@
|
|
|
+import subprocess
|
|
|
+import os
|
|
|
+import socket
|
|
|
+from datetime import datetime
|
|
|
+
|
|
|
+class AnnounceM3():
|
|
|
+ """Container and parser for M2 UDP announce data"""
|
|
|
+ # '{model};{serial};{mac};{sw_ver};{button_set};{button_mode};{stm32_id};{uptime};{production_date};{test_state};{button_box};'
|
|
|
+
|
|
|
+ def __init__(self, model, serial, mac, sw_ver, button_set, button_mod,
|
|
|
+ stm32_id, uptime, prod_date, test_state, button_box):
|
|
|
+ self.model = model
|
|
|
+ self.serial = serial
|
|
|
+ self.mac = mac
|
|
|
+ self.sw_ver = sw_ver
|
|
|
+ self.button_set = button_set
|
|
|
+ self.button_mod = button_mod
|
|
|
+ self.stm32_id = stm32_id
|
|
|
+ self.uptime = uptime
|
|
|
+ self.prod_date = prod_date
|
|
|
+ self.test_state = test_state
|
|
|
+ self.button_box = button_box
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def parse(cls, raw):
|
|
|
+ """Parse raw bytes from received datagram"""
|
|
|
+ if not isinstance(raw, str):
|
|
|
+ try:
|
|
|
+ raw = raw.decode()
|
|
|
+ except Exception as e:
|
|
|
+ return
|
|
|
+ sp = raw.split(';')
|
|
|
+ return cls(sp[0], sp[1], sp[2], sp[3], sp[4], sp[5], sp[6], sp[7], sp[8], sp[9], sp[10])
|
|
|
+
|
|
|
+ def print_members(self):
|
|
|
+ print(f'Модель: {self.model}')
|
|
|
+ print(f'Серийный номер: {self.serial}')
|
|
|
+ print(f'MAC: {self.mac}')
|
|
|
+ print(f'Версия сновной прошивки: {self.sw_ver}')
|
|
|
+ print(f'Кнопка "УСТАНОВКА": {self.button_set}')
|
|
|
+ print(f'Кнопка "РЕЖИМ": {self.button_mod}')
|
|
|
+ print(f'Тампер: {self.button_box}')
|
|
|
+ print(f'STM32_ID: {self.stm32_id}')
|
|
|
+ print(f'Время работы (uptime): {self.uptime}')
|
|
|
+ print(f'Дата производства: {self.prod_date}')
|
|
|
+ print(f'Статус тестирвоания: {self.test_state}')
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+class Metrolog_M3():
|
|
|
+ """Metrolog-M3 board service client"""
|
|
|
+ ANNOUNCE_TIMEOUT = 5
|
|
|
+ ANNOUNCE_PORT = 49049
|
|
|
+
|
|
|
+ def __init__(self, ip):
|
|
|
+ self.ip = ip
|
|
|
+
|
|
|
+ def ping(self):
|
|
|
+ """Test connection with device using ping"""
|
|
|
+ null = open(os.devnull)
|
|
|
+ try:
|
|
|
+ subprocess.check_call(['ping', '-n', '1', self.ip],
|
|
|
+ timeout=10, stdout=null, stderr=null)
|
|
|
+ print("Ping - OK")
|
|
|
+ return True
|
|
|
+ except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
|
|
|
+ return False
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def announce_socket():
|
|
|
+ """Create socket to receive announce messages"""
|
|
|
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
|
+ sock.settimeout(.5)
|
|
|
+ sock.bind(('', Metrolog_M3.ANNOUNCE_PORT))
|
|
|
+ return sock
|
|
|
+
|
|
|
+ def wait_announce(self, timeout=None, get_ip=False):
|
|
|
+ """Wait next UDP announce from device"""
|
|
|
+ if timeout is None:
|
|
|
+ self.timeout = self.ANNOUNCE_TIMEOUT
|
|
|
+ sock = self.announce_socket()
|
|
|
+ start = datetime.now()
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ raw, address = sock.recvfrom(4096)
|
|
|
+ except socket.timeout:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ announce = AnnounceM3.parse(raw)
|
|
|
+ if announce is not None:
|
|
|
+ if address[0] == self.ip:
|
|
|
+ print("Контроллер найден")
|
|
|
+ return announce
|
|
|
+ finally:
|
|
|
+ if (datetime.now() - start).seconds > self.timeout:
|
|
|
+ print("Превышено время ожидания запроса")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+# b'\xd0\x9c\xd0\xb5\xd1\x82\xd1\x80\xd0\xbe\xd0\xbb\xd0\xbe\xd0\xb3 M3.4.3;7030000;B6-C1-EF-9C-FB-00;1.016t;false;false;5A0037001451313136393333;1221;00.00.00;;false;'
|
|
|
+
|
|
|
+metrolog = Metrolog_M3("192.168.25.6")
|
|
|
+# print(metrolog.wait_announce())
|
|
|
+ann = metrolog.wait_announce()
|
|
|
+ann.print_members()
|
|
|
+
|
|
|
+
|
|
|
+# announce = AnnounceM3()
|
|
|
+# metrolog.ping()
|
|
|
+# print(datetime.now())
|