log_reader.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. from modbus import Modbus
  2. from mb_registers import reg_table, LOG_REGS, Log
  3. from colorama import Fore
  4. from random import randint
  5. from time import sleep
  6. import time
  7. import colorama
  8. import struct
  9. from datetime import datetime, timedelta, timezone
  10. from serial import Serial
  11. ARCHIVE_ENTRY = 0x06
  12. LOG_ENTRY = 0x07
  13. class Parser:
  14. utc_offset = 10800
  15. def __init__(self) -> None:
  16. t = datetime.now(timezone.utc).astimezone()
  17. self.utc_offset = t.utcoffset() // timedelta(seconds=1)
  18. class LogParser(Parser):
  19. events = {1: 'Включение питания/перезагрузка ',
  20. 2: 'Перевод времени ',
  21. 3: 'Обновление ПО ',
  22. 4: 'Самодиагностика/системная ошибка',
  23. 5: 'Изменение конфигурации ',
  24. 6: 'Диагностика выходов ',
  25. 7: 'Срабатывание уставок ',
  26. 8: 'Переход в безопасный режим ',
  27. 9: 'Очистка журнала/архива '}
  28. @staticmethod
  29. def print_entry(entry: tuple, index: int):
  30. timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset)
  31. ans = f"[LOG] {index:05}: {LogParser.events[entry[1]]}, {timestamp}, state: {entry[2]}, channel: {entry[3]}, value: {entry[4]}"
  32. print(Fore.CYAN + ans)
  33. class ArchiveParser(Parser):
  34. @staticmethod
  35. def print_entry(entry: tuple, channel: int, index: int):
  36. timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset)
  37. ans = f"[ARCHIVE] {index:05}: {timestamp}, value: {entry[1]}"
  38. print(Fore.CYAN + ans)
  39. class LogReader:
  40. COM_LOG_CLEAR = 0x0002
  41. COM_ARCHIVE_CLEAR = 0x0003
  42. def __init__(self, modbus: Modbus):
  43. self.modbus = modbus
  44. self.log_capacity = 0
  45. self.log_entries_number = 0
  46. self.archive_capacity = 0
  47. self.archive_entries_number = 0
  48. self.archive_period = []
  49. colorama.init(autoreset=True)
  50. def get_archive(self):
  51. print("LogReader")
  52. def get_log_info(self):
  53. # data = self.modbus.read_holding_registers(Log.INFO.value, 19)
  54. data = self.modbus.read_holding_registers(Log.LOG_ENTRYS_MAX.value, 11)
  55. self.log_capacity = data[0]
  56. self.log_entries_number = data[1]
  57. self.archive_capacity = data[2]
  58. self.archive_entries_number = data[3:11]
  59. self.archive_period = self.modbus.read_holding_registers(Log.ARCH_PER_CH1.value, 8)
  60. print(Fore.CYAN + "Log and archive params:\n")
  61. print('Log capacity :', Fore.CYAN + str(self.log_capacity))
  62. print('Log entries number :', Fore.CYAN + str(self.log_entries_number))
  63. print('Archive capacity :', Fore.CYAN + str(self.archive_capacity))
  64. print('Archive entries number :', Fore.CYAN + str(self.archive_entries_number))
  65. print('Archive period :', Fore.CYAN + str(self.archive_period), end='\n')
  66. def set_archive_channel_period(self, channel, value):
  67. """Установить период архивирования в канале 1..12 в секундах"""
  68. self.modbus.write_holding_register(Log.ARCH_PER_CH1.value + channel - 1, value)
  69. def log_clear(self):
  70. """Очистить журнал событий"""
  71. self.modbus.write_holding_register(Log.CMD.value, self.COM_LOG_CLEAR)
  72. def archive_channel_clear(self, channel):
  73. """Очистить архив по каналам 1..12"""
  74. self.modbus.write_holding_register(Log.CMD.value, self.COM_ARCHIVE_CLEAR + channel - 1)
  75. class DigitalLogReader(LogReader):
  76. def __init__(self, modbus: Modbus):
  77. super().__init__(modbus)
  78. def get_archive(self):
  79. print("DigitalLogReader")
  80. def get_archive_entry(self, channel, index):
  81. data = self.modbus.read_file_record(ARCHIVE_ENTRY, channel, index, 1)
  82. return struct.unpack('<QBB', data[5:15])
  83. def get_random_archive_entry(self):
  84. data = self.modbus.read_file_record(ARCHIVE_ENTRY, randint(0, 8), randint(1, self.archive_entries_number), 1)
  85. entry = struct.unpack('<QBB', data[5:15])
  86. print(Fore.CYAN + str(entry))
  87. def get_log_entry(self, index):
  88. data = self.modbus.read_file_record(LOG_ENTRY, 0, index, 1)
  89. return struct.unpack('<QBBBfB', data[5:21])
  90. def print_log_entry(self, index):
  91. time_start = time.time()
  92. data = self.modbus.read_file_record(LOG_ENTRY, 0, index, 1)
  93. print(f"Entry time: ", time.time() - time_start)
  94. # LogParser.print_entry(struct.unpack('<QBBBfB', data[5:21]), index)
  95. def print_archive_entry(self, channel, index):
  96. data = self.modbus.read_file_record(ARCHIVE_ENTRY, channel, index, 1)
  97. ArchiveParser.print_entry(struct.unpack('<QBB', data[5:15]), channel, index)
  98. def get_random_log_entry(self):
  99. data = self.modbus.read_file_record(LOG_ENTRY, 0, randint(1, self.log_entries_number), 1)
  100. entry = struct.unpack('<QBBBfB', data[5:21])
  101. print(Fore.CYAN + str(entry))
  102. def get_random_entries(self):
  103. '''Читаем лог и архив'''
  104. print(Fore.CYAN + "\nEntries:\n")
  105. while (1):
  106. self.get_random_archive_entry()
  107. sleep(0.1)
  108. self.get_random_log_entry()
  109. sleep(0.1)
  110. def get_all_archive(self):
  111. self.get_log_info()
  112. for channel in range(0, len(self.archive_entries_number)):
  113. if (self.archive_entries_number[channel]):
  114. print(Fore.LIGHTGREEN_EX + f"Archive channel: {channel}")
  115. for i in range(0, self.archive_entries_number[channel]):
  116. self.print_archive_entry(channel, i + 1)
  117. def get_all_log(self):
  118. self.get_log_info()
  119. for i in range(1, self.log_entries_number + 1):
  120. self.print_log_entry(i)
  121. # sleep(0.01)
  122. class AnalogInputLogReader(LogReader):
  123. def __init__(self, modbus: Modbus):
  124. super().__init__(modbus)
  125. def get_archive(self):
  126. print("AnalogInputLogReader")
  127. def main():
  128. serial = Serial('COM5', 115200, timeout=0.05, parity='N', xonxoff=False)
  129. modbus = Modbus(serial, 1)
  130. modbus.MB_DEBUG = False
  131. dio = DigitalLogReader(modbus)
  132. # dio.get_log_info()
  133. # for i in range(1, 9):
  134. # dio.set_archive_channel_period(i, 10)
  135. # dio.get_log_info()
  136. while True:
  137. dio.get_all_log()
  138. # module.get_archive()
  139. # module.set_archive_period(10)
  140. return
  141. # for i in range(500):
  142. while (1):
  143. module.get_random_archive_entry()
  144. sleep(0.1)
  145. module.get_random_log_entry()
  146. sleep(0.1)
  147. if __name__ == '__main__':
  148. main()