| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 | from modbus import Modbusfrom mb_registers import reg_table, LOG_REGS, Logfrom colorama import Forefrom random import randintfrom time import sleepimport timeimport coloramaimport structfrom datetime import datetime, timedelta, timezonefrom serial import SerialARCHIVE_ENTRY = 0x06LOG_ENTRY = 0x07class Parser:    utc_offset = 10800    def __init__(self) -> None:        t = datetime.now(timezone.utc).astimezone()        self.utc_offset = t.utcoffset() // timedelta(seconds=1)class LogParser(Parser):    events = {1: 'Включение питания/перезагрузка  ',              2: 'Перевод времени                 ',              3: 'Обновление ПО                   ',              4: 'Самодиагностика/системная ошибка',              5: 'Изменение конфигурации          ',              6: 'Диагностика выходов             ',              7: 'Срабатывание уставок            ',              8: 'Переход в безопасный режим      ',              9: 'Очистка журнала/архива          '}    @staticmethod    def print_entry(entry: tuple, index: int):        timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset)        ans = f"[LOG] {index:05}: {LogParser.events[entry[1]]}, {timestamp}, state: {entry[2]}, channel: {entry[3]}, value: {entry[4]}"        print(Fore.CYAN + ans)class ArchiveParser(Parser):    @staticmethod    def print_entry(entry: tuple, channel: int, index: int):        timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset)        ans = f"[ARCHIVE] {index:05}: {timestamp}, value: {entry[1]}"        print(Fore.CYAN + ans)class LogReader:    COM_LOG_CLEAR = 0x0002    COM_ARCHIVE_CLEAR = 0x0003    def __init__(self, modbus: Modbus):        self.modbus = modbus        self.log_capacity = 0        self.log_entries_number = 0        self.archive_capacity = 0        self.archive_entries_number = 0            self.archive_period = []        colorama.init(autoreset=True)    def get_archive(self):        print("LogReader")    def get_log_info(self):        # data = self.modbus.read_holding_registers(Log.INFO.value, 19)        data = self.modbus.read_holding_registers(Log.LOG_ENTRYS_MAX.value, 11)        self.log_capacity = data[0]        self.log_entries_number = data[1]        self.archive_capacity = data[2]        self.archive_entries_number = data[3:11]        self.archive_period = self.modbus.read_holding_registers(Log.ARCH_PER_CH1.value, 8)        print(Fore.CYAN + "Log and archive params:\n")        print('Log capacity           :', Fore.CYAN + str(self.log_capacity))        print('Log entries number     :', Fore.CYAN + str(self.log_entries_number))        print('Archive capacity       :', Fore.CYAN + str(self.archive_capacity))        print('Archive entries number :', Fore.CYAN + str(self.archive_entries_number))        print('Archive period         :', Fore.CYAN + str(self.archive_period), end='\n')    def set_archive_channel_period(self, channel, value):        """Установить период архивирования в канале 1..12 в секундах"""        self.modbus.write_holding_register(Log.ARCH_PER_CH1.value + channel - 1, value)    def log_clear(self):        """Очистить журнал событий"""        self.modbus.write_holding_register(Log.CMD.value, self.COM_LOG_CLEAR)    def archive_channel_clear(self, channel):        """Очистить архив по каналам 1..12"""        self.modbus.write_holding_register(Log.CMD.value, self.COM_ARCHIVE_CLEAR + channel - 1)class DigitalLogReader(LogReader):        def __init__(self, modbus: Modbus):        super().__init__(modbus)    def get_archive(self):        print("DigitalLogReader")    def get_archive_entry(self, channel, index):        data = self.modbus.read_file_record(ARCHIVE_ENTRY, channel, index, 1)                return struct.unpack('<QBB', data[5:15])            def get_random_archive_entry(self):        data = self.modbus.read_file_record(ARCHIVE_ENTRY, randint(0, 8), randint(1, self.archive_entries_number), 1)        entry = struct.unpack('<QBB', data[5:15])        print(Fore.CYAN + str(entry))    def get_log_entry(self, index):        data = self.modbus.read_file_record(LOG_ENTRY, 0, index, 1)        return struct.unpack('<QBBBfB', data[5:21])    def print_log_entry(self, index):        time_start = time.time()        data = self.modbus.read_file_record(LOG_ENTRY, 0, index, 1)        print(f"Entry time: ", time.time() - time_start)        # LogParser.print_entry(struct.unpack('<QBBBfB', data[5:21]), index)    def print_archive_entry(self, channel, index):        data = self.modbus.read_file_record(ARCHIVE_ENTRY, channel, index, 1)        ArchiveParser.print_entry(struct.unpack('<QBB', data[5:15]), channel, index)    def get_random_log_entry(self):        data = self.modbus.read_file_record(LOG_ENTRY, 0, randint(1, self.log_entries_number), 1)        entry = struct.unpack('<QBBBfB', data[5:21])        print(Fore.CYAN + str(entry))    def get_random_entries(self):        '''Читаем лог и архив'''        print(Fore.CYAN + "\nEntries:\n")        while (1):            self.get_random_archive_entry()            sleep(0.1)            self.get_random_log_entry()            sleep(0.1)    def get_all_archive(self):        self.get_log_info()        for channel in range(0, len(self.archive_entries_number)):            if (self.archive_entries_number[channel]):                print(Fore.LIGHTGREEN_EX + f"Archive channel: {channel}")                for i in range(0, self.archive_entries_number[channel]):                    self.print_archive_entry(channel, i + 1)    def get_all_log(self):        self.get_log_info()        for i in range(1, self.log_entries_number + 1):            self.print_log_entry(i)            # sleep(0.01)class AnalogInputLogReader(LogReader):    def __init__(self, modbus: Modbus):        super().__init__(modbus)    def get_archive(self):        print("AnalogInputLogReader")def main():    serial = Serial('COM5', 115200, timeout=0.05, parity='N', xonxoff=False)    modbus = Modbus(serial, 1)    modbus.MB_DEBUG = False    dio = DigitalLogReader(modbus)        # dio.get_log_info()        # for i in range(1, 9):    #     dio.set_archive_channel_period(i, 10)    # dio.get_log_info()    while True:        dio.get_all_log()    # module.get_archive()    # module.set_archive_period(10)    return    # for i in range(500):    while (1):        module.get_random_archive_entry()        sleep(0.1)        module.get_random_log_entry()        sleep(0.1)if __name__ == '__main__':    main()
 |