123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- from modbus import Modbus
- from mb_registers import reg_table, LOG_REGS
- from colorama import Fore
- from random import randint
- from time import sleep
- import time
- import colorama
- import struct
- from datetime import datetime, timedelta, timezone
# Identifiers passed as the first argument of Modbus.read_file_record().
# NOTE(review): presumably Modbus file numbers — confirm against the device docs.
ARCHIVE_ENTRY = 0x06  # used when reading archive records
LOG_ENTRY = 0x07  # used when reading event-log records
class Parser:
    """Base class for entry printers; carries a UTC offset in whole seconds."""

    # Class-level default of 10800 s (3 h). The static print_entry() methods
    # of the subclasses read this value through the class, not an instance.
    utc_offset = 10800

    def __init__(self) -> None:
        """Store the local timezone's current UTC offset on the instance."""
        local_now = datetime.now(timezone.utc).astimezone()
        one_second = timedelta(seconds=1)
        # Floor-dividing a timedelta by one second yields an int of seconds.
        self.utc_offset = local_now.utcoffset() // one_second
class LogParser(Parser):
    """Pretty-printer for decoded event-log entries."""

    # Human-readable names of the known event codes (runtime strings, kept
    # exactly as the device tool prints them).
    events = {1: 'Включение питания/перезагрузка ',
              2: 'Перевод времени ',
              3: 'Обновление ПО ',
              4: 'Самодиагностика/системная ошибка',
              5: 'Изменение конфигурации ',
              6: 'Диагностика выходов ',
              7: 'Срабатывание уставок ',
              8: 'Переход в безопасный режим ',
              9: 'Очистка журнала/архива '}

    @staticmethod
    def print_entry(entry: tuple, index: int):
        """Print one log entry as a single cyan line.

        Args:
            entry: Decoded record; the first five items are used as
                (timestamp in milliseconds, event code, state, channel,
                value). Extra trailing items are ignored.
            index: Position of the entry in the log, printed zero-padded
                to five digits.

        Event codes missing from ``events`` are printed with a placeholder
        name instead of raising ``KeyError``, so one unexpected record does
        not abort a whole log dump.
        """
        timestamp_ms, event_code, state, channel, value = entry[:5]

        # NOTE(review): this is a static method, so the class-level
        # utc_offset (10800 s) is always used; the per-instance offset that
        # Parser.__init__ computes is never consulted here.
        timestamp = time.ctime(timestamp_ms / 1000 - LogParser.utc_offset)

        event_name = LogParser.events.get(
            event_code, f'Неизвестное событие ({event_code})')
        ans = (f"[LOG] {index:05}: {event_name}, {timestamp}, "
               f"state: {state}, channel: {channel}, value: {value}")
        print(Fore.CYAN + ans)
class ArchiveParser(Parser):
    """Pretty-printer for decoded archive entries."""

    @staticmethod
    def print_entry(entry: tuple, channel: int, index: int):
        """Print one archive entry as a single cyan line.

        Args:
            entry: Decoded record; item 0 is a timestamp in milliseconds and
                item 1 is the archived value.
            channel: Accepted for call-site symmetry; not used in the output.
            index: Position of the entry, printed zero-padded to five digits.
        """
        # The offset is read from LogParser, exactly as before; both classes
        # inherit the same class-level value from Parser.
        epoch_seconds = entry[0] / 1000 - LogParser.utc_offset
        stamp = time.ctime(epoch_seconds)
        line = "[ARCHIVE] {:05}: {}, value: {}".format(index, stamp, entry[1])
        print(Fore.CYAN + line)
class LogReader:
    """Shared Modbus access to a device's log/archive counters and commands."""

    # Command codes written to the 'param_manager' register.
    COM_LOG_CLEAR = 0x0002
    COM_ARCHIVE_CLEAR = 0x0003

    def __init__(self, modbus: Modbus):
        """Remember the Modbus link and reset the cached counters.

        Also initialises colorama with ``autoreset=True`` so every coloured
        print returns to the default style afterwards.
        """
        self.modbus = modbus
        # Cached values; they are filled in by get_log_info().
        self.log_capacity = 0
        self.log_entries_number = 0
        self.archive_capacity = 0
        self.archive_entries_number = 0  # becomes a sequence after get_log_info()
        self.archive_period = []
        colorama.init(autoreset=True)

    def get_archive(self):
        """Placeholder: prints the class tag only."""
        print("LogReader")

    def get_log_info(self):
        """Read the log/archive parameter registers, cache and print them."""
        regs = self.modbus.read_holding_registers(reg_table['log_info'], 19)

        self.log_capacity = regs[0]
        self.log_entries_number = regs[1]
        self.archive_capacity = regs[2]
        # NOTE(review): the counters slice ends at index 10 while the periods
        # slice starts at index 12 and runs to 20, although only 19 registers
        # are requested. Confirm the slice bounds against the register map.
        self.archive_entries_number = regs[3:11]
        self.archive_period = regs[12:21]

        print(Fore.CYAN + "Log and archive params:\n")
        report = (
            ('Log capacity :', self.log_capacity),
            ('Log entries number :', self.log_entries_number),
            ('Archive capacity :', self.archive_capacity),
            ('Archive entries number :', self.archive_entries_number),
            ('Archive period :', self.archive_period),
        )
        for label, value in report:
            print(label, Fore.CYAN + str(value))

    def set_archive_period(self, value, channel):
        """Write ``value`` to the archive-period register of ``channel``."""
        register = LOG_REGS['arch_per'] + channel
        self.modbus.write_holding_register(register, value)

    def _send_command(self, code):
        """Write a command code to the 'param_manager' register."""
        self.modbus.write_holding_register(reg_table['param_manager'], code)

    def log_clear(self):
        """Send the log-clear command."""
        self._send_command(self.COM_LOG_CLEAR)

    def archive_clear(self):
        """Send the archive-clear command."""
        self._send_command(self.COM_ARCHIVE_CLEAR)
class DigitalLogReader(LogReader):
    """Reads and prints log and archive records of a digital module.

    Records are fetched one at a time with ``Modbus.read_file_record`` and
    decoded with fixed little-endian layouts. Entry indexes are 1-based.
    The constructor is inherited unchanged from ``LogReader``.
    """

    # Byte offset of the record inside the read_file_record() response.
    # NOTE(review): the 5 leading bytes are presumably response framing —
    # confirm against Modbus.read_file_record.
    _PAYLOAD_OFFSET = 5
    # Archive record: uint64 timestamp followed by two uint8 fields.
    _ARCHIVE_LAYOUT = struct.Struct('<QBB')
    # Log record: uint64 timestamp, three uint8 fields, float32, uint8.
    _LOG_LAYOUT = struct.Struct('<QBBBfB')

    def get_archive(self):
        """Placeholder: prints the class tag only."""
        print("DigitalLogReader")

    def _read_entry(self, file_id, channel, index, layout):
        """Read one record and decode it with ``layout``; returns a tuple."""
        data = self.modbus.read_file_record(file_id, channel, index, 1)
        start = self._PAYLOAD_OFFSET
        return layout.unpack(data[start:start + layout.size])

    def get_archive_entry(self, channel, index):
        """Return the decoded archive record ``index`` of ``channel``."""
        return self._read_entry(ARCHIVE_ENTRY, channel, index,
                                self._ARCHIVE_LAYOUT)

    def get_log_entry(self, index):
        """Return the decoded log record ``index``."""
        return self._read_entry(LOG_ENTRY, 0, index, self._LOG_LAYOUT)

    def print_log_entry(self, index):
        """Read log record ``index`` and print it via ``LogParser``."""
        LogParser.print_entry(self.get_log_entry(index), index)

    def print_archive_entry(self, channel, index):
        """Read an archive record and print it via ``ArchiveParser``."""
        ArchiveParser.print_entry(
            self.get_archive_entry(channel, index), channel, index)

    def get_random_archive_entry(self):
        """Print a randomly chosen archive record, if any channel has one.

        ``archive_entries_number`` is a per-channel sequence once
        ``get_log_info()`` has run (and the scalar 0 before that), so the
        channel is drawn only from channels that report at least one entry
        and the index is drawn from that channel's own count.
        """
        counts = self.archive_entries_number or []
        populated = [(channel, count)
                     for channel, count in enumerate(counts) if count > 0]
        if not populated:
            print(Fore.CYAN + "No archive entries available")
            return
        channel, count = populated[randint(0, len(populated) - 1)]
        entry = self.get_archive_entry(channel, randint(1, count))
        print(Fore.CYAN + str(entry))

    def get_random_log_entry(self):
        """Print a randomly chosen log record, if the log is not empty."""
        if self.log_entries_number < 1:
            # randint(1, 0) would raise ValueError on an empty log.
            print(Fore.CYAN + "No log entries available")
            return
        entry = self.get_log_entry(randint(1, self.log_entries_number))
        print(Fore.CYAN + str(entry))

    def get_random_entries(self):
        """Read log and archive: poll random records forever.

        The counters are refreshed once up front, matching
        ``get_all_archive``/``get_all_log``; without that the random pickers
        would have no entry counts to draw from.
        """
        self.get_log_info()
        print(Fore.CYAN + "\nEntries:\n")
        while True:
            self.get_random_archive_entry()
            sleep(0.1)
            self.get_random_log_entry()
            sleep(0.1)

    def get_all_archive(self):
        """Refresh the counters, then print every archive record per channel."""
        self.get_log_info()
        for channel, count in enumerate(self.archive_entries_number):
            if not count:
                continue
            print(Fore.LIGHTGREEN_EX + f"Archive channel: {channel}")
            for index in range(1, count + 1):
                self.print_archive_entry(channel, index)

    def get_all_log(self):
        """Refresh the counters, then print every log record in order."""
        self.get_log_info()
        for index in range(1, self.log_entries_number + 1):
            self.print_log_entry(index)
            sleep(0.01)
class AnalogInputLogReader(LogReader):
    """Log reader variant for analog-input modules.

    The constructor is inherited unchanged from ``LogReader``.
    """

    def get_archive(self):
        """Placeholder: prints the class tag only."""
        print("AnalogInputLogReader")
def main():
    """Manual entry point: open the Modbus link and build a reader.

    No device traffic is started here. Enable one of the reader calls
    (``get_log_info()``, ``get_all_log()``, ``get_all_archive()`` or
    ``get_random_entries()`` for the endless random poll) when experimenting.
    """
    # DigitalLogReader accepts a single Modbus object, so passing the port
    # settings straight to it raised TypeError.
    # NOTE(review): the three settings are assumed to be the arguments of
    # modbus.Modbus — confirm against its constructor.
    modbus = Modbus('COM24', 115200, 15)
    module = DigitalLogReader(modbus)  # noqa: F841 - kept for manual use


if __name__ == '__main__':
    main()
|