# io_module.py
  1. from modbus import Modbus, MBError, NoResponseError
  2. import colorama
  3. from colorama import Fore
  4. import time
  5. import os
  6. reg_table = {'in_bits': 0x0100, 'in_cnt': 0x0102, 'in_mode': 0x0120,'uptime': 0x0800,
  7. 'rtc_unix': 0x0802, 'rtc_sinhro': 0x0804}
  8. class IO_Module(Modbus):
  9. def __init__(self, tty: str, brate: int, address: int):
  10. super().__init__(tty, brate, address)
  11. self.update_segment_number = 0
  12. def iap_start(self):
  13. """Reboot device in IAP mode"""
  14. request = bytes((self.address, 0x41, 0x01))
  15. response = self.raw_communicate(request + self._crc(request))
  16. def write_fw_start(self, size: int):
  17. """Ask device to start update"""
  18. self.update_segment_number = 0
  19. request = bytes((self.address, 0x41, 0x01, 0xEF, 0xBE, 0xAD, 0xDE)) + size.to_bytes(4, 'big')
  20. response = self.raw_communicate(request + self._crc(request), 5)
  21. if len(response) == 0:
  22. raise NoResponseError('No response on WRITE_START command')
  23. if len(response) != 5:
  24. raise MBError('Incorrect response length')
  25. if response[:3] != bytes((self.address, 0x41, 0x01)):
  26. raise MBError('Incorrect response')
  27. def write_fw_part(self, data: bytes):
  28. """Write piece of FW data in IAP mode"""
  29. header = bytes((self.address, 0x41, 0x02))
  30. request = b''.join((header, self.update_segment_number.to_bytes(2, 'big'), data))
  31. response = self.raw_communicate(request + self._crc(request), 5)
  32. # self.print_hex(response)
  33. if len(response) != 5:
  34. raise MBError('Incorrect response length')
  35. if (response[:3]) != header:
  36. raise MBError('Incorrect response')
  37. self.update_segment_number += 1
  38. def iap_finish(self):
  39. """Complete FW transmission and check response"""
  40. header = request = bytes((self.address, 0x41, 0x03))
  41. response = self.raw_communicate(request + self._crc(request), 5)
  42. if len(response) != 5:
  43. raise MBError('Incorrect response length')
  44. if response[:3] != header:
  45. raise MBError('Incorrect response')
  46. def update(self, path):
  47. self.MB_TIMEOUT = 3
  48. size = os.path.getsize('fw.bin')
  49. print('Switch to IAP mode')
  50. self.iap_start()
  51. time.sleep(4)
  52. print(f'Start writing {size} bytes of FW')
  53. self.write_fw_start(size)
  54. time.sleep(2)
  55. print(f'Open FW file "{path}"...')
  56. with open(path, 'rb') as f:
  57. done = progress_cur = progress_pre = 0
  58. while True:
  59. buf = f.read(128)
  60. if len(buf):
  61. self.write_fw_part(buf)
  62. progress_cur = done / size
  63. else:
  64. break
  65. print('End of transmission')
  66. self.iap_finish()
  67. # 0x0100 - текущее состояние входов
  68. def get_inputs_bit(self) -> str:
  69. data = self.read_holding_registers(reg_table['in_bits'], 1)
  70. return format(data[0], '08b')
  71. # 0x0101 - 0x0110 Счетчики импульсов
  72. def get_inputs_counters(self):
  73. data = []
  74. for i in range(reg_table['in_cnt'], reg_table['in_cnt'] + 16, 2):
  75. data.append(self.read_uint32_holding(i))
  76. return data
  77. # 0x0120 - режим работы входов
  78. def get_inputs_mode(self):
  79. data = self.read_holding_registers(reg_table['in_mode'], 1)
  80. return format(data[0], '08b')
  81. def set_inputs_mode(self, val):
  82. self.write_holding_register(reg_table['in_mode'], val)
  83. def get_uptime(self):
  84. return self.read_uint32_holding(reg_table['uptime'])
  85. def get_rtc(self):
  86. return self.read_uint32_holding(reg_table['rtc_unix'])
  87. def set_rtc(self, utc):
  88. self.write_uint32(reg_table['rtc_sinhro'], utc)
  89. def main():
  90. colorama.init(autoreset=True)
  91. dev = IO_Module('COM24', 115200, 1)
  92. dev.MB_DEBUG = False
  93. # dev.update('fw.bin')
  94. # Запрос системных параметров, установка времени
  95. # print('Device uptime:', dev.get_uptime())
  96. # unix_time = dev.get_rtc()
  97. # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
  98. # print('Set time:', int(time.time()))
  99. # dev.set_rtc(int(time.time()))
  100. # time.sleep(1)
  101. # unix_time = dev.get_rtc()
  102. # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
  103. # while True:
  104. for i in range(10):
  105. print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
  106. print('Inputs values [bit field :', Fore.GREEN + dev.get_inputs_bit())
  107. print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  108. data = dev.get_inputs_counters()
  109. print('Inputs counters :', Fore.GREEN + ' | '.join(str(el) for el in data))
  110. dev.set_inputs_mode(i)
  111. time.sleep(1)
  112. if __name__ == '__main__':
  113. main()