log_reader.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. from modbus import Modbus
  2. from mb_registers import reg_table
  3. from colorama import Fore
  4. from random import randint
  5. from time import sleep
  6. import colorama
  7. import struct
  8. ARCHIVE_ENTRY = 0x06
  9. LOG_ENTRY = 0x07
  10. class LogReader():
  11. def __init__(self, modbus: Modbus):
  12. self.modbus = modbus
  13. self.log_capacity = 0
  14. self.log_entries_number = 0
  15. self.archive_capacity = 0
  16. self.archive_entries_number = 0
  17. self.archive_period = 0
  18. colorama.init(autoreset=True)
  19. def get_archive(self):
  20. print("LogReader")
  21. def get_log_info(self):
  22. data = self.modbus.read_holding_registers(reg_table['log_info'], 5)
  23. self.log_capacity = data[0]
  24. self.log_entries_number = data[1]
  25. self.archive_capacity = data[2]
  26. self.archive_entries_number = data[3]
  27. self.archive_period = data[4]
  28. print(Fore.CYAN + "Log and archive params:\n")
  29. print('Log capacity :', Fore.CYAN + str(self.log_capacity))
  30. print('Log entries number :', Fore.CYAN + str(self.log_entries_number))
  31. print('Archive capacity :', Fore.CYAN + str(self.archive_capacity))
  32. print('Archive entries number :', Fore.CYAN + str(self.archive_entries_number))
  33. print('Archive period :', Fore.CYAN + str(self.archive_period), end='\n')
  34. def set_archive_period(self, value):
  35. self.modbus.write_holding_register(reg_table['archive_per'], value)
  36. class DigitalLogReader(LogReader):
  37. def __init__(self, modbus: Modbus):
  38. super().__init__(modbus)
  39. def get_archive(self):
  40. print("DigitalLogReader")
  41. def get_random_archive_entry(self):
  42. data = self.modbus.read_file_record(ARCHIVE_ENTRY, randint(1, self.archive_entries_number), 1)
  43. entry = struct.unpack('<QBB', data[5:15])
  44. print(Fore.CYAN + str(entry))
  45. def get_random_log_entry(self):
  46. data = self.modbus.read_file_record(LOG_ENTRY, randint(1, self.log_entries_number), 1)
  47. entry = struct.unpack('<QBBBfB', data[5:21])
  48. print(Fore.CYAN + str(entry))
  49. def get_random_entries(self):
  50. '''Читаем лог и архив'''
  51. print(Fore.CYAN + "\nEntries:\n")
  52. while (1):
  53. self.get_random_archive_entry()
  54. sleep(0.1)
  55. self.get_random_log_entry()
  56. sleep(0.1)
  57. class AnalogInputLogReader(LogReader):
  58. def __init__(self):
  59. super().__init__()
  60. def get_archive(self):
  61. print("AnalogInputLogReader")
  62. def main():
  63. module = DigitalLogReader('COM24', 115200, 15)
  64. module.get_log_info()
  65. # module.get_archive()
  66. # module.set_archive_period(10)
  67. return
  68. # for i in range(500):
  69. while (1):
  70. module.get_random_archive_entry()
  71. sleep(0.1)
  72. module.get_random_log_entry()
  73. sleep(0.1)
  74. if __name__ == '__main__':
  75. main()