|
@@ -12,8 +12,17 @@ ARCHIVE_ENTRY = 0x06
|
|
LOG_ENTRY = 0x07
|
|
LOG_ENTRY = 0x07
|
|
|
|
|
|
|
|
|
|
|
|
+class Parser:
|
|
|
|
+
|
|
|
|
+ utc_offset = 10800
|
|
|
|
+
|
|
|
|
+ def __init__(self) -> None:
|
|
|
|
+ t = datetime.now(timezone.utc).astimezone()
|
|
|
|
+ self.utc_offset = t.utcoffset() // timedelta(seconds=1)
|
|
|
|
|
|
-class LogParser:
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class LogParser(Parser):
|
|
|
|
|
|
events = {1: 'Включение питания/перезагрузка ',
|
|
events = {1: 'Включение питания/перезагрузка ',
|
|
2: 'Перевод времени ',
|
|
2: 'Перевод времени ',
|
|
@@ -25,13 +34,6 @@ class LogParser:
|
|
8: 'Переход в безопасный режим ',
|
|
8: 'Переход в безопасный режим ',
|
|
9: 'Очистка журнала/архива '}
|
|
9: 'Очистка журнала/архива '}
|
|
|
|
|
|
- utc_offset = 10800
|
|
|
|
-
|
|
|
|
- def __init__(self) -> None:
|
|
|
|
- t = datetime.now(timezone.utc).astimezone()
|
|
|
|
- self.utc_offset = t.utcoffset() // timedelta(seconds=1)
|
|
|
|
-
|
|
|
|
-
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
def print_entry(entry: tuple, index: int):
|
|
def print_entry(entry: tuple, index: int):
|
|
timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset)
|
|
timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset)
|
|
@@ -39,6 +41,14 @@ class LogParser:
|
|
print(Fore.CYAN + ans)
|
|
print(Fore.CYAN + ans)
|
|
|
|
|
|
|
|
|
|
|
|
+class ArchiveParser(Parser):
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def print_entry(entry: tuple, index: int):
|
|
|
|
+ timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset)
|
|
|
|
+ ans = f"[ARCHIVE] {index}: {timestamp}, value: {entry[1]}"
|
|
|
|
+ print(Fore.CYAN + ans)
|
|
|
|
+
|
|
|
|
|
|
|
|
|
|
class LogReader:
|
|
class LogReader:
|
|
@@ -92,25 +102,28 @@ class DigitalLogReader(LogReader):
|
|
def get_archive(self):
|
|
def get_archive(self):
|
|
print("DigitalLogReader")
|
|
print("DigitalLogReader")
|
|
|
|
|
|
- def get_archive_entry(self, index):
|
|
|
|
- data = self.modbus.read_file_record(ARCHIVE_ENTRY, index, 1)
|
|
|
|
|
|
+ def get_archive_entry(self, channel, index):
|
|
|
|
+ data = self.modbus.read_file_record(ARCHIVE_ENTRY, channel, index, 1)
|
|
return struct.unpack('<QBB', data[5:15])
|
|
return struct.unpack('<QBB', data[5:15])
|
|
|
|
|
|
def get_random_archive_entry(self):
|
|
def get_random_archive_entry(self):
|
|
- data = self.modbus.read_file_record(ARCHIVE_ENTRY, randint(1, self.archive_entries_number), 1)
|
|
|
|
|
|
+ data = self.modbus.read_file_record(ARCHIVE_ENTRY, randint(0, 8), randint(1, self.archive_entries_number), 1)
|
|
entry = struct.unpack('<QBB', data[5:15])
|
|
entry = struct.unpack('<QBB', data[5:15])
|
|
print(Fore.CYAN + str(entry))
|
|
print(Fore.CYAN + str(entry))
|
|
|
|
|
|
def get_log_entry(self, index):
|
|
def get_log_entry(self, index):
|
|
- data = self.modbus.read_file_record(LOG_ENTRY, index, 1)
|
|
|
|
|
|
+ data = self.modbus.read_file_record(LOG_ENTRY, 0, index, 1)
|
|
return struct.unpack('<QBBBfB', data[5:21])
|
|
return struct.unpack('<QBBBfB', data[5:21])
|
|
|
|
|
|
def print_log_entry(self, index):
|
|
def print_log_entry(self, index):
|
|
- data = self.modbus.read_file_record(LOG_ENTRY, index, 1)
|
|
|
|
|
|
+ data = self.modbus.read_file_record(LOG_ENTRY, 0, index, 1)
|
|
LogParser.print_entry(struct.unpack('<QBBBfB', data[5:21]), index)
|
|
LogParser.print_entry(struct.unpack('<QBBBfB', data[5:21]), index)
|
|
|
|
|
|
|
|
+ # def print_archive_entry(self, index):
|
|
|
|
+
|
|
|
|
+
|
|
def get_random_log_entry(self):
|
|
def get_random_log_entry(self):
|
|
- data = self.modbus.read_file_record(LOG_ENTRY, randint(1, self.log_entries_number), 1)
|
|
|
|
|
|
+ data = self.modbus.read_file_record(LOG_ENTRY, 0, randint(1, self.log_entries_number), 1)
|
|
entry = struct.unpack('<QBBBfB', data[5:21])
|
|
entry = struct.unpack('<QBBBfB', data[5:21])
|
|
print(Fore.CYAN + str(entry))
|
|
print(Fore.CYAN + str(entry))
|
|
|
|
|
|
@@ -125,9 +138,14 @@ class DigitalLogReader(LogReader):
|
|
|
|
|
|
def get_all_archive(self):
|
|
def get_all_archive(self):
|
|
self.get_log_info()
|
|
self.get_log_info()
|
|
-
|
|
|
|
- for i in range(1, self.archive_entries_number + 1):
|
|
|
|
- print(self.get_archive_entry(i))
|
|
|
|
|
|
+ # print(self.archive_entries_number)
|
|
|
|
+ for channel in range(0, len(self.archive_entries_number)):
|
|
|
|
+ # print(f"Archive channel {channel} {self.archive_entries_number[channel]}")
|
|
|
|
+ # print(self.archive_entries_number[channel])
|
|
|
|
+ if (self.archive_entries_number[channel]):
|
|
|
|
+ print(f"Archive channel {channel}")
|
|
|
|
+ for i in range(0, self.archive_entries_number[channel]):
|
|
|
|
+ print(self.get_archive_entry(channel, i+1))
|
|
|
|
|
|
def get_all_log(self):
|
|
def get_all_log(self):
|
|
self.get_log_info()
|
|
self.get_log_info()
|