import os
import time

from modbus import Modbus, MBError, NoResponseError


class Updater:
    """Send a firmware image to a device over a ``Modbus`` link.

    Every request built here starts with the device address and function
    code 0x41, followed by a sub-command byte, and has the CRC returned by
    ``Modbus._crc`` appended before it is handed to
    ``Modbus.raw_communicate``.
    """

    # Number of firmware bytes read from the file per write_fw_part() call.
    FW_CHUNK_SIZE = 128
    # Value passed as the second argument of raw_communicate() (presumably
    # the expected reply size -- confirm against Modbus) and the length
    # every checked reply must have.
    RESPONSE_LENGTH = 5

    def __init__(self, modbus: Modbus):
        """Store the transport and reset the segment counter."""
        self.modbus = modbus
        # Index of the next firmware segment; sent with every data packet.
        self.update_segment_number = 0

    @staticmethod
    def _check_response(response, header: bytes) -> None:
        """Raise MBError unless *response* has the expected length and header."""
        if len(response) != Updater.RESPONSE_LENGTH:
            raise MBError('Incorrect response length')
        if response[:3] != header:
            raise MBError('Incorrect response')

    def iap_start(self) -> None:
        """Reboot device in IAP mode.

        The return value of the exchange is not inspected.
        """
        request = bytes((self.modbus.address, 0x41, 0x01))
        self.modbus.raw_communicate(request + self.modbus._crc(request))

    def write_fw_start(self, size: int, model: str) -> None:
        """Ask device to start update.

        Resets the segment counter and sends the firmware size and the
        model name to the device.

        Args:
            size: Firmware size in bytes; encoded as 4 big-endian bytes.
            model: Model name; padded with NUL characters to 16 characters
                and encoded as ASCII.

        Raises:
            NoResponseError: The reply was empty.
            MBError: The reply had the wrong length or header.
        """
        self.update_segment_number = 0
        header = bytes((self.modbus.address, 0x41, 0x01))
        # NOTE(review): ljust() pads but does not truncate, so a model name
        # longer than 16 characters yields a longer field -- confirm that
        # callers never pass one.
        request = b''.join((
            header,
            bytes((0xEF, 0xBE, 0xAD, 0xDE)),  # fixed marker bytes
            size.to_bytes(4, 'big'),
            model.ljust(16, '\0').encode('ascii'),
        ))
        response = self.modbus.raw_communicate(
            request + self.modbus._crc(request), self.RESPONSE_LENGTH)
        if len(response) == 0:
            raise NoResponseError('No response on WRITE_START command')
        self._check_response(response, header)

    def write_fw_part(self, data: bytes) -> None:
        """Write piece of FW data in IAP mode.

        The packet carries the current segment number (2 big-endian bytes)
        followed by *data*. The segment counter is advanced only after the
        reply has been validated.

        Args:
            data: Firmware bytes for this segment.

        Raises:
            MBError: The reply had the wrong length or header.
        """
        header = bytes((self.modbus.address, 0x41, 0x02))
        request = b''.join(
            (header, self.update_segment_number.to_bytes(2, 'big'), data))
        response = self.modbus.raw_communicate(
            request + self.modbus._crc(request), self.RESPONSE_LENGTH)
        self._check_response(response, header)
        self.update_segment_number += 1

    def iap_finish(self) -> None:
        """Complete FW transmission and check response.

        Raises:
            MBError: The reply had the wrong length or header.
        """
        header = bytes((self.modbus.address, 0x41, 0x03))
        response = self.modbus.raw_communicate(
            header + self.modbus._crc(header), self.RESPONSE_LENGTH)
        self._check_response(response, header)

    def update(self, path: str, model: str) -> None:
        """Run the whole update sequence with the firmware file at *path*.

        Steps: set the transport timeout, switch the device to IAP mode,
        announce size and model, stream the file in FW_CHUNK_SIZE-byte
        pieces, then finish the transmission.

        Args:
            path: Path of the firmware binary to send.
            model: Model name forwarded to write_fw_start().

        Raises:
            OSError: The firmware file cannot be sized or opened.
            NoResponseError: No reply to the start command.
            MBError: A reply had the wrong length or header.
        """
        self.modbus.MB_TIMEOUT = 3
        # The size announced to the device must be that of the file that is
        # actually sent, not of a fixed file name.
        size = os.path.getsize(path)

        print('Switch to IAP mode')
        self.iap_start()
        time.sleep(4)

        print(f'Start writing {size} bytes of FW')
        self.write_fw_start(size, model)
        time.sleep(2)

        print(f'Open FW file "{path}"...')
        with open(path, 'rb') as f:
            # iter() with a sentinel stops at the first empty read (EOF).
            for chunk in iter(lambda: f.read(self.FW_CHUNK_SIZE), b''):
                self.write_fw_part(chunk)

        print('End of transmission')
        self.iap_finish()


def main() -> None:
    """Script entry point; currently does nothing."""
    pass


if __name__ == '__main__':
    main()