123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- import subprocess
- import os
- import socket
- from datetime import datetime
class AnnounceM3():
    """Container and parser for a device UDP announce datagram.

    The datagram is a ``;``-separated text record laid out as::

        {model};{serial};{mac};{sw_ver};{button_set};{button_mode};
        {stm32_id};{uptime};{production_date};{test_state};{button_box};

    Every field is stored exactly as the string found in the datagram; no
    type conversion is performed.

    NOTE(review): the previous docstring said "M2" while the class is named
    ``AnnounceM3`` -- confirm which device family this format belongs to.
    """

    # Number of ';'-separated fields a well-formed announce must carry.
    FIELD_COUNT = 11

    def __init__(self, model, serial, mac, sw_ver, button_set, button_mod,
                 stm32_id, uptime, prod_date, test_state, button_box):
        """Store the announce fields, in the order they appear on the wire."""
        self.model = model
        self.serial = serial
        self.mac = mac
        self.sw_ver = sw_ver
        self.button_set = button_set
        self.button_mod = button_mod
        self.stm32_id = stm32_id
        self.uptime = uptime
        self.prod_date = prod_date
        self.test_state = test_state
        self.button_box = button_box

    @classmethod
    def parse(cls, raw):
        """Parse the payload of a received datagram.

        Args:
            raw: Datagram payload, either ``bytes`` (decoded with the default
                codec) or an already decoded ``str``.

        Returns:
            An ``AnnounceM3`` instance, or ``None`` when the payload cannot
            be decoded or carries fewer than ``FIELD_COUNT`` fields.
        """
        if not isinstance(raw, str):
            try:
                raw = raw.decode()
            except (AttributeError, UnicodeDecodeError):
                # Not bytes-like, or not valid text: not an announce.
                return None
        fields = raw.split(';')
        # Any host can send arbitrary data to the announce port, so a short
        # record is treated as "not an announce" instead of raising
        # IndexError in the caller's receive loop.
        if len(fields) < cls.FIELD_COUNT:
            return None
        # The record ends with ';', so split() yields a trailing empty item;
        # only the first FIELD_COUNT items are real fields.
        return cls(*fields[:cls.FIELD_COUNT])

    def print_members(self):
        """Print every announce field to stdout with a Russian label."""
        # The labels are runtime output and are kept exactly as they were.
        print(f'Модель: {self.model}')
        print(f'Серийный номер: {self.serial}')
        print(f'MAC: {self.mac}')
        print(f'Версия сновной прошивки: {self.sw_ver}')
        print(f'Кнопка "УСТАНОВКА": {self.button_set}')
        print(f'Кнопка "РЕЖИМ": {self.button_mod}')
        print(f'Тампер: {self.button_box}')
        print(f'STM32_ID: {self.stm32_id}')
        print(f'Время работы (uptime): {self.uptime}')
        print(f'Дата производства: {self.prod_date}')
        print(f'Статус тестирвоания: {self.test_state}')
-
class Metrolog_M3():
    """Metrolog-M3 board service client.

    Attributes:
        ip: Address of the device this client talks to, as given to the
            constructor.
        timeout: Effective timeout (seconds) of the most recent
            ``wait_announce`` call; only set once that method has run.
    """

    ANNOUNCE_TIMEOUT = 5    # default wait for an announce, in seconds
    ANNOUNCE_PORT = 49049   # UDP port the announce socket is bound to

    def __init__(self, ip):
        """Remember the device address; no network activity happens here."""
        self.ip = ip

    def ping(self):
        """Test the connection with the device by sending one ping.

        Returns:
            ``True`` if the ``ping`` command exits successfully, ``False`` if
            it exits with an error or does not finish within 10 seconds.
        """
        # The "number of echo requests" option is spelled differently on
        # Windows (-n) and on POSIX systems (-c).
        count_flag = '-n' if os.name == 'nt' else '-c'
        try:
            # subprocess.DEVNULL discards the output without opening (and
            # leaking) a file object of our own.
            subprocess.check_call(['ping', count_flag, '1', self.ip],
                                  timeout=10,
                                  stdout=subprocess.DEVNULL,
                                  stderr=subprocess.DEVNULL)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            return False
        print("Ping - OK")
        return True

    @staticmethod
    def announce_socket():
        """Create a UDP socket bound to the announce port.

        The socket has a 0.5 s receive timeout so that callers can poll it
        and re-check their own deadline. The caller owns the socket and must
        close it.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.settimeout(.5)
            sock.bind(('', Metrolog_M3.ANNOUNCE_PORT))
        except Exception:
            # Do not leak the descriptor when configuration or bind fails.
            sock.close()
            raise
        return sock

    def wait_announce(self, timeout=None, get_ip=False):
        """Wait for the next UDP announce sent from ``self.ip``.

        Args:
            timeout: Maximum time to wait, in seconds. ``None`` selects
                ``ANNOUNCE_TIMEOUT``.
            get_ip: Accepted for backward compatibility; currently ignored.

        Returns:
            The parsed ``AnnounceM3`` of the first valid announce whose
            sender address equals ``self.ip``, or ``None`` if none arrives
            before the timeout expires.
        """
        if timeout is None:
            timeout = self.ANNOUNCE_TIMEOUT
        # Kept as an attribute because earlier versions exposed it; it is now
        # set for explicit timeouts as well, which previously failed with
        # AttributeError (or silently reused a stale value).
        self.timeout = timeout

        start = datetime.now()
        # The context manager closes the socket on every exit path.
        with self.announce_socket() as sock:
            while True:
                try:
                    raw, address = sock.recvfrom(4096)
                except socket.timeout:
                    # Nothing arrived during this poll interval.
                    pass
                else:
                    announce = AnnounceM3.parse(raw)
                    if announce is not None and address[0] == self.ip:
                        print("Контроллер найден")
                        return announce
                # The deadline is checked after each receive attempt, outside
                # any ``finally`` clause, so it can neither replace a found
                # announce nor swallow an exception. total_seconds() keeps
                # the fractional part and does not wrap after one day.
                if (datetime.now() - start).total_seconds() > timeout:
                    print("Превышено время ожидания запроса")
                    return None
-
# Sample announce datagram as received from a device:
# b'\xd0\x9c\xd0\xb5\xd1\x82\xd1\x80\xd0\xbe\xd0\xbb\xd0\xbe\xd0\xb3 M3.4.3;7030000;B6-C1-EF-9C-FB-00;1.016t;false;false;5A0037001451313136393333;1221;00.00.00;;false;'
metrolog = Metrolog_M3("192.168.25.6")
ann = metrolog.wait_announce()
# wait_announce() returns None when no announce arrives in time; only print
# the fields when a device actually answered.
if ann is not None:
    ann.print_members()
|