log_reader.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. from io_module import IO_Module
  2. from io_module import reg_table
  3. from colorama import Fore
  4. from random import randint
  5. from time import sleep
  6. import colorama
  7. import struct
# Selectors passed as the first argument to read_file_record().
# NOTE(review): presumably device file numbers - confirm against the module's
# protocol documentation.
ARCHIVE_ENTRY = 0x06  # used when fetching an archive record
LOG_ENTRY = 0x07  # used when fetching a log record
  10. class LogReader(IO_Module):
  11. def __init__(self, tty: str, brate: int, address: int):
  12. super().__init__(tty, brate, address)
  13. colorama.init(autoreset=True)
  14. self.log_capacity = 0
  15. self.log_entries_number = 0
  16. self.archive_capacity = 0
  17. self.archive_entries_number = 0
  18. def get_archive(self):
  19. print("LogReader")
  20. def get_log_info(self):
  21. data = self.read_holding_registers(reg_table['log_info'], 4)
  22. self.log_capacity = data[0]
  23. self.log_entries_number = data[1]
  24. self.archive_capacity = data[2]
  25. self.archive_entries_number = data[3]
  26. print('Log capacity :', Fore.CYAN + str(self.log_capacity))
  27. print('Log entries number :', Fore.CYAN + str(self.log_entries_number))
  28. print('Archive capacity :', Fore.CYAN + str(self.archive_capacity))
  29. print('Archive entries number :', Fore.CYAN + str(self.archive_entries_number))
  30. class DigitalLogReader(LogReader):
  31. def __init__(self, tty: str, brate: int, address: int):
  32. super().__init__(tty, brate, address)
  33. def get_archive(self):
  34. print("DigitalLogReader")
  35. def get_random_archive_entry(self):
  36. data = self.read_file_record(ARCHIVE_ENTRY, randint(1, self.archive_entries_number), 1)
  37. entry = struct.unpack('<QBB', data[5:15])
  38. print(entry)
  39. def get_random_log_entry(self):
  40. data = self.read_file_record(LOG_ENTRY, randint(1, self.log_entries_number), 1)
  41. entry = struct.unpack('<QBBBfB', data[5:21])
  42. print(entry)
  43. class AnalogInputLogReader(LogReader):
  44. def __init__(self):
  45. super().__init__()
  46. def get_archive(self):
  47. print("AnalogInputLogReader")
  48. def main():
  49. module = DigitalLogReader('COM24', 115200, 15)
  50. module.get_log_info()
  51. module.get_archive()
  52. # for i in range(500):
  53. while (1):
  54. module.get_random_archive_entry()
  55. sleep(0.1)
  56. module.get_random_log_entry()
  57. sleep(0.1)
  58. if __name__ == '__main__':
  59. main()