io_module.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. from modbus import Modbus, MBError, NoResponseError
  2. import colorama
  3. from colorama import Fore
  4. import time
  5. import os
  6. import struct
  7. reg_table = {'in_bits': 0x0100, 'in_cnt': 0x0102, 'in_mode': 0x0120, 'in_norm': 0x0122, 'in_deb_start': 0x124,
  8. 'out_cur': 0x0200, 'out_mode': 0x0202, 'out_mode_save': 0x0203, 'pwm_duty': 0x0210,
  9. 'pwm_duty_save': 0x0220, 'pwm_per': 0x0230, 'pwm_per_save': 0x0240,
  10. 'rtc_unix': 0x0802, 'rtc_sinhro': 0x0804, 'uptime': 0x0800, 'log_info': 0x0900,
  11. 'log_ent': 0x0901, 'arch_cap': 0x0902, 'arch_ent': 0x0903}
  12. class IO_Module(Modbus):
  13. def __init__(self, tty: str, brate: int, address: int):
  14. super().__init__(tty, brate, address)
  15. self.update_segment_number = 0
  16. def iap_start(self):
  17. """Reboot device in IAP mode"""
  18. request = bytes((self.address, 0x41, 0x01))
  19. response = self.raw_communicate(request + self._crc(request))
  20. def write_fw_start(self, size: int):
  21. """Ask device to start update"""
  22. self.update_segment_number = 0
  23. request = bytes((self.address, 0x41, 0x01, 0xEF, 0xBE, 0xAD, 0xDE)) + size.to_bytes(4, 'big')
  24. response = self.raw_communicate(request + self._crc(request), 5)
  25. if len(response) == 0:
  26. raise NoResponseError('No response on WRITE_START command')
  27. if len(response) != 5:
  28. raise MBError('Incorrect response length')
  29. if response[:3] != bytes((self.address, 0x41, 0x01)):
  30. raise MBError('Incorrect response')
  31. def write_fw_part(self, data: bytes):
  32. """Write piece of FW data in IAP mode"""
  33. header = bytes((self.address, 0x41, 0x02))
  34. request = b''.join((header, self.update_segment_number.to_bytes(2, 'big'), data))
  35. response = self.raw_communicate(request + self._crc(request), 5)
  36. # self.print_hex(response)
  37. if len(response) != 5:
  38. raise MBError('Incorrect response length')
  39. if (response[:3]) != header:
  40. raise MBError('Incorrect response')
  41. self.update_segment_number += 1
  42. def iap_finish(self):
  43. """Complete FW transmission and check response"""
  44. header = request = bytes((self.address, 0x41, 0x03))
  45. response = self.raw_communicate(request + self._crc(request), 5)
  46. if len(response) != 5:
  47. raise MBError('Incorrect response length')
  48. if response[:3] != header:
  49. raise MBError('Incorrect response')
  50. def update(self, path):
  51. self.MB_TIMEOUT = 3
  52. size = os.path.getsize('fw.bin')
  53. print('Switch to IAP mode')
  54. self.iap_start()
  55. time.sleep(4)
  56. print(f'Start writing {size} bytes of FW')
  57. self.write_fw_start(size)
  58. time.sleep(2)
  59. print(f'Open FW file "{path}"...')
  60. with open(path, 'rb') as f:
  61. done = progress_cur = progress_pre = 0
  62. while True:
  63. buf = f.read(128)
  64. if len(buf):
  65. self.write_fw_part(buf)
  66. progress_cur = done / size
  67. else:
  68. break
  69. print('End of transmission')
  70. self.iap_finish()
  71. # 0x0100 - текущее состояние входов
  72. def get_inputs_bit(self) -> str:
  73. data = self.read_holding_registers(reg_table['in_bits'], 1)
  74. return format(data[0], '08b')
  75. # 0x0101 - 0x0110 Счетчики импульсов
  76. def get_inputs_counters(self):
  77. data = []
  78. for i in range(reg_table['in_cnt'], reg_table['in_cnt'] + 16, 2):
  79. data.append(self.read_uint32_holding(i))
  80. return data
  81. # 0x0120 - режим работы входов
  82. def get_inputs_mode(self):
  83. data = self.read_holding_registers(reg_table['in_mode'], 1)
  84. return format(data[0], '08b')
  85. def set_inputs_mode(self, val):
  86. self.write_holding_register(reg_table['in_mode'], val)
  87. #
  88. def set_input_mode(self, input, val):
  89. ret = self.read_holding_registers(reg_table['in_mode'], 1)
  90. if val == 1:
  91. data = ret[0] | (0b1 << (input - 1))
  92. else:
  93. data = ret[0] & ~(0b1 << (input - 1))
  94. self.set_inputs_mode(data)
  95. # 0x0122 - нормальное состояние входов
  96. def get_inputs_norm_state(self):
  97. data = self.read_holding_registers(reg_table['in_norm'], 1)
  98. return format(data[0], '08b')
  99. def set_inputs_norm_state(self, val):
  100. self.write_holding_register(reg_table['in_norm'], val)
  101. # 0x0124 - время антидребезга (ms)
  102. def get_debounce_channel(self, input):
  103. data = self.read_holding_registers(reg_table['in_deb_start'] + input - 1, 1)
  104. return data[0]
  105. def get_debounce_channels(self):
  106. return self.read_holding_registers(reg_table['in_deb_start'], 8)
  107. def set_debounce_channel(self, input, val):
  108. self.write_holding_register(reg_table['in_deb_start'] + input - 1, val)
  109. # 0x0200 - текущее состояние выходов в обычно режиме
  110. def get_outputs(self):
  111. data = self.read_holding_registers(reg_table['out_cur'], 1)
  112. return format(data[0], '08b')
  113. # 0x0200 - текущее состояние выходов в обычно режиме
  114. def set_outputs(self, val):
  115. self.write_holding_register(reg_table['out_cur'], val)
  116. def set_output(self, output, val):
  117. ret = self.read_holding_registers(reg_table['out_cur'], 1)
  118. if val == 1:
  119. data = ret[0] | (0b1 << (output - 1))
  120. else:
  121. data = ret[0] & ~(0b1 << (output - 1))
  122. self.set_outputs(data)
  123. # 0x0202 - режим работы выходов; 0 - обычный, 1 - ШИМ
  124. def get_outputs_mode(self):
  125. data = self.read_holding_registers(reg_table['out_mode'], 1)
  126. return format(data[0], '08b')
  127. def set_outputs_mode(self, val):
  128. self.write_holding_register(reg_table['out_mode'], val)
  129. def set_output_mode(self, output, val):
  130. ret = self.read_holding_registers(reg_table['out_mode'], 1)
  131. if val == 1:
  132. data = ret[0] | (0b1 << (output - 1))
  133. else:
  134. data = ret[0] & ~(0b1 << (output - 1))
  135. self.set_outputs_mode(data)
  136. # 0x0203 - состояние выходов (режим обычного выхода) в безопасном режиме работы
  137. def get_outputs_mode_save(self):
  138. data = self.read_holding_registers(reg_table['out_mode_save'], 1)
  139. return format(data[0], '08b')
  140. def set_outputs_mode_save(self, val):
  141. self.write_holding_register(reg_table['out_mode_save'], val)
  142. def set_output_mode_save(self, output, val):
  143. ret = self.read_holding_registers(reg_table['out_mode_save'], 1)
  144. if val == 1:
  145. data = ret[0] | (0b1 << (output - 1))
  146. else:
  147. data = ret[0] & ~(0b1 << (output - 1))
  148. self.set_outputs_mode_save(data)
  149. # 0x0210 - заполнение PWM (%)
  150. def get_pwm_duty(self, output):
  151. data = self.read_holding_registers(reg_table['pwm_duty'] + output - 1, 1)
  152. return data[0]
  153. def get_pwm_duty_all(self):
  154. return self.read_holding_registers(reg_table['pwm_duty'], 8)
  155. def set_pwm_duty(self, output, val):
  156. self.write_holding_register(reg_table['pwm_duty'] + output - 1, val)
  157. # 0x0220 - заполнение PWM (%)
  158. def get_pwm_duty_all_save(self):
  159. return self.read_holding_registers(reg_table['pwm_duty_save'], 8)
  160. def set_pwm_duty_save(self, output, val):
  161. self.write_holding_register(reg_table['pwm_duty_save'] + output - 1, val)
  162. # 0x0230 - период PWM (0.1 сек)
  163. def get_pwm_period_all(self):
  164. return self.read_holding_registers(reg_table['pwm_per'], 8)
  165. def set_pwm_period(self, output, val):
  166. self.write_holding_register(reg_table['pwm_per'] + output - 1, val)
  167. # 0x0240 - период PWM в безопасном режиме (0.1 сек)
  168. def get_pwm_period_all_save(self):
  169. return self.read_holding_registers(reg_table['pwm_per_save'], 8)
  170. def set_pwm_period_save(self, output, val):
  171. self.write_holding_register(reg_table['pwm_per_save'] + output - 1, val)
  172. def get_uptime(self):
  173. return self.read_uint32_holding(reg_table['uptime'])
  174. def get_rtc(self):
  175. return self.read_uint32_holding(reg_table['rtc_unix'])
  176. def set_rtc(self, utc):
  177. self.write_uint32(reg_table['rtc_sinhro'], utc)
  178. '''
  179. def get_archive_entryes(self, ent_num, ent_index):
  180. data = self.read_file_record(ARCHIVE_ENTRY, 1, 1)
  181. #print(struct.unpack('<Q', data[5:13]))
  182. entry = struct.unpack('<QBB', data[5:15])
  183. print(entry)
  184. return data
  185. '''
  186. def main():
  187. colorama.init(autoreset=True)
  188. dev = IO_Module('COM24', 115200, 15)
  189. dev.MB_DEBUG = False
  190. # dev.update('fw.bin')
  191. # Запрос системных параметров, установка времени
  192. # print('Device uptime:', dev.get_uptime())
  193. # unix_time = dev.get_rtc()
  194. # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
  195. # print('Set time:', int(time.time()))
  196. # dev.set_rtc(int(time.time()))
  197. # time.sleep(1)
  198. # unix_time = dev.get_rtc()
  199. # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
  200. dev.get_archive_entryes(1, 1)
  201. return
  202. for i in range(1, 9):
  203. dev.set_input_mode(i, 1)
  204. print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  205. for i in range(1, 9):
  206. dev.set_debounce_channel(i, 100 + i)
  207. # dev.set_output(i, 1)
  208. # dev.set_output_mode(i, 1)
  209. # 0x0203 - состояние выходов (режим обычного выхода) в безопасном режиме работы
  210. # for i in range(1, 9):
  211. # dev.set_output_mode_save(i, 0)
  212. # 0x0210 - заполнение PWM (ms)
  213. # 0x0220 - заполнение PWM (%)
  214. # 0x0230 - период PWM (0.1 сек)
  215. # 0x0240 - период PWM в безопасном режиме (0.1 сек)
  216. # for i in range(1, 9):
  217. # dev.set_pwm_duty(i, 10 + i)
  218. # dev.set_pwm_duty_save(i, 20 + i)
  219. # dev.set_pwm_period(i, 10 + i)
  220. # dev.set_pwm_period_save(i, 20 + i)
  221. # -----------------------------------------------------------------------------
  222. # Тесты PWM
  223. # Установить 1-ый выход в режим PWM
  224. # for i in range(3, 9):
  225. # dev.set_output_mode(i, 0)
  226. # dev.set_output_mode(1, 1)
  227. # dev.set_pwm_period(1, 30)
  228. # dev.set_pwm_period(1, 50)
  229. # dev.set_pwm_duty(2, 10)
  230. # return
  231. # Установить нормальное состояние входов
  232. # dev.set_inputs_norm_state(0b00110101)
  233. # print(dev.get_pwm_duty_all())
  234. while True:
  235. print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
  236. # Значения входов (битовое поле)
  237. print('Inputs values [bit field] :', Fore.GREEN + dev.get_inputs_bit())
  238. # Режим работы входов (битовое поле)
  239. print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  240. # Нормальное состояние входов (битовое поле)
  241. print('Inputs norm [bit field] :', Fore.GREEN + dev.get_inputs_norm_state())
  242. # Период антидребезга (ms)
  243. print('Debounce input (ms) :', Fore.GREEN + ' | '.join(str(el) for el in dev.get_debounce_channels()))
  244. # Значение счетчиков
  245. data = dev.get_inputs_counters()
  246. print('Inputs counters :', Fore.GREEN + ' | '.join(str(el) for el in data))
  247. # Текущее состояние выходов в обычном режиме
  248. print('Outputs norm [bit field] :', Fore.GREEN + dev.get_outputs())
  249. # Состояние выходов в безопасном режиме
  250. print('Outputs save [bit field] :', Fore.GREEN + dev.get_outputs_mode_save())
  251. # Режим работы выходов
  252. print('Outputs mode [bit field] :', Fore.GREEN + dev.get_outputs_mode())
  253. # 0x0210 - заполнение PWM (ms)
  254. print('PWM duty cycle [%] :', Fore.GREEN + ' | '.join(str(el) for el in dev.get_pwm_duty_all()))
  255. # 0x0220 - заполнение PWM в безопасном режиме (ms)
  256. print('PWM duty cycle (save) [%] :', Fore.GREEN + ' | '.join(str(el) for el in dev.get_pwm_duty_all_save()))
  257. # 0x0230 - период PWM (0.1 сек)
  258. print('PWM period [0.1 sec] :', Fore.GREEN + ' | '.join(str(el) for el in dev.get_pwm_period_all()))
  259. # 0x0240 - период PWM в безопасном режиме (0.1 сек)
  260. print('PWM period save [0.1 sec] :', Fore.GREEN + ' | '.join(str(el) for el in dev.get_pwm_period_all_save()))
  261. # Дергаем одним выходом
  262. # dev.set_output(2, trigger)
  263. # trigger = not trigger
  264. # # Для проверки выходов в обычном режиме
  265. # for i in range(1, 9):
  266. # dev.set_output(i, 1)
  267. # print('Outputs norm [bit field] :', Fore.GREEN + dev.get_output())
  268. # time.sleep(0.1)
  269. # for i in range(1, 9):
  270. # dev.set_output(i, 0)
  271. # print('Outputs norm [bit field] :', Fore.GREEN + dev.get_output())
  272. # time.sleep(0.1)
  273. # # Режим работы выходов
  274. # for i in range(1, 9):
  275. # dev.set_output_mode(i, 1)
  276. # print('Outputs mode [bit field] :', Fore.GREEN + dev.get_outputs_mode())
  277. # time.sleep(0.1)
  278. # for i in range(1, 9):
  279. # dev.set_output_mode(i, 0)
  280. # print('Outputs mode [bit field] :', Fore.GREEN + dev.get_outputs_mode())
  281. # time.sleep(0.1)
  282. # break
  283. time.sleep(1)
  284. # for i in range(1, 9):
  285. # dev.set_input_mode(i, 1)
  286. # print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  287. # for i in range(8, 0, -1):
  288. # dev.set_input_mode(i, 0)
  289. # print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  290. if __name__ == '__main__':
  291. main()