io_module.py 16 KB


  1. from modbus import Modbus, MBError, NoResponseError
  2. import colorama
  3. from colorama import Fore
  4. import time
  5. import os
  6. import struct
  7. import random
  8. models = {0x100: 'MDIO_88', 0x200: 'MAO_8', 0x300: 'MAI_12'}
  9. MODEL_MDIO_88 = 0x0100
  10. MODEL_MAO_8 = 0x0200
  11. MODEL_MAI_12 = 0x0300
  12. SYS_TEST_OK = 0x0001
  13. reg_table = {'in_bits': 0x0100, 'in_cnt': 0x0102, 'in_mode': 0x0120, 'in_norm': 0x0122, 'in_deb_start': 0x124,
  14. 'out_cur': 0x0200, 'out_mode': 0x0202, 'out_mode_save': 0x0203, 'pwm_duty': 0x0210,
  15. 'pwm_duty_save': 0x0220, 'pwm_per': 0x0230, 'pwm_per_save': 0x0240,
  16. 'rtc_unix': 0x0802, 'rtc_sinhro': 0x0804, 'uptime': 0x0800, 'log_info': 0x0900,
  17. 'log_ent': 0x0901, 'arch_cap': 0x0902, 'arch_ent': 0x0903,
  18. 'model': 0x0080, 'prod_time': 0x0081, 'serial': 0x0083, 'fw_ver': 0x0085,
  19. 'test_status': 0x0089, 'password': 0x008A}
  20. class IO_Module(Modbus):
  21. def __init__(self, tty: str, brate: int, address: int):
  22. super().__init__(tty, brate, address)
  23. self.update_segment_number = 0
  24. def iap_start(self):
  25. """Reboot device in IAP mode"""
  26. request = bytes((self.address, 0x41, 0x01))
  27. response = self.raw_communicate(request + self._crc(request))
  28. def write_fw_start(self, size: int):
  29. """Ask device to start update"""
  30. self.update_segment_number = 0
  31. request = bytes((self.address, 0x41, 0x01, 0xEF, 0xBE, 0xAD, 0xDE)) + size.to_bytes(4, 'big')
  32. response = self.raw_communicate(request + self._crc(request), 5)
  33. if len(response) == 0:
  34. raise NoResponseError('No response on WRITE_START command')
  35. if len(response) != 5:
  36. raise MBError('Incorrect response length')
  37. if response[:3] != bytes((self.address, 0x41, 0x01)):
  38. raise MBError('Incorrect response')
  39. def write_fw_part(self, data: bytes):
  40. """Write piece of FW data in IAP mode"""
  41. header = bytes((self.address, 0x41, 0x02))
  42. request = b''.join((header, self.update_segment_number.to_bytes(2, 'big'), data))
  43. response = self.raw_communicate(request + self._crc(request), 5)
  44. # self.print_hex(response)
  45. if len(response) != 5:
  46. raise MBError('Incorrect response length')
  47. if (response[:3]) != header:
  48. raise MBError('Incorrect response')
  49. self.update_segment_number += 1
  50. def iap_finish(self):
  51. """Complete FW transmission and check response"""
  52. header = request = bytes((self.address, 0x41, 0x03))
  53. response = self.raw_communicate(request + self._crc(request), 5)
  54. if len(response) != 5:
  55. raise MBError('Incorrect response length')
  56. if response[:3] != header:
  57. raise MBError('Incorrect response')
  58. def update(self, path):
  59. self.MB_TIMEOUT = 3
  60. size = os.path.getsize('fw.bin')
  61. print('Switch to IAP mode')
  62. self.iap_start()
  63. time.sleep(4)
  64. print(f'Start writing {size} bytes of FW')
  65. self.write_fw_start(size)
  66. time.sleep(2)
  67. print(f'Open FW file "{path}"...')
  68. with open(path, 'rb') as f:
  69. done = progress_cur = progress_pre = 0
  70. while True:
  71. buf = f.read(128)
  72. if len(buf):
  73. self.write_fw_part(buf)
  74. progress_cur = done / size
  75. else:
  76. break
  77. print('End of transmission')
  78. self.iap_finish()
  79. # 0x0100 - текущее состояние входов
  80. def get_inputs_bit(self) -> str:
  81. data = self.read_holding_registers(reg_table['in_bits'], 1)
  82. return format(data[0], '08b')
  83. # 0x0101 - 0x0110 Счетчики импульсов
  84. def get_inputs_counters(self):
  85. data = []
  86. for i in range(reg_table['in_cnt'], reg_table['in_cnt'] + 16, 2):
  87. data.append(self.read_uint32_holding(i))
  88. return data
  89. # 0x0120 - режим работы входов
  90. def get_inputs_mode(self):
  91. data = self.read_holding_registers(reg_table['in_mode'], 1)
  92. return format(data[0], '08b')
  93. def set_inputs_mode(self, val):
  94. self.write_holding_register(reg_table['in_mode'], val)
  95. #
  96. def set_input_mode(self, input, val):
  97. ret = self.read_holding_registers(reg_table['in_mode'], 1)
  98. if val == 1:
  99. data = ret[0] | (0b1 << (input - 1))
  100. else:
  101. data = ret[0] & ~(0b1 << (input - 1))
  102. self.set_inputs_mode(data)
  103. # 0x0122 - нормальное состояние входов
  104. def get_inputs_norm_state(self):
  105. data = self.read_holding_registers(reg_table['in_norm'], 1)
  106. return format(data[0], '08b')
  107. def set_inputs_norm_state(self, val):
  108. self.write_holding_register(reg_table['in_norm'], val)
  109. # 0x0124 - время антидребезга (ms)
  110. def get_debounce_channel(self, input):
  111. data = self.read_holding_registers(reg_table['in_deb_start'] + input - 1, 1)
  112. return data[0]
  113. def get_debounce_channels(self):
  114. return self.read_holding_registers(reg_table['in_deb_start'], 8)
  115. def set_debounce_channel(self, input, val):
  116. self.write_holding_register(reg_table['in_deb_start'] + input - 1, val)
  117. # 0x0200 - текущее состояние выходов в обычно режиме
  118. def get_outputs(self):
  119. data = self.read_holding_registers(reg_table['out_cur'], 1)
  120. return format(data[0], '08b')
  121. # 0x0200 - текущее состояние выходов в обычно режиме
  122. def set_outputs(self, val):
  123. self.write_holding_register(reg_table['out_cur'], val)
  124. def set_output(self, output, val):
  125. ret = self.read_holding_registers(reg_table['out_cur'], 1)
  126. if val == 1:
  127. data = ret[0] | (0b1 << (output - 1))
  128. else:
  129. data = ret[0] & ~(0b1 << (output - 1))
  130. self.set_outputs(data)
  131. # 0x0202 - режим работы выходов; 0 - обычный, 1 - ШИМ
  132. def get_outputs_mode(self):
  133. data = self.read_holding_registers(reg_table['out_mode'], 1)
  134. return format(data[0], '08b')
  135. def set_outputs_mode(self, val):
  136. self.write_holding_register(reg_table['out_mode'], val)
  137. def set_output_mode(self, output, val):
  138. ret = self.read_holding_registers(reg_table['out_mode'], 1)
  139. if val == 1:
  140. data = ret[0] | (0b1 << (output - 1))
  141. else:
  142. data = ret[0] & ~(0b1 << (output - 1))
  143. self.set_outputs_mode(data)
  144. # 0x0203 - состояние выходов (режим обычного выхода) в безопасном режиме работы
  145. def get_outputs_mode_save(self):
  146. data = self.read_holding_registers(reg_table['out_mode_save'], 1)
  147. return format(data[0], '08b')
  148. def set_outputs_mode_save(self, val):
  149. self.write_holding_register(reg_table['out_mode_save'], val)
  150. def set_output_mode_save(self, output, val):
  151. ret = self.read_holding_registers(reg_table['out_mode_save'], 1)
  152. if val == 1:
  153. data = ret[0] | (0b1 << (output - 1))
  154. else:
  155. data = ret[0] & ~(0b1 << (output - 1))
  156. self.set_outputs_mode_save(data)
  157. # 0x0210 - заполнение PWM (%)
  158. def get_pwm_duty(self, output):
  159. data = self.read_holding_registers(reg_table['pwm_duty'] + output - 1, 1)
  160. return data[0]
  161. def get_pwm_duty_all(self):
  162. return self.read_holding_registers(reg_table['pwm_duty'], 8)
  163. def set_pwm_duty(self, output, val):
  164. self.write_holding_register(reg_table['pwm_duty'] + output - 1, val)
  165. # 0x0220 - заполнение PWM (%)
  166. def get_pwm_duty_all_save(self):
  167. return self.read_holding_registers(reg_table['pwm_duty_save'], 8)
  168. def set_pwm_duty_save(self, output, val):
  169. self.write_holding_register(reg_table['pwm_duty_save'] + output - 1, val)
  170. # 0x0230 - период PWM (0.1 сек)
  171. def get_pwm_period_all(self):
  172. return self.read_holding_registers(reg_table['pwm_per'], 8)
  173. def set_pwm_period(self, output, val):
  174. self.write_holding_register(reg_table['pwm_per'] + output - 1, val)
  175. # 0x0240 - период PWM в безопасном режиме (0.1 сек)
  176. def get_pwm_period_all_save(self):
  177. return self.read_holding_registers(reg_table['pwm_per_save'], 8)
  178. def set_pwm_period_save(self, output, val):
  179. self.write_holding_register(reg_table['pwm_per_save'] + output - 1, val)
  180. def get_uptime(self):
  181. return self.read_uint32_holding(reg_table['uptime'])
  182. def get_rtc(self):
  183. return self.read_uint32_holding(reg_table['rtc_unix'])
  184. def set_rtc(self, utc):
  185. self.write_uint32(reg_table['rtc_sinhro'], utc)
  186. def set_system_vars(dev: IO_Module, password: int):
  187. # Отправка пароля для разблокирования доступа к системным настройкам
  188. dev.write_holding_register(reg_table['password'], password)
  189. time.sleep(0.1)
  190. # Модель
  191. dev.write_holding_register(reg_table['model'], SYS_MODEL_MDIO_88)
  192. # Дата производства
  193. dev.write_holding_register(reg_table['prod_time'], int(time.time()) - 3600*24)
  194. # Серийный номер
  195. dev.write_holding_register(reg_table['serial'], random.randint(10000, 1000000))
  196. # Статус тестирования
  197. dev.write_holding_register(reg_table['test_status'], SYS_TEST_OK)
  198. def get_system_vars(dev: IO_Module):
  199. # Модель
  200. model = dev.read_holding_registers(reg_table['model'], 1)[0]
  201. print(type(model))
  202. print(models.get(int(model)))
  203. print(models[model])
  204. def main():
  205. colorama.init(autoreset=True)
  206. dev = IO_Module('COM24', 115200, 15)
  207. dev.MB_DEBUG = False
  208. # dev.update('fw.bin')
  209. # Запрос системных параметров, установка времени
  210. # print('Device uptime:', dev.get_uptime())
  211. # unix_time = dev.get_rtc()
  212. # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
  213. # print('Set time:', int(time.time()))
  214. # dev.set_rtc(int(time.time()))
  215. # time.sleep(1)
  216. # unix_time = dev.get_rtc()
  217. # print(f'RTC: {time.ctime(unix_time)}. Unix time stamp: {unix_time}')
  218. get_system_vars(dev)
  219. return
  220. for i in range(1, 9):
  221. dev.set_input_mode(i, 1)
  222. print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  223. for i in range(1, 9):
  224. dev.set_debounce_channel(i, 100 + i)
  225. # dev.set_output(i, 1)
  226. # dev.set_output_mode(i, 1)
  227. # 0x0203 - состояние выходов (режим обычного выхода) в безопасном режиме работы
  228. # for i in range(1, 9):
  229. # dev.set_output_mode_save(i, 0)
  230. # 0x0210 - заполнение PWM (ms)
  231. # 0x0220 - заполнение PWM (%)
  232. # 0x0230 - период PWM (0.1 сек)
  233. # 0x0240 - период PWM в безопасном режиме (0.1 сек)
  234. # for i in range(1, 9):
  235. # dev.set_pwm_duty(i, 10 + i)
  236. # dev.set_pwm_duty_save(i, 20 + i)
  237. # dev.set_pwm_period(i, 10 + i)
  238. # dev.set_pwm_period_save(i, 20 + i)
  239. # -----------------------------------------------------------------------------
  240. # Тесты PWM
  241. # Установить 1-ый выход в режим PWM
  242. # for i in range(3, 9):
  243. # dev.set_output_mode(i, 0)
  244. # dev.set_output_mode(1, 1)
  245. # dev.set_pwm_period(1, 30)
  246. # dev.set_pwm_period(1, 50)
  247. # dev.set_pwm_duty(2, 10)
  248. # return
  249. # Установить нормальное состояние входов
  250. # dev.set_inputs_norm_state(0b00110101)
  251. # print(dev.get_pwm_duty_all())
  252. while True:
  253. print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
  254. # Значения входов (битовое поле)
  255. print('Inputs values [bit field] :', Fore.GREEN + dev.get_inputs_bit())
  256. # Режим работы входов (битовое поле)
  257. print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  258. # Нормальное состояние входов (битовое поле)
  259. print('Inputs norm [bit field] :', Fore.GREEN + dev.get_inputs_norm_state())
  260. # Период антидребезга (ms)
  261. print('Debounce input (ms) :', Fore.GREEN + ' | '.join(str(el) for el in dev.get_debounce_channels()))
  262. # Значение счетчиков
  263. data = dev.get_inputs_counters()
  264. print('Inputs counters :', Fore.GREEN + ' | '.join(str(el) for el in data))
  265. # Текущее состояние выходов в обычном режиме
  266. print('Outputs norm [bit field] :', Fore.GREEN + dev.get_outputs())
  267. # Состояние выходов в безопасном режиме
  268. print('Outputs save [bit field] :', Fore.GREEN + dev.get_outputs_mode_save())
  269. # Режим работы выходов
  270. print('Outputs mode [bit field] :', Fore.GREEN + dev.get_outputs_mode())
  271. # 0x0210 - заполнение PWM (ms)
  272. print('PWM duty cycle [%] :', Fore.GREEN + ' | '.join(str(el) for el in dev.get_pwm_duty_all()))
  273. # 0x0220 - заполнение PWM в безопасном режиме (ms)
  274. print('PWM duty cycle (save) [%] :', Fore.GREEN + ' | '.join(str(el) for el in dev.get_pwm_duty_all_save()))
  275. # 0x0230 - период PWM (0.1 сек)
  276. print('PWM period [0.1 sec] :', Fore.GREEN + ' | '.join(str(el) for el in dev.get_pwm_period_all()))
  277. # 0x0240 - период PWM в безопасном режиме (0.1 сек)
  278. print('PWM period save [0.1 sec] :', Fore.GREEN + ' | '.join(str(el) for el in dev.get_pwm_period_all_save()))
  279. # Дергаем одним выходом
  280. # dev.set_output(2, trigger)
  281. # trigger = not trigger
  282. # # Для проверки выходов в обычном режиме
  283. # for i in range(1, 9):
  284. # dev.set_output(i, 1)
  285. # print('Outputs norm [bit field] :', Fore.GREEN + dev.get_output())
  286. # time.sleep(0.1)
  287. # for i in range(1, 9):
  288. # dev.set_output(i, 0)
  289. # print('Outputs norm [bit field] :', Fore.GREEN + dev.get_output())
  290. # time.sleep(0.1)
  291. # # Режим работы выходов
  292. # for i in range(1, 9):
  293. # dev.set_output_mode(i, 1)
  294. # print('Outputs mode [bit field] :', Fore.GREEN + dev.get_outputs_mode())
  295. # time.sleep(0.1)
  296. # for i in range(1, 9):
  297. # dev.set_output_mode(i, 0)
  298. # print('Outputs mode [bit field] :', Fore.GREEN + dev.get_outputs_mode())
  299. # time.sleep(0.1)
  300. # break
  301. time.sleep(1)
  302. # for i in range(1, 9):
  303. # dev.set_input_mode(i, 1)
  304. # print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  305. # for i in range(8, 0, -1):
  306. # dev.set_input_mode(i, 0)
  307. # print('Inputs mode [bit field] :', Fore.GREEN + dev.get_inputs_mode())
  308. if __name__ == '__main__':
  309. main()