from modbus import Modbus from mb_registers import reg_table, LOG_REGS from colorama import Fore from random import randint from time import sleep import time import colorama import struct from datetime import datetime, timedelta, timezone ARCHIVE_ENTRY = 0x06 LOG_ENTRY = 0x07 class Parser: utc_offset = 10800 def __init__(self) -> None: t = datetime.now(timezone.utc).astimezone() self.utc_offset = t.utcoffset() // timedelta(seconds=1) class LogParser(Parser): events = {1: 'Включение питания/перезагрузка ', 2: 'Перевод времени ', 3: 'Обновление ПО ', 4: 'Самодиагностика/системная ошибка', 5: 'Изменение конфигурации ', 6: 'Диагностика выходов ', 7: 'Срабатывание уставок ', 8: 'Переход в безопасный режим ', 9: 'Очистка журнала/архива '} @staticmethod def print_entry(entry: tuple, index: int): timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset) ans = f"[LOG] {index:05}: {LogParser.events[entry[1]]}, {timestamp}, state: {entry[2]}, channel: {entry[3]}, value: {entry[4]}" print(Fore.CYAN + ans) class ArchiveParser(Parser): @staticmethod def print_entry(entry: tuple, channel: int, index: int): timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset) ans = f"[ARCHIVE] {index:05}: {timestamp}, value: {entry[1]}" print(Fore.CYAN + ans) class LogReader: COM_LOG_CLEAR = 0x0002 COM_ARCHIVE_CLEAR = 0x0003 def __init__(self, modbus: Modbus): self.modbus = modbus self.log_capacity = 0 self.log_entries_number = 0 self.archive_capacity = 0 self.archive_entries_number = 0 self.archive_period = [] colorama.init(autoreset=True) def get_archive(self): print("LogReader") def get_log_info(self): data = self.modbus.read_holding_registers(reg_table['log_info'], 19) self.log_capacity = data[0] self.log_entries_number = data[1] self.archive_capacity = data[2] self.archive_entries_number = data[3:11] self.archive_period = data[12:21] print(Fore.CYAN + "Log and archive params:\n") print('Log capacity :', Fore.CYAN + str(self.log_capacity)) print('Log entries number :', Fore.CYAN + str(self.log_entries_number)) print('Archive capacity :', Fore.CYAN + str(self.archive_capacity)) print('Archive entries number :', Fore.CYAN + str(self.archive_entries_number)) print('Archive period :', Fore.CYAN + str(self.archive_period), end='\n') def set_archive_period(self, value, channel): self.modbus.write_holding_register(LOG_REGS['arch_per'] + channel, value) def log_clear(self): self.modbus.write_holding_register(reg_table['param_manager'], self.COM_LOG_CLEAR) def archive_clear(self): self.modbus.write_holding_register(reg_table['param_manager'], self.COM_ARCHIVE_CLEAR) class DigitalLogReader(LogReader): def __init__(self, modbus: Modbus): super().__init__(modbus) def get_archive(self): print("DigitalLogReader") def get_archive_entry(self, channel, index): data = self.modbus.read_file_record(ARCHIVE_ENTRY, channel, index, 1) return struct.unpack('