# metrolog_m3.py (4.0 KB)
import os
import socket
import subprocess
import time
from datetime import datetime
  5. class AnnounceM3():
  6. """Container and parser for M2 UDP announce data"""
  7. # '{model};{serial};{mac};{sw_ver};{button_set};{button_mode};{stm32_id};{uptime};{production_date};{test_state};{button_box};'
  8. def __init__(self, model, serial, mac, sw_ver, button_set, button_mod,
  9. stm32_id, uptime, prod_date, test_state, button_box):
  10. self.model = model
  11. self.serial = serial
  12. self.mac = mac
  13. self.sw_ver = sw_ver
  14. self.button_set = button_set
  15. self.button_mod = button_mod
  16. self.stm32_id = stm32_id
  17. self.uptime = uptime
  18. self.prod_date = prod_date
  19. self.test_state = test_state
  20. self.button_box = button_box
  21. @classmethod
  22. def parse(cls, raw):
  23. """Parse raw bytes from received datagram"""
  24. if not isinstance(raw, str):
  25. try:
  26. raw = raw.decode()
  27. except Exception as e:
  28. return
  29. sp = raw.split(';')
  30. return cls(sp[0], sp[1], sp[2], sp[3], sp[4], sp[5], sp[6], sp[7], sp[8], sp[9], sp[10])
  31. def print_members(self):
  32. print(f'Модель: {self.model}')
  33. print(f'Серийный номер: {self.serial}')
  34. print(f'MAC: {self.mac}')
  35. print(f'Версия сновной прошивки: {self.sw_ver}')
  36. print(f'Кнопка "УСТАНОВКА": {self.button_set}')
  37. print(f'Кнопка "РЕЖИМ": {self.button_mod}')
  38. print(f'Тампер: {self.button_box}')
  39. print(f'STM32_ID: {self.stm32_id}')
  40. print(f'Время работы (uptime): {self.uptime}')
  41. print(f'Дата производства: {self.prod_date}')
  42. print(f'Статус тестирвоания: {self.test_state}')
  43. class Metrolog_M3():
  44. """Metrolog-M3 board service client"""
  45. ANNOUNCE_TIMEOUT = 5
  46. ANNOUNCE_PORT = 49049
  47. def __init__(self, ip):
  48. self.ip = ip
  49. def ping(self):
  50. """Test connection with device using ping"""
  51. null = open(os.devnull)
  52. try:
  53. subprocess.check_call(['ping', '-n', '1', self.ip],
  54. timeout=10, stdout=null, stderr=null)
  55. print("Ping - OK")
  56. return True
  57. except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
  58. return False
  59. @staticmethod
  60. def announce_socket():
  61. """Create socket to receive announce messages"""
  62. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  63. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  64. sock.settimeout(.5)
  65. sock.bind(('', Metrolog_M3.ANNOUNCE_PORT))
  66. return sock
  67. def wait_announce(self, timeout=None, get_ip=False):
  68. """Wait next UDP announce from device"""
  69. if timeout is None:
  70. self.timeout = self.ANNOUNCE_TIMEOUT
  71. sock = self.announce_socket()
  72. start = datetime.now()
  73. while True:
  74. try:
  75. raw, address = sock.recvfrom(4096)
  76. except socket.timeout:
  77. pass
  78. else:
  79. announce = AnnounceM3.parse(raw)
  80. if announce is not None:
  81. if address[0] == self.ip:
  82. print("Контроллер найден")
  83. return announce
  84. finally:
  85. if (datetime.now() - start).seconds > self.timeout:
  86. print("Превышено время ожидания запроса")
  87. return None
  88. # b'\xd0\x9c\xd0\xb5\xd1\x82\xd1\x80\xd0\xbe\xd0\xbb\xd0\xbe\xd0\xb3 M3.4.3;7030000;B6-C1-EF-9C-FB-00;1.016t;false;false;5A0037001451313136393333;1221;00.00.00;;false;'
  89. metrolog = Metrolog_M3("192.168.25.6")
  90. # print(metrolog.wait_announce())
  91. ann = metrolog.wait_announce()
  92. ann.print_members()
  93. # announce = AnnounceM3()
  94. # metrolog.ping()
  95. # print(datetime.now())