# artery_loader.py (7.7 KB)
  1. import os
  2. import time
  3. from typing import Iterable, Union, Optional, Tuple
  4. from serial import Serial
  5. import shutil
  6. import subprocess
  7. import re
  8. import inquirer
  9. class ErrorAT32(IOError):
  10. """Common error for FlashAT32 util"""
  11. SendData = Union[str, bytes, bytearray, Iterable[int], int]
  12. class FlashAT32:
  13. """Artery AT32x flashing utility"""
  14. CMD_SYNC = 0x7F
  15. CMD_ACK = 0x79
  16. CMD_NACK = 0x1F
  17. ADDR_FLASH = 0x08000000
  18. CHUNK_SIZE = 256
  19. DEBUG_PRINT = False
  20. # def __init__(self, tty: str, baudrate: int = 115200):
  21. def __init__(self, tty: str, baudrate: int):
  22. self.serial = Serial(port=tty, baudrate=baudrate, timeout=1, parity='E', xonxoff=False)
  23. @staticmethod
  24. def to_bytes(data: SendData) -> bytes:
  25. """Convert various types of data to bytes"""
  26. if isinstance(data, str):
  27. return data.encode()
  28. elif isinstance(data, int):
  29. return bytes((data, ))
  30. else:
  31. return bytes(data)
  32. @staticmethod
  33. def checksum(buf: bytes) -> int:
  34. """Calculate standard CRC-8 checksum"""
  35. crc = 0 if len(buf) > 1 else 0xFF
  36. for b in buf:
  37. crc = crc.__xor__(b)
  38. return crc
  39. def send(self, data: SendData, add_checksum: bool = False):
  40. """Send data to device"""
  41. buf = self.to_bytes(data)
  42. if add_checksum:
  43. buf += bytes((self.checksum(buf), ))
  44. if self.DEBUG_PRINT:
  45. print(f'> {buf}')
  46. self.serial.write(buf)
  47. def recv(self, count: int, timeout: float = 1) -> bytes:
  48. """Receive bytes from device"""
  49. self.serial.timeout = timeout
  50. buf = self.serial.read(count)
  51. if self.DEBUG_PRINT:
  52. print(f'< {buf}')
  53. return buf
  54. def read(self, count: int, timeout: float = 3) -> bytes:
  55. """Read count bytes from device or raise exception"""
  56. buf = bytearray()
  57. start_time = time.monotonic()
  58. while time.monotonic() - start_time < timeout:
  59. buf.extend(self.recv(count - len(buf)))
  60. if len(buf) == count:
  61. return bytes(buf)
  62. raise ErrorAT32('Read timeout')
  63. def receive_ack(self, timeout: float = 10) -> Optional[bool]:
  64. """Wait ACK byte from device, return True on ACN, False on NACK, None on timeout"""
  65. start_time = time.monotonic()
  66. while time.monotonic() - start_time < timeout:
  67. buf = self.recv(1)
  68. if buf == bytes((self.CMD_ACK, )):
  69. return True
  70. if buf == bytes((self.CMD_NACK, )):
  71. return False
  72. return None
  73. def wait_ack(self, timeout: float = 10):
  74. """Wait ACK byte from device, raise exception on NACK or timeout"""
  75. res = self.receive_ack(timeout)
  76. if res is None:
  77. raise ErrorAT32('ACK timeout')
  78. if not res:
  79. raise ErrorAT32('NACK received')
  80. def send_cmd(self, code: int):
  81. """Send command and wait ACK"""
  82. self.send((code, ~code & 0xFF))
  83. self.wait_ack()
  84. def set_isp(self):
  85. """Send host data to device"""
  86. self.send((0xFA, 0x05))
  87. ack = self.receive_ack()
  88. if ack is None:
  89. raise ErrorAT32('ACK timeout')
  90. if not ack:
  91. raise ErrorAT32('NACK received')
  92. return
  93. self.send((0x02, 0x03, 0x54, 0x41), add_checksum=True)
  94. self.wait_ack()
  95. def connect(self, timeout: float = 10):
  96. """Init connection with AT32 bootloader"""
  97. start_time = time.monotonic()
  98. while time.monotonic() - start_time < timeout:
  99. self.send(self.CMD_SYNC)
  100. if self.recv(1, 0.01) == bytes((self.CMD_ACK, )):
  101. return self.set_isp()
  102. raise ErrorAT32('Connection timeout')
  103. def access_unprotect(self):
  104. """Disable access protection"""
  105. self.send_cmd(0x92)
  106. self.wait_ack()
  107. def get_device_id(self) -> Tuple[int, int]:
  108. """Read some device info"""
  109. self.send_cmd(0x02)
  110. length = self.read(1)[0]
  111. if length != 4:
  112. raise ErrorAT32('Incorrect response length')
  113. buf = self.read(length + 1)
  114. project_code, device_code = buf[4], buf[1] | (buf[0] << 8) | (buf[3] << 16) | (buf[2] << 24)
  115. if project_code not in {8, }:
  116. raise ErrorAT32('Device not supported')
  117. return project_code, device_code
  118. def reset(self):
  119. """SW reboot for device"""
  120. self.send_cmd(0xD4)
  121. self.wait_ack()
  122. def read_mem(self, address: int, count: int) -> bytes:
  123. """Read data from device memory"""
  124. self.send_cmd(0x11)
  125. self.send(address.to_bytes(4, 'big'), add_checksum=True)
  126. self.wait_ack()
  127. self.send((count - 1).to_bytes(1, 'big'), add_checksum=True)
  128. self.wait_ack()
  129. return self.read(count)
  130. def write_mem(self, address: int, buf: bytes):
  131. """Write data to device memory"""
  132. self.send_cmd(0x31)
  133. self.send(address.to_bytes(4, 'big'), add_checksum=True)
  134. self.wait_ack()
  135. self.send(bytes((len(buf) - 1, )) + buf, add_checksum=True)
  136. self.wait_ack()
  137. def get_flash_size(self) -> int:
  138. """Read flash size"""
  139. return int.from_bytes(self.read_mem(0x1FFFF7E0, 2), 'little') & 0x0000FFFF
  140. def get_uid(self) -> bytes:
  141. """Read unique chip ID"""
  142. return self.read_mem(0x1FFFF7E8, 12)
  143. # buf = bytearray()
  144. # x = 3
  145. # for i in range(12 // x):
  146. # buf.extend(self.read_mem(0x1FFFF7E8 + i * x, 1 * x))
  147. # return bytes(buf)
  148. def get_uid_str(self) -> str:
  149. """Get unique chip ID as hex string"""
  150. return ''.join(f'{b:02X}' for b in self.get_uid())
  151. def read_flash(self, address: int, size: int) -> bytes:
  152. """Read all content of flash memory"""
  153. chunks, last_bytes_count = divmod(size, self.CHUNK_SIZE)
  154. res = bytearray()
  155. for i in range(chunks):
  156. res.extend(self.read_mem(address + i * self.CHUNK_SIZE, self.CHUNK_SIZE))
  157. if last_bytes_count != 0:
  158. res.extend(self.read_mem(address + chunks * self.CHUNK_SIZE, last_bytes_count))
  159. return bytes(res)
  160. def read_flash_to_file(self, address: int, size: int, path: str):
  161. """Read all content of flash memory to file"""
  162. with open(path, 'wb') as f:
  163. f.write(self.read_flash(address, size))
  164. def erase_flash(self):
  165. """Do mass erase"""
  166. self.send_cmd(0x44)
  167. self.send((0xFF, 0xFF), add_checksum=True)
  168. self.wait_ack()
  169. def write_flash(self, address: int, buf: bytes):
  170. """Write binary data to flash"""
  171. flash_full_chunks, last_chunk_size = divmod(len(buf), self.CHUNK_SIZE)
  172. for i in range(flash_full_chunks):
  173. self.write_mem(address + i * self.CHUNK_SIZE, buf[i * self.CHUNK_SIZE:(i + 1) * self.CHUNK_SIZE])
  174. if last_chunk_size > 0:
  175. self.write_mem(address + flash_full_chunks * self.CHUNK_SIZE,
  176. buf[flash_full_chunks * self.CHUNK_SIZE:])
  177. def write_file_to_flash(self, address: int, path: str):
  178. """write binary file to flash"""
  179. with open(path, 'rb') as f:
  180. self.write_flash(address, f.read())
  181. def menu_test():
  182. questions = [
  183. inquirer.List('size',
  184. message="What size do you need?",
  185. choices=['Jumbo', 'Large', 'Standard', 'Medium', 'Small', 'Micro'],
  186. ),
  187. ]
  188. answers = inquirer.prompt(questions)
  189. print(answers)
if __name__ == '__main__':
    # Copy the binaries and append a CRC to fw.bin
    # print("Copying bin files")
    # shutil.copy('../../output/iap.bin', 'bin/iap.bin')
    # shutil.copy('../../output/fw.bin', 'bin/fw.bin')
    # shutil.copy('../../output/crc_ewarm.exe', 'bin/crc_ewarm.exe')
    # print('Adding CRC to fw.bin')
    # os.startfile('bin\crc_ewarm.exe')

    # NOTE: the example session below is wrapped in a bare string literal, so
    # it is never executed; running this file as a script currently does
    # nothing. It shows the intended flow: connect, print flash size and UID,
    # mass-erase, write two images, read them back to files, remove the
    # read-back files and print the elapsed time.
    """
    start_time = time.time()
    d = FlashAT32('COM53', 115200 * 8)
    d.DEBUG_PRINT = True
    try:
        d.connect()
    except Exception as e:
        print(e)
    print(d.get_flash_size())
    print(d.get_uid_str())
    d.erase_flash()
    iap_path = 'bin/iap.bin'
    fw_path = 'bin/fw.bin'
    iap_path_r = 'bin/m3_artery_iap.bin'
    fw_path_r = 'bin/m3_artery_fw.bin'
    d.write_file_to_flash(0x08000000, iap_path)
    d.write_file_to_flash(0x08021000, fw_path)
    d.read_flash_to_file(0x08000000, os.path.getsize(iap_path), iap_path_r)
    d.read_flash_to_file(0x08021000, os.path.getsize(fw_path), fw_path_r)
    # os.system(f'diff {iap_path} {iap_path_r}')
    # os.system(f'diff {fw_path} {fw_path_r}')
    os.remove(iap_path_r)
    os.remove(fw_path_r)
    print(time.time() - start_time)
    """