# updater.py (3.0 KB)
  1. from modbus import Modbus, MBError, NoResponseError
  2. import time
  3. import os
  4. class Updater:
  5. def __init__(self, modbus: Modbus):
  6. self.modbus = modbus
  7. self.update_segment_number = 0
  8. def iap_start(self):
  9. """Reboot device in IAP mode"""
  10. request = bytes((self.modbus.address, 0x41, 0x01))
  11. response = self.modbus.raw_communicate(request + self.modbus._crc(request))
  12. def write_fw_start(self, size: int, model: str):
  13. """Ask device to start update"""
  14. self.update_segment_number = 0
  15. # request = bytes((self.modbus.address, 0x41, 0x01, 0xEF, 0xBE, 0xAD, 0xDE)) + size.to_bytes(4, 'big')
  16. request = bytes((self.modbus.address, 0x41, 0x01, 0xEF, 0xBE, 0xAD, 0xDE)) + size.to_bytes(4, 'big') + model.ljust(16, '\0').encode('ascii')
  17. response = self.modbus.raw_communicate(request + self.modbus._crc(request), 5)
  18. if len(response) == 0:
  19. raise NoResponseError('No response on WRITE_START command')
  20. if len(response) != 5:
  21. raise MBError('Incorrect response length')
  22. if response[:3] != bytes((self.modbus.address, 0x41, 0x01)):
  23. raise MBError('Incorrect response')
  24. def write_fw_part(self, data: bytes):
  25. """Write piece of FW data in IAP mode"""
  26. header = bytes((self.modbus.address, 0x41, 0x02))
  27. request = b''.join((header, self.update_segment_number.to_bytes(2, 'big'), data))
  28. response = self.modbus.raw_communicate(request + self.modbus._crc(request), 5)
  29. # self.print_hex(response)
  30. if len(response) != 5:
  31. raise MBError('Incorrect response length')
  32. if (response[:3]) != header:
  33. raise MBError('Incorrect response')
  34. self.update_segment_number += 1
  35. def iap_finish(self):
  36. """Complete FW transmission and check response"""
  37. header = request = bytes((self.modbus.address, 0x41, 0x03))
  38. response = self.modbus.raw_communicate(request + self.modbus._crc(request), 5)
  39. if len(response) != 5:
  40. raise MBError('Incorrect response length')
  41. if response[:3] != header:
  42. raise MBError('Incorrect response')
  43. def update(self, path, model):
  44. self.modbus.MB_TIMEOUT = 3
  45. size = os.path.getsize('fw.bin')
  46. print('Switch to IAP mode')
  47. self.iap_start()
  48. time.sleep(4)
  49. print(f'Start writing {size} bytes of FW')
  50. self.write_fw_start(size, model)
  51. time.sleep(2)
  52. print(f'Open FW file "{path}"...')
  53. with open(path, 'rb') as f:
  54. done = progress_cur = progress_pre = 0
  55. while True:
  56. buf = f.read(128)
  57. if len(buf):
  58. self.write_fw_part(buf)
  59. progress_cur = done / size
  60. else:
  61. break
  62. print('End of transmission')
  63. self.iap_finish()
  64. def main():
  65. pass
  66. if '__name__' == '__main__':
  67. main()