io_module.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from modbus import Modbus, MBError, NoResponseError
  2. import colorama
  3. from colorama import Fore
  4. import time
  5. import os
  6. reg_table = {'in_bits': 0x0100, 'uptime': 0x0800, 'rtc_unix': 0x0802, 'rtc_sinhro': 0x0804}
  7. class IO_Module(Modbus):
  8. def __init__(self, tty: str, brate: int, address: int):
  9. super().__init__(tty, brate, address)
  10. self.update_segment_number = 0
  11. def iap_start(self):
  12. """Reboot device in IAP mode"""
  13. request = bytes((self.address, 0x41, 0x01))
  14. response = self.raw_communicate(request + self._crc(request))
  15. def write_fw_start(self, size: int):
  16. """Ask device to start update"""
  17. self.update_segment_number = 0
  18. request = bytes((self.address, 0x41, 0x01, 0xEF, 0xBE, 0xAD, 0xDE)) + size.to_bytes(4, 'big')
  19. response = self.raw_communicate(request + self._crc(request), 5)
  20. if len(response) == 0:
  21. raise NoResponseError('No response on WRITE_START command')
  22. if len(response) != 5:
  23. raise MBError('Incorrect response length')
  24. if response[:3] != bytes((self.address, 0x41, 0x01)):
  25. raise MBError('Incorrect response')
  26. def write_fw_part(self, data: bytes):
  27. """Write piece of FW data in IAP mode"""
  28. header = bytes((self.address, 0x41, 0x02))
  29. request = b''.join((header, self.update_segment_number.to_bytes(2, 'big'), data))
  30. response = self.raw_communicate(request + self._crc(request), 5)
  31. # self.print_hex(response)
  32. if len(response) != 5:
  33. raise MBError('Incorrect response length')
  34. if (response[:3]) != header:
  35. raise MBError('Incorrect response')
  36. self.update_segment_number += 1
  37. def iap_finish(self):
  38. """Complete FW transmission and check response"""
  39. header = request = bytes((self.address, 0x41, 0x03))
  40. response = self.raw_communicate(request + self._crc(request), 5)
  41. if len(response) != 5:
  42. raise MBError('Incorrect response length')
  43. if response[:3] != header:
  44. raise MBError('Incorrect response')
  45. def update(self, path):
  46. self.MB_TIMEOUT = 3
  47. size = os.path.getsize('fw.bin')
  48. print('Switch to IAP mode')
  49. self.iap_start()
  50. time.sleep(4)
  51. print(f'Start writing {size} bytes of FW')
  52. self.write_fw_start(size)
  53. time.sleep(2)
  54. print(f'Open FW file "{path}"...')
  55. with open(path, 'rb') as f:
  56. done = progress_cur = progress_pre = 0
  57. while True:
  58. buf = f.read(128)
  59. if len(buf):
  60. self.write_fw_part(buf)
  61. progress_cur = done / size
  62. else:
  63. break
  64. print('End of transmission')
  65. self.iap_finish()
  66. def get_input_bit(self) -> str:
  67. data = self.read_holding_registers(reg_table['in_bits'], 1)
  68. return format(data[0], '08b')
  69. def get_uptime(self):
  70. return self.read_uint32_holding(reg_table['uptime'])
  71. def get_rtc(self):
  72. return self.read_uint32_holding(reg_table['rtc_unix'])
  73. def set_rtc(self, utc):
  74. self.write_uint32(reg_table['rtc_sinhro'], utc)
  75. def main():
  76. colorama.init(autoreset=True)
  77. dev = IO_Module('COM22', 115200, 1)
  78. dev.MB_DEBUG = True
  79. # dev.update('fw.bin')
  80. # Запрос системных параметров, установка времени
  81. # print('Device uptime:', dev.get_uptime())
  82. # unix_time = dev.get_rtc()
  83. # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
  84. # print('Set time:', int(time.time()))
  85. # dev.set_rtc(int(time.time()))
  86. # time.sleep(1)
  87. # unix_time = dev.get_rtc()
  88. # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
  89. while True:
  90. print(Fore.GREEN + 'Inputs values [bit field]:', Fore.YELLOW + dev.get_input_bit())
  91. time.sleep(1)
  92. if __name__ == '__main__':
  93. main()