| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 | 
							- from modbus import Modbus, MBError, NoResponseError
 
- import time
 
- import os
 
class Updater:
    """Send a firmware image to a device through a ``Modbus`` transport object.

    Every request built here has the layout
    ``<address> 0x41 <sub-command> [payload]`` followed by the bytes returned
    by ``modbus._crc``.  Replies to the start/part/finish commands must be
    exactly 5 bytes long and begin with the same three header bytes as the
    request; anything else raises ``MBError``.
    """

    # Value passed as second argument to raw_communicate and required as the
    # length of every validated reply.
    # NOTE(review): presumably the expected reply size in bytes - confirm
    # against Modbus.raw_communicate.
    _REPLY_LEN = 5

    # Number of firmware bytes read from the file for each WRITE_PART request.
    _SEGMENT_SIZE = 128

    def __init__(self, modbus: "Modbus"):
        """Remember the transport and reset the segment counter.

        Args:
            modbus: Object providing ``address``, ``_crc(bytes)`` and
                ``raw_communicate(frame[, length])``.
        """
        self.modbus = modbus
        # Index of the next firmware segment; reset by write_fw_start() and
        # advanced by write_fw_part() after each accepted segment.
        self.update_segment_number = 0

    def _with_crc(self, request: bytes) -> bytes:
        """Return *request* with the transport's CRC bytes appended."""
        return request + self.modbus._crc(request)

    @staticmethod
    def _check_reply(response, header: bytes) -> None:
        """Validate a reply against the expected three-byte header.

        Raises:
            MBError: If the reply is not 5 bytes long or does not start with
                *header*.
        """
        if len(response) != Updater._REPLY_LEN:
            raise MBError('Incorrect response length')
        if response[:3] != header:
            raise MBError('Incorrect response')

    def iap_start(self):
        """Reboot device in IAP mode.

        Sends ``<address> 0x41 0x01`` plus CRC.  The reply, if any, is not
        inspected.
        """
        request = bytes((self.modbus.address, 0x41, 0x01))
        self.modbus.raw_communicate(self._with_crc(request))

    def write_fw_start(self, size: int, model: str):
        """Ask device to start update.

        Resets the segment counter, then sends sub-command 0x01 followed by
        the fixed marker bytes ``EF BE AD DE``, *size* as a 4-byte big-endian
        integer and *model* padded with NUL characters to 16 characters and
        ASCII-encoded.

        Args:
            size: Firmware size in bytes; must fit in 4 bytes.
            model: Device model string; must be ASCII.  It is padded, not
                truncated, so a longer string lengthens the request.

        Raises:
            NoResponseError: If the reply is empty.
            MBError: If the reply has the wrong length or header.
        """
        self.update_segment_number = 0

        header = bytes((self.modbus.address, 0x41, 0x01))
        request = b''.join((
            header,
            bytes((0xEF, 0xBE, 0xAD, 0xDE)),
            size.to_bytes(4, 'big'),
            model.ljust(16, '\0').encode('ascii'),
        ))
        response = self.modbus.raw_communicate(
            self._with_crc(request), self._REPLY_LEN)

        # An empty reply is reported separately from a malformed one.
        if len(response) == 0:
            raise NoResponseError('No response on WRITE_START command')
        self._check_reply(response, header)

    def write_fw_part(self, data: bytes):
        """Write piece of FW data in IAP mode.

        Sends sub-command 0x02 followed by the current segment number as a
        2-byte big-endian integer and *data*.  The segment counter is advanced
        only after the reply has been validated.

        Args:
            data: Raw firmware bytes for this segment.

        Raises:
            MBError: If the reply has the wrong length or header.
        """
        header = bytes((self.modbus.address, 0x41, 0x02))
        request = b''.join(
            (header, self.update_segment_number.to_bytes(2, 'big'), data))
        response = self.modbus.raw_communicate(
            self._with_crc(request), self._REPLY_LEN)
        self._check_reply(response, header)
        self.update_segment_number += 1

    def iap_finish(self):
        """Complete FW transmission and check response.

        Sends ``<address> 0x41 0x03`` plus CRC.

        Raises:
            MBError: If the reply has the wrong length or header.
        """
        header = bytes((self.modbus.address, 0x41, 0x03))
        response = self.modbus.raw_communicate(
            self._with_crc(header), self._REPLY_LEN)
        self._check_reply(response, header)

    def update(self, path, model: str, *, iap_delay: float = 4,
               start_delay: float = 2):
        """Run the whole update sequence with the firmware file at *path*.

        Steps: set ``modbus.MB_TIMEOUT`` to 3, switch the device to IAP mode,
        announce the size of the file and the model, send the file in
        128-byte segments, then send the finish command.

        Args:
            path: Location of the firmware image.  Its size on disk is the
                size announced to the device.
            model: Model string forwarded to write_fw_start().
            iap_delay: Seconds to wait after iap_start().
                NOTE(review): presumably covers the device reboot - confirm.
            start_delay: Seconds to wait after write_fw_start().
                NOTE(review): purpose not visible here - confirm.

        Raises:
            NoResponseError, MBError: Propagated from the individual steps.
            OSError: If the firmware file cannot be inspected or opened.
        """
        self.modbus.MB_TIMEOUT = 3
        # The announced size must describe the file that is actually sent.
        size = os.path.getsize(path)

        print('Switch to IAP mode')
        self.iap_start()
        time.sleep(iap_delay)

        print(f'Start writing {size} bytes of FW')
        self.write_fw_start(size, model)
        time.sleep(start_delay)

        print(f'Open FW file "{path}"...')
        with open(path, 'rb') as f:
            # read() returns b'' at end of file, which ends the iteration.
            for segment in iter(lambda: f.read(self._SEGMENT_SIZE), b''):
                self.write_fw_part(segment)

        print('End of transmission')
        self.iap_finish()
 
def main():
    """Script entry point; currently a placeholder that performs no work."""
    return None
 
# Call main() only when this file is executed as a script, not when imported.
# The module variable __name__ is compared here; comparing the string literal
# '__name__' could never be true.
if __name__ == '__main__':
    main()
 
 
  |