io_module.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. from modbus import Modbus, MBError, NoResponseError
  2. import colorama
  3. from colorama import Fore
  4. import time
  5. import os
  6. reg_table = {'in_bits': 0x0100, 'in_cnt': 0x0102, 'in_mode': 0x0120, 'in_norm': 0x0122, 'in_deb_start': 0x124,
  7. 'out_cur': 0x0200, 'out_mode': 0x0202,
  8. 'rtc_unix': 0x0802, 'rtc_sinhro': 0x0804, 'uptime': 0x0800,}
  9. class IO_Module(Modbus):
  10. def __init__(self, tty: str, brate: int, address: int):
  11. super().__init__(tty, brate, address)
  12. self.update_segment_number = 0
  13. def iap_start(self):
  14. """Reboot device in IAP mode"""
  15. request = bytes((self.address, 0x41, 0x01))
  16. response = self.raw_communicate(request + self._crc(request))
  17. def write_fw_start(self, size: int):
  18. """Ask device to start update"""
  19. self.update_segment_number = 0
  20. request = bytes((self.address, 0x41, 0x01, 0xEF, 0xBE, 0xAD, 0xDE)) + size.to_bytes(4, 'big')
  21. response = self.raw_communicate(request + self._crc(request), 5)
  22. if len(response) == 0:
  23. raise NoResponseError('No response on WRITE_START command')
  24. if len(response) != 5:
  25. raise MBError('Incorrect response length')
  26. if response[:3] != bytes((self.address, 0x41, 0x01)):
  27. raise MBError('Incorrect response')
  28. def write_fw_part(self, data: bytes):
  29. """Write piece of FW data in IAP mode"""
  30. header = bytes((self.address, 0x41, 0x02))
  31. request = b''.join((header, self.update_segment_number.to_bytes(2, 'big'), data))
  32. response = self.raw_communicate(request + self._crc(request), 5)
  33. # self.print_hex(response)
  34. if len(response) != 5:
  35. raise MBError('Incorrect response length')
  36. if (response[:3]) != header:
  37. raise MBError('Incorrect response')
  38. self.update_segment_number += 1
  39. def iap_finish(self):
  40. """Complete FW transmission and check response"""
  41. header = request = bytes((self.address, 0x41, 0x03))
  42. response = self.raw_communicate(request + self._crc(request), 5)
  43. if len(response) != 5:
  44. raise MBError('Incorrect response length')
  45. if response[:3] != header:
  46. raise MBError('Incorrect response')
  47. def update(self, path):
  48. self.MB_TIMEOUT = 3
  49. size = os.path.getsize('fw.bin')
  50. print('Switch to IAP mode')
  51. self.iap_start()
  52. time.sleep(4)
  53. print(f'Start writing {size} bytes of FW')
  54. self.write_fw_start(size)
  55. time.sleep(2)
  56. print(f'Open FW file "{path}"...')
  57. with open(path, 'rb') as f:
  58. done = progress_cur = progress_pre = 0
  59. while True:
  60. buf = f.read(128)
  61. if len(buf):
  62. self.write_fw_part(buf)
  63. progress_cur = done / size
  64. else:
  65. break
  66. print('End of transmission')
  67. self.iap_finish()
  68. # 0x0100 - текущее состояние входов
  69. def get_inputs_bit(self) -> str:
  70. data = self.read_holding_registers(reg_table['in_bits'], 1)
  71. return format(data[0], '08b')
  72. # 0x0101 - 0x0110 Счетчики импульсов
  73. def get_inputs_counters(self):
  74. data = []
  75. for i in range(reg_table['in_cnt'], reg_table['in_cnt'] + 16, 2):
  76. data.append(self.read_uint32_holding(i))
  77. return data
  78. # 0x0120 - режим работы входов
  79. def get_inputs_mode(self):
  80. data = self.read_holding_registers(reg_table['in_mode'], 1)
  81. return format(data[0], '08b')
  82. def set_inputs_mode(self, val):
  83. self.write_holding_register(reg_table['in_mode'], val)
  84. #
  85. def set_input_mode(self, input, val):
  86. ret = self.read_holding_registers(reg_table['in_mode'], 1)
  87. if val == 1:
  88. data = ret[0] | (0b1 << (input - 1))
  89. else:
  90. data = ret[0] & ~(0b1 << (input - 1))
  91. self.set_inputs_mode(data)
  92. # 0x0122 - нормальное состояние входов
  93. def get_inputs_norm_state(self):
  94. data = self.read_holding_registers(reg_table['in_norm'], 1)
  95. return format(data[0], '08b')
  96. def set_inputs_norm_state(self, val):
  97. self.write_holding_register(reg_table['in_norm'], val)
  98. # 0x0124 - время антидребезга (ms)
  99. def get_debounce_channel(self, input):
  100. data = self.read_holding_registers(reg_table['in_deb_start'] + input - 1, 1)
  101. return data[0]
  102. def set_debounce_channel(self, input, val):
  103. self.write_holding_register(reg_table['in_deb_start'] + input - 1, val)
  104. # 0x0200 - текущее состояние выходов в обычно режиме
  105. def get_outputs(self):
  106. data = self.read_holding_registers(reg_table['out_cur'], 1)
  107. return format(data[0], '08b')
  108. # 0x0200 - текущее состояние выходов в обычно режиме
  109. def set_outputs(self, val):
  110. self.write_holding_register(reg_table['out_cur'], val)
  111. def set_output(self, output, val):
  112. ret = self.read_holding_registers(reg_table['out_cur'], 1)
  113. if val == 1:
  114. data = ret[0] | (0b1 << (output - 1))
  115. else:
  116. data = ret[0] & ~(0b1 << (output - 1))
  117. self.set_outputs(data)
  118. # 0x0202 - режим работы выходов; 0 - обычный, 1 - ШИМ
  119. def get_outputs_mode(self):
  120. data = self.read_holding_registers(reg_table['out_mode'], 1)
  121. return format(data[0], '08b')
  122. def set_outputs_mode(self, val):
  123. self.write_holding_register(reg_table['out_mode'], val)
  124. def set_output_mode(self, output, val):
  125. ret = self.read_holding_registers(reg_table['out_mode'], 1)
  126. if val == 1:
  127. data = ret[0] | (0b1 << (output - 1))
  128. else:
  129. data = ret[0] & ~(0b1 << (output - 1))
  130. self.set_outputs_mode(data)
  131. def get_uptime(self):
  132. return self.read_uint32_holding(reg_table['uptime'])
  133. def get_rtc(self):
  134. return self.read_uint32_holding(reg_table['rtc_unix'])
  135. def set_rtc(self, utc):
  136. self.write_uint32(reg_table['rtc_sinhro'], utc)
  137. def main():
  138. colorama.init(autoreset=True)
  139. dev = IO_Module('COM24', 115200, 14)
  140. dev.MB_DEBUG = False
  141. # dev.update('fw.bin')
  142. # Запрос системных параметров, установка времени
  143. # print('Device uptime:', dev.get_uptime())
  144. # unix_time = dev.get_rtc()
  145. # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
  146. # print('Set time:', int(time.time()))
  147. # dev.set_rtc(int(time.time()))
  148. # time.sleep(1)
  149. # unix_time = dev.get_rtc()
  150. # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
  151. for i in range(1, 9):
  152. dev.set_input_mode(i, 1)
  153. # print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  154. for i in range(1, 9):
  155. dev.set_debounce_channel(i, 50 + i)
  156. dev.set_output(i, 0)
  157. # Установить 1-ый выход в режим PWM
  158. dev.set_output_mode(1, 1)
  159. # Установить нормальное состояние входов
  160. # dev.set_inputs_norm_state(0b00110101)
  161. while True:
  162. print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
  163. # Значения входов (битовое поле)
  164. print('Inputs values [bit field] :', Fore.GREEN + dev.get_inputs_bit())
  165. # Режим работы входов (битовое поле)
  166. print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  167. # Нормальное состояние входов (битовое поле)
  168. print('Inputs norm [bit field] :', Fore.GREEN + dev.get_inputs_norm_state())
  169. # Период антидребезга (ms)
  170. for i in range(1, 9):
  171. print(f'Debounce input {i} (ms) :', Fore.GREEN + str(dev.get_debounce_channel(i)))
  172. # Значение счетчиков
  173. data = dev.get_inputs_counters()
  174. print('Inputs counters :', Fore.GREEN + ' | '.join(str(el) for el in data))
  175. # Текущее состояние выходов в обычном режиме
  176. print('Outputs norm [bit field] :', Fore.GREEN + dev.get_outputs())
  177. # # Для проверки выходов в обычном режиме
  178. # for i in range(1, 9):
  179. # dev.set_output(i, 1)
  180. # print('Outputs norm [bit field] :', Fore.GREEN + dev.get_output())
  181. # time.sleep(0.1)
  182. # for i in range(1, 9):
  183. # dev.set_output(i, 0)
  184. # print('Outputs norm [bit field] :', Fore.GREEN + dev.get_output())
  185. # time.sleep(0.1)
  186. # # Режим работы выходов
  187. # for i in range(1, 9):
  188. # dev.set_output_mode(i, 1)
  189. # print('Outputs mode [bit field] :', Fore.GREEN + dev.get_outputs_mode())
  190. # time.sleep(0.1)
  191. # for i in range(1, 9):
  192. # dev.set_output_mode(i, 0)
  193. # print('Outputs mode [bit field] :', Fore.GREEN + dev.get_outputs_mode())
  194. # time.sleep(0.1)
  195. # break
  196. time.sleep(1)
  197. # for i in range(1, 9):
  198. # dev.set_input_mode(i, 1)
  199. # print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  200. # for i in range(8, 0, -1):
  201. # dev.set_input_mode(i, 0)
  202. # print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  203. if __name__ == '__main__':
  204. main()