123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- from modbus import Modbus, MBError, NoResponseError
- import colorama
- from colorama import Fore
- import time
- import os
- reg_table = {'in_bits': 0x0100, 'in_cnt': 0x0102, 'in_mode': 0x0120,'uptime': 0x0800,
- 'in_norm': 0x0122,'rtc_unix': 0x0802, 'rtc_sinhro': 0x0804,
- 'in_deb_start': 0x124}
- class IO_Module(Modbus):
- def __init__(self, tty: str, brate: int, address: int):
- super().__init__(tty, brate, address)
- self.update_segment_number = 0
- def iap_start(self):
- """Reboot device in IAP mode"""
- request = bytes((self.address, 0x41, 0x01))
- response = self.raw_communicate(request + self._crc(request))
- def write_fw_start(self, size: int):
- """Ask device to start update"""
- self.update_segment_number = 0
- request = bytes((self.address, 0x41, 0x01, 0xEF, 0xBE, 0xAD, 0xDE)) + size.to_bytes(4, 'big')
- response = self.raw_communicate(request + self._crc(request), 5)
- if len(response) == 0:
- raise NoResponseError('No response on WRITE_START command')
- if len(response) != 5:
- raise MBError('Incorrect response length')
- if response[:3] != bytes((self.address, 0x41, 0x01)):
- raise MBError('Incorrect response')
- def write_fw_part(self, data: bytes):
- """Write piece of FW data in IAP mode"""
- header = bytes((self.address, 0x41, 0x02))
- request = b''.join((header, self.update_segment_number.to_bytes(2, 'big'), data))
- response = self.raw_communicate(request + self._crc(request), 5)
- # self.print_hex(response)
- if len(response) != 5:
- raise MBError('Incorrect response length')
- if (response[:3]) != header:
- raise MBError('Incorrect response')
- self.update_segment_number += 1
- def iap_finish(self):
- """Complete FW transmission and check response"""
- header = request = bytes((self.address, 0x41, 0x03))
- response = self.raw_communicate(request + self._crc(request), 5)
- if len(response) != 5:
- raise MBError('Incorrect response length')
- if response[:3] != header:
- raise MBError('Incorrect response')
- def update(self, path):
- self.MB_TIMEOUT = 3
- size = os.path.getsize('fw.bin')
- print('Switch to IAP mode')
- self.iap_start()
- time.sleep(4)
- print(f'Start writing {size} bytes of FW')
- self.write_fw_start(size)
- time.sleep(2)
- print(f'Open FW file "{path}"...')
- with open(path, 'rb') as f:
- done = progress_cur = progress_pre = 0
- while True:
- buf = f.read(128)
- if len(buf):
- self.write_fw_part(buf)
- progress_cur = done / size
- else:
- break
- print('End of transmission')
- self.iap_finish()
- # 0x0100 - текущее состояние входов
- def get_inputs_bit(self) -> str:
- data = self.read_holding_registers(reg_table['in_bits'], 1)
- return format(data[0], '08b')
- # 0x0101 - 0x0110 Счетчики импульсов
- def get_inputs_counters(self):
- data = []
- for i in range(reg_table['in_cnt'], reg_table['in_cnt'] + 16, 2):
- data.append(self.read_uint32_holding(i))
- return data
- # 0x0120 - режим работы входов
- def get_inputs_mode(self):
- data = self.read_holding_registers(reg_table['in_mode'], 1)
- return format(data[0], '08b')
- def set_inputs_mode(self, val):
- self.write_holding_register(reg_table['in_mode'], val)
- #
- def set_input_mode(self, input, val):
- ret = self.read_holding_registers(reg_table['in_mode'], 1)
- if val == 1:
- data = ret[0] | (0b1 << (input - 1))
- else:
- data = ret[0] & ~(0b1 << (input - 1))
- self.set_inputs_mode(data)
- # 0x0122 - нормальное состояние входов
- def get_inputs_norm_state(self):
- data = self.read_holding_registers(reg_table['in_norm'], 1)
- return format(data[0], '08b')
- def set_inputs_norm_state(self, val):
- self.write_holding_register(reg_table['in_norm'], val)
- # 0x0124 - время антидребезга (ms)
- def get_debounce_channel(self, input):
- data = self.read_holding_registers(reg_table['in_deb_start'] + input - 1, 1)
- return data[0]
- def set_devounce_channel(self, input, val):
- self.write_holding_register(reg_table['in_deb_start'] + input - 1, val)
- def get_uptime(self):
- return self.read_uint32_holding(reg_table['uptime'])
- def get_rtc(self):
- return self.read_uint32_holding(reg_table['rtc_unix'])
- def set_rtc(self, utc):
- self.write_uint32(reg_table['rtc_sinhro'], utc)
-
- def main():
- colorama.init(autoreset=True)
- dev = IO_Module('COM24', 115200, 1)
- dev.MB_DEBUG = False
- # dev.update('fw.bin')
- # Запрос системных параметров, установка времени
-
- # print('Device uptime:', dev.get_uptime())
- # unix_time = dev.get_rtc()
- # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
- # print('Set time:', int(time.time()))
- # dev.set_rtc(int(time.time()))
- # time.sleep(1)
- # unix_time = dev.get_rtc()
- # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
-
-
-
- for i in range(1, 9):
- dev.set_input_mode(i, 1)
- # print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
- for i in range(1, 9):
- dev.set_devounce_channel(i, 50 + i)
-
- # Установить нормальное состояние входов
- # dev.set_inputs_norm_state(0b00110101)
- while True:
- print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
- # Значения входов (битовое поле)
- print('Inputs values [bit field] :', Fore.GREEN + dev.get_inputs_bit())
- # Режим работы входов (битовое поле)
- print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
- # Нормальное состояние входов (битовое поле)
- print('Inputs norm [bit field] :', Fore.GREEN + dev.get_inputs_norm_state())
- # Период антидребезга (ms)
- for i in range(1, 9):
- print(f'Debounce input {i} (ms) :', Fore.GREEN + str(dev.get_debounce_channel(i)))
- # Значение счетчиков
- data = dev.get_inputs_counters()
- print('Inputs counters :', Fore.GREEN + ' | '.join(str(el) for el in data))
-
- # break
- time.sleep(1)
- # for i in range(1, 9):
- # dev.set_input_mode(i, 1)
- # print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
-
- # for i in range(8, 0, -1):
- # dev.set_input_mode(i, 0)
- # print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
-
- if __name__ == '__main__':
- main()
|