| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 | 
							- from modbus import Modbus
 
- from mb_registers import reg_table, LOG_REGS
 
- from colorama import Fore
 
- from random import randint
 
- from time import sleep
 
- import time
 
- import colorama
 
- import struct
 
- from datetime import datetime, timedelta, timezone
 
- ARCHIVE_ENTRY = 0x06
 
- LOG_ENTRY = 0x07
 
- class Parser:
 
-     utc_offset = 10800
 
-     def __init__(self) -> None:
 
-         t = datetime.now(timezone.utc).astimezone()
 
-         self.utc_offset = t.utcoffset() // timedelta(seconds=1)
 
- class LogParser(Parser):
 
-     events = {1: 'Включение питания/перезагрузка  ',
 
-               2: 'Перевод времени                 ',
 
-               3: 'Обновление ПО                   ',
 
-               4: 'Самодиагностика/системная ошибка',
 
-               5: 'Изменение конфигурации          ',
 
-               6: 'Диагностика выходов             ',
 
-               7: 'Срабатывание уставок            ',
 
-               8: 'Переход в безопасный режим      ',
 
-               9: 'Очистка журнала/архива          '}
 
-     @staticmethod
 
-     def print_entry(entry: tuple, index: int):
 
-         timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset)
 
-         ans = f"[LOG] {index:05}: {LogParser.events[entry[1]]}, {timestamp}, state: {entry[2]}, channel: {entry[3]}, value: {entry[4]}"
 
-         print(Fore.CYAN + ans)
 
- class ArchiveParser(Parser):
 
-     @staticmethod
 
-     def print_entry(entry: tuple, channel: int, index: int):
 
-         timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset)
 
-         ans = f"[ARCHIVE] {index:05}: {timestamp}, value: {entry[1]}"
 
-         print(Fore.CYAN + ans)
 
- class LogReader:
 
-     COM_LOG_CLEAR = 0x0002
 
-     COM_ARCHIVE_CLEAR = 0x0003
 
-     def __init__(self, modbus: Modbus):
 
-         self.modbus = modbus
 
-         self.log_capacity = 0
 
-         self.log_entries_number = 0
 
-         self.archive_capacity = 0
 
-         self.archive_entries_number = 0    
 
-         self.archive_period = []
 
-         colorama.init(autoreset=True)
 
-     def get_archive(self):
 
-         print("LogReader")
 
-     def get_log_info(self):
 
-         data = self.modbus.read_holding_registers(reg_table['log_info'], 19)
 
-         self.log_capacity = data[0]
 
-         self.log_entries_number = data[1]
 
-         self.archive_capacity = data[2]
 
-         self.archive_entries_number = data[3:11]
 
-         self.archive_period = data[12:21]
 
-         print(Fore.CYAN + "Log and archive params:\n")
 
-         print('Log capacity           :', Fore.CYAN + str(self.log_capacity))
 
-         print('Log entries number     :', Fore.CYAN + str(self.log_entries_number))
 
-         print('Archive capacity       :', Fore.CYAN + str(self.archive_capacity))
 
-         print('Archive entries number :', Fore.CYAN + str(self.archive_entries_number))
 
-         print('Archive period         :', Fore.CYAN + str(self.archive_period), end='\n')
 
-     def set_archive_period(self, value, channel):
 
-         self.modbus.write_holding_register(LOG_REGS['arch_per'] + channel, value)
 
-     def log_clear(self):
 
-         self.modbus.write_holding_register(reg_table['param_manager'], self.COM_LOG_CLEAR)
 
-     def archive_clear(self):
 
-         self.modbus.write_holding_register(reg_table['param_manager'], self.COM_ARCHIVE_CLEAR)
 
- class DigitalLogReader(LogReader):
 
-     
 
-     def __init__(self, modbus: Modbus):
 
-         super().__init__(modbus)
 
-     def get_archive(self):
 
-         print("DigitalLogReader")
 
-     def get_archive_entry(self, channel, index):
 
-         data = self.modbus.read_file_record(ARCHIVE_ENTRY, channel, index, 1)        
 
-         return struct.unpack('<QBB', data[5:15])
 
-         
 
-     def get_random_archive_entry(self):
 
-         data = self.modbus.read_file_record(ARCHIVE_ENTRY, randint(0, 8), randint(1, self.archive_entries_number), 1)
 
-         entry = struct.unpack('<QBB', data[5:15])
 
-         print(Fore.CYAN + str(entry))
 
-     def get_log_entry(self, index):
 
-         data = self.modbus.read_file_record(LOG_ENTRY, 0, index, 1)
 
-         return struct.unpack('<QBBBfB', data[5:21])
 
-     def print_log_entry(self, index):
 
-         data = self.modbus.read_file_record(LOG_ENTRY, 0, index, 1)
 
-         LogParser.print_entry(struct.unpack('<QBBBfB', data[5:21]), index)
 
-     def print_archive_entry(self, channel, index):
 
-         data = self.modbus.read_file_record(ARCHIVE_ENTRY, channel, index, 1)
 
-         ArchiveParser.print_entry(struct.unpack('<QBB', data[5:15]), channel, index)
 
-     def get_random_log_entry(self):
 
-         data = self.modbus.read_file_record(LOG_ENTRY, 0, randint(1, self.log_entries_number), 1)
 
-         entry = struct.unpack('<QBBBfB', data[5:21])
 
-         print(Fore.CYAN + str(entry))
 
-     def get_random_entries(self):
 
-         '''Читаем лог и архив'''
 
-         print(Fore.CYAN + "\nEntries:\n")
 
-         while (1):
 
-             self.get_random_archive_entry()
 
-             sleep(0.1)
 
-             self.get_random_log_entry()
 
-             sleep(0.1)
 
-     def get_all_archive(self):
 
-         self.get_log_info()
 
-         for channel in range(0, len(self.archive_entries_number)):
 
-             if (self.archive_entries_number[channel]):
 
-                 print(Fore.LIGHTGREEN_EX + f"Archive channel: {channel}")
 
-                 for i in range(0, self.archive_entries_number[channel]):
 
-                     self.print_archive_entry(channel, i + 1)
 
-     def get_all_log(self):
 
-         self.get_log_info()
 
-         for i in range(1, self.log_entries_number + 1):
 
-             self.print_log_entry(i)
 
-             sleep(0.01)
 
- class AnalogInputLogReader(LogReader):
 
-     def __init__(self, modbus: Modbus):
 
-         super().__init__(modbus)
 
-     def get_archive(self):
 
-         print("AnalogInputLogReader")
 
- def main():
 
-     module = DigitalLogReader('COM24', 115200, 15)
 
-     # module.get_log_info()
 
-     # module.get_archive()
 
-     # module.set_archive_period(10)
 
-     return
 
-     # for i in range(500):
 
-     while (1):
 
-         module.get_random_archive_entry()
 
-         sleep(0.1)
 
-         module.get_random_log_entry()
 
-         sleep(0.1)
 
- if __name__ == '__main__':
 
-     main()
 
 
  |