| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 | from modbus import Modbus, MBError, NoResponseErrorimport timeimport osclass Updater:    def __init__(self, modbus: Modbus):        self.modbus = modbus        self.update_segment_number = 0    def iap_start(self):        """Reboot device in IAP mode"""        request = bytes((self.modbus.address, 0x41, 0x01))        response = self.modbus.raw_communicate(request + self.modbus._crc(request))    def write_fw_start(self, size: int, model: str):        """Ask device to start update"""        self.update_segment_number = 0        # request = bytes((self.modbus.address, 0x41, 0x01, 0xEF, 0xBE, 0xAD, 0xDE)) + size.to_bytes(4, 'big')        request = bytes((self.modbus.address, 0x41, 0x01, 0xEF, 0xBE, 0xAD, 0xDE)) + size.to_bytes(4, 'big') + model.ljust(16, '\0').encode('ascii')         response = self.modbus.raw_communicate(request + self.modbus._crc(request), 5)        if len(response) == 0:            raise NoResponseError('No response on WRITE_START command')        if len(response) != 5:            raise MBError('Incorrect response length')        if response[:3] != bytes((self.modbus.address, 0x41, 0x01)):            raise MBError('Incorrect response')    def write_fw_part(self, data: bytes):        """Write piece of FW data in IAP mode"""        header = bytes((self.modbus.address, 0x41, 0x02))        request = b''.join((header, self.update_segment_number.to_bytes(2, 'big'), data))        response = self.modbus.raw_communicate(request + self.modbus._crc(request), 5)        # self.print_hex(response)        if len(response) != 5:            raise MBError('Incorrect response length')        if (response[:3]) != header:            raise MBError('Incorrect response')        self.update_segment_number += 1    def iap_finish(self):        """Complete FW transmission and check response"""        header = request = bytes((self.modbus.address, 0x41, 0x03))        response = self.modbus.raw_communicate(request + self.modbus._crc(request), 5)        if len(response) != 5:            raise MBError('Incorrect response length')        if response[:3] != header:            raise MBError('Incorrect response')    def update(self, path, model):        self.modbus.MB_TIMEOUT = 3        size = os.path.getsize('fw.bin')        print('Switch to IAP mode')        self.iap_start()        time.sleep(4)        print(f'Start writing {size} bytes of FW')        self.write_fw_start(size, model)        time.sleep(2)        print(f'Open FW file "{path}"...')        with open(path, 'rb') as f:            done = progress_cur = progress_pre = 0            while True:                buf = f.read(128)                if len(buf):                    self.write_fw_part(buf)                    progress_cur = done / size                else:                    break        print('End of transmission')        self.iap_finish()def main():    passif '__name__' == '__main__':    main()
 |