# log_reader.py
import struct
import time
from datetime import datetime, timedelta, timezone
from random import randint
from time import sleep

import colorama
from colorama import Fore

from mb_registers import reg_table
from modbus import Modbus
  10. ARCHIVE_ENTRY = 0x06
  11. LOG_ENTRY = 0x07
  12. class LogParser:
  13. events = {1: 'Включение питания/перезагрузка',
  14. 2: 'Перевод времени',
  15. 3: 'Обновление ПО'}
  16. utc_offset = 10800
  17. def __init__(self) -> None:
  18. t = datetime.now(timezone.utc).astimezone()
  19. self.utc_offset = t.utcoffset() // timedelta(seconds=1)
  20. @staticmethod
  21. def print_entry(entry: tuple, index: int):
  22. timestamp = time.ctime(entry[0]/1000 - LogParser.utc_offset)
  23. ans = f"[LOG] {index}: {LogParser.events[entry[1]]}, {timestamp}, state: {entry[2]}, channel: {entry[3]}, value: {entry[4]}"
  24. print(Fore.CYAN + ans)
  25. class LogReader:
  26. def __init__(self, modbus: Modbus):
  27. self.modbus = modbus
  28. self.log_capacity = 0
  29. self.log_entries_number = 0
  30. self.archive_capacity = 0
  31. self.archive_entries_number = 0
  32. self.archive_period = 0
  33. colorama.init(autoreset=True)
  34. def get_archive(self):
  35. print("LogReader")
  36. def get_log_info(self):
  37. data = self.modbus.read_holding_registers(reg_table['log_info'], 5)
  38. self.log_capacity = data[0]
  39. self.log_entries_number = data[1]
  40. self.archive_capacity = data[2]
  41. self.archive_entries_number = data[3]
  42. self.archive_period = data[4]
  43. print(Fore.CYAN + "Log and archive params:\n")
  44. print('Log capacity :', Fore.CYAN + str(self.log_capacity))
  45. print('Log entries number :', Fore.CYAN + str(self.log_entries_number))
  46. print('Archive capacity :', Fore.CYAN + str(self.archive_capacity))
  47. print('Archive entries number :', Fore.CYAN + str(self.archive_entries_number))
  48. print('Archive period :', Fore.CYAN + str(self.archive_period), end='\n')
  49. def set_archive_period(self, value):
  50. self.modbus.write_holding_register(reg_table['archive_per'], value)
  51. class DigitalLogReader(LogReader):
  52. def __init__(self, modbus: Modbus):
  53. super().__init__(modbus)
  54. def get_archive(self):
  55. print("DigitalLogReader")
  56. def get_archive_entry(self, index):
  57. data = self.modbus.read_file_record(ARCHIVE_ENTRY, index, 1)
  58. return struct.unpack('<QBB', data[5:15])
  59. def get_random_archive_entry(self):
  60. data = self.modbus.read_file_record(ARCHIVE_ENTRY, randint(1, self.archive_entries_number), 1)
  61. entry = struct.unpack('<QBB', data[5:15])
  62. print(Fore.CYAN + str(entry))
  63. def get_log_entry(self, index):
  64. data = self.modbus.read_file_record(LOG_ENTRY, index, 1)
  65. return struct.unpack('<QBBBfB', data[5:21])
  66. def print_log_entry(self, index):
  67. data = self.modbus.read_file_record(LOG_ENTRY, index, 1)
  68. LogParser.print_entry(struct.unpack('<QBBBfB', data[5:21]), index)
  69. def get_random_log_entry(self):
  70. data = self.modbus.read_file_record(LOG_ENTRY, randint(1, self.log_entries_number), 1)
  71. entry = struct.unpack('<QBBBfB', data[5:21])
  72. print(Fore.CYAN + str(entry))
  73. def get_random_entries(self):
  74. '''Читаем лог и архив'''
  75. print(Fore.CYAN + "\nEntries:\n")
  76. while (1):
  77. self.get_random_archive_entry()
  78. sleep(0.1)
  79. self.get_random_log_entry()
  80. sleep(0.1)
  81. def get_all_archive(self):
  82. self.get_log_info()
  83. for i in range(1, self.archive_entries_number + 1):
  84. print(self.get_archive_entry(i))
  85. def get_all_log(self):
  86. self.get_log_info()
  87. for i in range(1, self.log_entries_number + 1):
  88. self.print_log_entry(i)
  89. sleep(0.01)
  90. class AnalogInputLogReader(LogReader):
  91. def __init__(self):
  92. super().__init__()
  93. def get_archive(self):
  94. print("AnalogInputLogReader")
  95. def main():
  96. module = DigitalLogReader('COM24', 115200, 15)
  97. # module.get_log_info()
  98. # module.get_archive()
  99. # module.set_archive_period(10)
  100. return
  101. # for i in range(500):
  102. while (1):
  103. module.get_random_archive_entry()
  104. sleep(0.1)
  105. module.get_random_log_entry()
  106. sleep(0.1)
  107. if __name__ == '__main__':
  108. main()