from modbus import Modbus, MBError, NoResponseError import colorama from colorama import Fore import time import os import struct reg_table = {'in_bits': 0x0100, 'in_cnt': 0x0102, 'in_mode': 0x0120, 'in_norm': 0x0122, 'in_deb_start': 0x124, 'out_cur': 0x0200, 'out_mode': 0x0202, 'out_mode_save': 0x0203, 'pwm_duty': 0x0210, 'pwm_duty_save': 0x0220, 'pwm_per': 0x0230, 'pwm_per_save': 0x0240, 'rtc_unix': 0x0802, 'rtc_sinhro': 0x0804, 'uptime': 0x0800, 'log_info': 0x0900, 'log_ent': 0x0901, 'arch_cap': 0x0902, 'arch_ent': 0x0903} class IO_Module(Modbus): def __init__(self, tty: str, brate: int, address: int): super().__init__(tty, brate, address) self.update_segment_number = 0 def iap_start(self): """Reboot device in IAP mode""" request = bytes((self.address, 0x41, 0x01)) response = self.raw_communicate(request + self._crc(request)) def write_fw_start(self, size: int): """Ask device to start update""" self.update_segment_number = 0 request = bytes((self.address, 0x41, 0x01, 0xEF, 0xBE, 0xAD, 0xDE)) + size.to_bytes(4, 'big') response = self.raw_communicate(request + self._crc(request), 5) if len(response) == 0: raise NoResponseError('No response on WRITE_START command') if len(response) != 5: raise MBError('Incorrect response length') if response[:3] != bytes((self.address, 0x41, 0x01)): raise MBError('Incorrect response') def write_fw_part(self, data: bytes): """Write piece of FW data in IAP mode""" header = bytes((self.address, 0x41, 0x02)) request = b''.join((header, self.update_segment_number.to_bytes(2, 'big'), data)) response = self.raw_communicate(request + self._crc(request), 5) # self.print_hex(response) if len(response) != 5: raise MBError('Incorrect response length') if (response[:3]) != header: raise MBError('Incorrect response') self.update_segment_number += 1 def iap_finish(self): """Complete FW transmission and check response""" header = request = bytes((self.address, 0x41, 0x03)) response = self.raw_communicate(request + self._crc(request), 5) if len(response) != 5: raise MBError('Incorrect response length') if response[:3] != header: raise MBError('Incorrect response') def update(self, path): self.MB_TIMEOUT = 3 size = os.path.getsize('fw.bin') print('Switch to IAP mode') self.iap_start() time.sleep(4) print(f'Start writing {size} bytes of FW') self.write_fw_start(size) time.sleep(2) print(f'Open FW file "{path}"...') with open(path, 'rb') as f: done = progress_cur = progress_pre = 0 while True: buf = f.read(128) if len(buf): self.write_fw_part(buf) progress_cur = done / size else: break print('End of transmission') self.iap_finish() # 0x0100 - текущее состояние входов def get_inputs_bit(self) -> str: data = self.read_holding_registers(reg_table['in_bits'], 1) return format(data[0], '08b') # 0x0101 - 0x0110 Счетчики импульсов def get_inputs_counters(self): data = [] for i in range(reg_table['in_cnt'], reg_table['in_cnt'] + 16, 2): data.append(self.read_uint32_holding(i)) return data # 0x0120 - режим работы входов def get_inputs_mode(self): data = self.read_holding_registers(reg_table['in_mode'], 1) return format(data[0], '08b') def set_inputs_mode(self, val): self.write_holding_register(reg_table['in_mode'], val) # def set_input_mode(self, input, val): ret = self.read_holding_registers(reg_table['in_mode'], 1) if val == 1: data = ret[0] | (0b1 << (input - 1)) else: data = ret[0] & ~(0b1 << (input - 1)) self.set_inputs_mode(data) # 0x0122 - нормальное состояние входов def get_inputs_norm_state(self): data = self.read_holding_registers(reg_table['in_norm'], 1) return format(data[0], '08b') def set_inputs_norm_state(self, val): self.write_holding_register(reg_table['in_norm'], val) # 0x0124 - время антидребезга (ms) def get_debounce_channel(self, input): data = self.read_holding_registers(reg_table['in_deb_start'] + input - 1, 1) return data[0] def get_debounce_channels(self): return self.read_holding_registers(reg_table['in_deb_start'], 8) def set_debounce_channel(self, input, val): self.write_holding_register(reg_table['in_deb_start'] + input - 1, val) # 0x0200 - текущее состояние выходов в обычно режиме def get_outputs(self): data = self.read_holding_registers(reg_table['out_cur'], 1) return format(data[0], '08b') # 0x0200 - текущее состояние выходов в обычно режиме def set_outputs(self, val): self.write_holding_register(reg_table['out_cur'], val) def set_output(self, output, val): ret = self.read_holding_registers(reg_table['out_cur'], 1) if val == 1: data = ret[0] | (0b1 << (output - 1)) else: data = ret[0] & ~(0b1 << (output - 1)) self.set_outputs(data) # 0x0202 - режим работы выходов; 0 - обычный, 1 - ШИМ def get_outputs_mode(self): data = self.read_holding_registers(reg_table['out_mode'], 1) return format(data[0], '08b') def set_outputs_mode(self, val): self.write_holding_register(reg_table['out_mode'], val) def set_output_mode(self, output, val): ret = self.read_holding_registers(reg_table['out_mode'], 1) if val == 1: data = ret[0] | (0b1 << (output - 1)) else: data = ret[0] & ~(0b1 << (output - 1)) self.set_outputs_mode(data) # 0x0203 - состояние выходов (режим обычного выхода) в безопасном режиме работы def get_outputs_mode_save(self): data = self.read_holding_registers(reg_table['out_mode_save'], 1) return format(data[0], '08b') def set_outputs_mode_save(self, val): self.write_holding_register(reg_table['out_mode_save'], val) def set_output_mode_save(self, output, val): ret = self.read_holding_registers(reg_table['out_mode_save'], 1) if val == 1: data = ret[0] | (0b1 << (output - 1)) else: data = ret[0] & ~(0b1 << (output - 1)) self.set_outputs_mode_save(data) # 0x0210 - заполнение PWM (%) def get_pwm_duty(self, output): data = self.read_holding_registers(reg_table['pwm_duty'] + output - 1, 1) return data[0] def get_pwm_duty_all(self): return self.read_holding_registers(reg_table['pwm_duty'], 8) def set_pwm_duty(self, output, val): self.write_holding_register(reg_table['pwm_duty'] + output - 1, val) # 0x0220 - заполнение PWM (%) def get_pwm_duty_all_save(self): return self.read_holding_registers(reg_table['pwm_duty_save'], 8) def set_pwm_duty_save(self, output, val): self.write_holding_register(reg_table['pwm_duty_save'] + output - 1, val) # 0x0230 - период PWM (0.1 сек) def get_pwm_period_all(self): return self.read_holding_registers(reg_table['pwm_per'], 8) def set_pwm_period(self, output, val): self.write_holding_register(reg_table['pwm_per'] + output - 1, val) # 0x0240 - период PWM в безопасном режиме (0.1 сек) def get_pwm_period_all_save(self): return self.read_holding_registers(reg_table['pwm_per_save'], 8) def set_pwm_period_save(self, output, val): self.write_holding_register(reg_table['pwm_per_save'] + output - 1, val) def get_uptime(self): return self.read_uint32_holding(reg_table['uptime']) def get_rtc(self): return self.read_uint32_holding(reg_table['rtc_unix']) def set_rtc(self, utc): self.write_uint32(reg_table['rtc_sinhro'], utc) ''' def get_archive_entryes(self, ent_num, ent_index): data = self.read_file_record(ARCHIVE_ENTRY, 1, 1) #print(struct.unpack('