log_reader.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from modbus import Modbus
  2. from mb_registers import reg_table
  3. from colorama import Fore
  4. from random import randint
  5. from time import sleep
  6. import colorama
  7. import struct
  8. ARCHIVE_ENTRY = 0x06
  9. LOG_ENTRY = 0x07
  10. class LogReader():
  11. def __init__(self, modbus: Modbus):
  12. self.modbus = modbus
  13. self.log_capacity = 0
  14. self.log_entries_number = 0
  15. self.archive_capacity = 0
  16. self.archive_entries_number = 0
  17. self.archive_period = 0
  18. colorama.init(autoreset=True)
  19. def get_archive(self):
  20. print("LogReader")
  21. def get_log_info(self):
  22. data = self.modbus.read_holding_registers(reg_table['log_info'], 5)
  23. self.log_capacity = data[0]
  24. self.log_entries_number = data[1]
  25. self.archive_capacity = data[2]
  26. self.archive_entries_number = data[3]
  27. self.archive_period = data[4]
  28. print(Fore.CYAN + "Log and archive params:\n")
  29. print('Log capacity :', Fore.CYAN + str(self.log_capacity))
  30. print('Log entries number :', Fore.CYAN + str(self.log_entries_number))
  31. print('Archive capacity :', Fore.CYAN + str(self.archive_capacity))
  32. print('Archive entries number :', Fore.CYAN + str(self.archive_entries_number))
  33. print('Archive period :', Fore.CYAN + str(self.archive_period), end='\n')
  34. def set_archive_period(self, value):
  35. self.modbus.write_holding_register(reg_table['archive_per'], value)
  36. class DigitalLogReader(LogReader):
  37. def __init__(self, modbus: Modbus):
  38. super().__init__(modbus)
  39. def get_archive(self):
  40. print("DigitalLogReader")
  41. def get_archive_entry(self, index):
  42. data = self.modbus.read_file_record(ARCHIVE_ENTRY, index, 1)
  43. return struct.unpack('<QBB', data[5:15])
  44. def get_random_archive_entry(self):
  45. data = self.modbus.read_file_record(ARCHIVE_ENTRY, randint(1, self.archive_entries_number), 1)
  46. entry = struct.unpack('<QBB', data[5:15])
  47. print(Fore.CYAN + str(entry))
  48. def get_random_log_entry(self):
  49. data = self.modbus.read_file_record(LOG_ENTRY, randint(1, self.log_entries_number), 1)
  50. entry = struct.unpack('<QBBBfB', data[5:21])
  51. print(Fore.CYAN + str(entry))
  52. def get_random_entries(self):
  53. '''Читаем лог и архив'''
  54. print(Fore.CYAN + "\nEntries:\n")
  55. while (1):
  56. self.get_random_archive_entry()
  57. sleep(0.1)
  58. self.get_random_log_entry()
  59. sleep(0.1)
  60. def get_all_archive(self):
  61. self.get_log_info()
  62. for i in range(1, self.archive_entries_number + 1):
  63. print(self.get_archive_entry(i))
  64. sleep(0.1)
  65. class AnalogInputLogReader(LogReader):
  66. def __init__(self):
  67. super().__init__()
  68. def get_archive(self):
  69. print("AnalogInputLogReader")
  70. def main():
  71. module = DigitalLogReader('COM24', 115200, 15)
  72. # module.get_log_info()
  73. # module.get_archive()
  74. # module.set_archive_period(10)
  75. return
  76. # for i in range(500):
  77. while (1):
  78. module.get_random_archive_entry()
  79. sleep(0.1)
  80. module.get_random_log_entry()
  81. sleep(0.1)
  82. if __name__ == '__main__':
  83. main()