from modbus import Modbus from mb_registers import reg_table, LOG_REGS, Log from colorama import Fore from random import randint from time import sleep import time import colorama import struct from datetime import datetime, timedelta, timezone from serial import Serial ARCHIVE_ENTRY = 0x06 LOG_ENTRY = 0x07 class Parser: utc_offset = 10800 def __init__(self) -> None: t = datetime.now(timezone.utc).astimezone() self.utc_offset = t.utcoffset() // timedelta(seconds=1) class LogParser(Parser): events = {1: 'Включение питания/перезагрузка ', 2: 'Перевод времени ', 3: 'Обновление ПО ', 4: 'Самодиагностика/системная ошибка', 5: 'Изменение конфигурации ', 6: 'Диагностика выходов ', 7: 'Срабатывание уставок ', 8: 'Переход в безопасный режим ', 9: 'Очистка журнала/архива '} @staticmethod def print_entry(entry: tuple, index: int): timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset) ans = f"[LOG] {index:05}: {LogParser.events[entry[1]]}, {timestamp}, state: {entry[2]}, channel: {entry[3]}, value: {entry[4]}" print(Fore.CYAN + ans) class ArchiveParser(Parser): @staticmethod def print_entry(entry: tuple, channel: int, index: int): timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset) ans = f"[ARCHIVE] {index:05}: {timestamp}, value: {entry[1]}" print(Fore.CYAN + ans) class LogReader: COM_LOG_CLEAR = 0x0002 COM_ARCHIVE_CLEAR = 0x0003 def __init__(self, modbus: Modbus): self.modbus = modbus self.log_capacity = 0 self.log_entries_number = 0 self.archive_capacity = 0 self.archive_entries_number = 0 self.archive_period = [] colorama.init(autoreset=True) def get_archive(self): print("LogReader") def get_log_info(self): # data = self.modbus.read_holding_registers(Log.INFO.value, 19) data = self.modbus.read_holding_registers(Log.LOG_ENTRYS_MAX.value, 11) self.log_capacity = data[0] self.log_entries_number = data[1] self.archive_capacity = data[2] self.archive_entries_number = data[3:11] self.archive_period = self.modbus.read_holding_registers(Log.ARCH_PER_CH1.value, 8) print(Fore.CYAN + "Log and archive params:\n") print('Log capacity :', Fore.CYAN + str(self.log_capacity)) print('Log entries number :', Fore.CYAN + str(self.log_entries_number)) print('Archive capacity :', Fore.CYAN + str(self.archive_capacity)) print('Archive entries number :', Fore.CYAN + str(self.archive_entries_number)) print('Archive period :', Fore.CYAN + str(self.archive_period), end='\n') def set_archive_channel_period(self, channel, value): """Установить период архивирования в канале 1..12 в секундах""" self.modbus.write_holding_register(Log.ARCH_PER_CH1.value + channel - 1, value) def log_clear(self): """Очистить журнал событий""" self.modbus.write_holding_register(Log.CMD.value, self.COM_LOG_CLEAR) def archive_channel_clear(self, channel): """Очистить архив по каналам 1..12""" self.modbus.write_holding_register(Log.CMD.value, self.COM_ARCHIVE_CLEAR + channel - 1) class DigitalLogReader(LogReader): def __init__(self, modbus: Modbus): super().__init__(modbus) def get_archive(self): print("DigitalLogReader") def get_archive_entry(self, channel, index): data = self.modbus.read_file_record(ARCHIVE_ENTRY, channel, index, 1) return struct.unpack('