artery_loader.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. import os
  2. import time
  3. from typing import Iterable, Union, Optional, Tuple
  4. from serial import Serial
  5. import shutil
  6. import subprocess
  7. class ErrorAT32(IOError):
  8. """Common error for FlashAT32 util"""
  9. SendData = Union[str, bytes, bytearray, Iterable[int], int]
  10. class FlashAT32:
  11. """Artery AT32x flashing utility"""
  12. CMD_SYNC = 0x7F
  13. CMD_ACK = 0x79
  14. CMD_NACK = 0x1F
  15. ADDR_FLASH = 0x08000000
  16. CHUNK_SIZE = 256
  17. DEBUG_PRINT = False
  18. # def __init__(self, tty: str, baudrate: int = 115200):
  19. def __init__(self, tty: str, baudrate: int):
  20. self.serial = Serial(port=tty, baudrate=baudrate, timeout=1, parity='E', xonxoff=False)
  21. @staticmethod
  22. def to_bytes(data: SendData) -> bytes:
  23. """Convert various types of data to bytes"""
  24. if isinstance(data, str):
  25. return data.encode()
  26. elif isinstance(data, int):
  27. return bytes((data, ))
  28. else:
  29. return bytes(data)
  30. @staticmethod
  31. def checksum(buf: bytes) -> int:
  32. """Calculate standard CRC-8 checksum"""
  33. crc = 0 if len(buf) > 1 else 0xFF
  34. for b in buf:
  35. crc = crc.__xor__(b)
  36. return crc
  37. def send(self, data: SendData, add_checksum: bool = False):
  38. """Send data to device"""
  39. buf = self.to_bytes(data)
  40. if add_checksum:
  41. buf += bytes((self.checksum(buf), ))
  42. if self.DEBUG_PRINT:
  43. print(f'> {buf}')
  44. self.serial.write(buf)
  45. def recv(self, count: int, timeout: float = 1) -> bytes:
  46. """Receive bytes from device"""
  47. self.serial.timeout = timeout
  48. buf = self.serial.read(count)
  49. if self.DEBUG_PRINT:
  50. print(f'< {buf}')
  51. return buf
  52. def read(self, count: int, timeout: float = 3) -> bytes:
  53. """Read count bytes from device or raise exception"""
  54. buf = bytearray()
  55. start_time = time.monotonic()
  56. while time.monotonic() - start_time < timeout:
  57. buf.extend(self.recv(count - len(buf)))
  58. if len(buf) == count:
  59. return bytes(buf)
  60. raise ErrorAT32('Read timeout')
  61. def receive_ack(self, timeout: float = 10) -> Optional[bool]:
  62. """Wait ACK byte from device, return True on ACN, False on NACK, None on timeout"""
  63. start_time = time.monotonic()
  64. while time.monotonic() - start_time < timeout:
  65. buf = self.recv(1)
  66. if buf == bytes((self.CMD_ACK, )):
  67. return True
  68. if buf == bytes((self.CMD_NACK, )):
  69. return False
  70. return None
  71. def wait_ack(self, timeout: float = 10):
  72. """Wait ACK byte from device, raise exception on NACK or timeout"""
  73. res = self.receive_ack(timeout)
  74. if res is None:
  75. raise ErrorAT32('ACK timeout')
  76. if not res:
  77. raise ErrorAT32('NACK received')
  78. def send_cmd(self, code: int):
  79. """Send command and wait ACK"""
  80. self.send((code, ~code & 0xFF))
  81. self.wait_ack()
  82. def set_isp(self):
  83. """Send host data to device"""
  84. self.send((0xFA, 0x05))
  85. ack = self.receive_ack()
  86. if ack is None:
  87. raise ErrorAT32('ACK timeout')
  88. if not ack:
  89. raise ErrorAT32('NACK received')
  90. return
  91. self.send((0x02, 0x03, 0x54, 0x41), add_checksum=True)
  92. self.wait_ack()
  93. def connect(self, timeout: float = 10):
  94. """Init connection with AT32 bootloader"""
  95. start_time = time.monotonic()
  96. while time.monotonic() - start_time < timeout:
  97. self.send(self.CMD_SYNC)
  98. if self.recv(1, 0.01) == bytes((self.CMD_ACK, )):
  99. return self.set_isp()
  100. raise ErrorAT32('Connection timeout')
  101. def access_unprotect(self):
  102. """Disable access protection"""
  103. self.send_cmd(0x92)
  104. self.wait_ack()
  105. def get_device_id(self) -> Tuple[int, int]:
  106. """Read some device info"""
  107. self.send_cmd(0x02)
  108. length = self.read(1)[0]
  109. if length != 4:
  110. raise ErrorAT32('Incorrect response length')
  111. buf = self.read(length + 1)
  112. project_code, device_code = buf[4], buf[1] | (buf[0] << 8) | (buf[3] << 16) | (buf[2] << 24)
  113. if project_code not in {8, }:
  114. raise ErrorAT32('Device not supported')
  115. return project_code, device_code
  116. def reset(self):
  117. """SW reboot for device"""
  118. self.send_cmd(0xD4)
  119. self.wait_ack()
  120. def read_mem(self, address: int, count: int) -> bytes:
  121. """Read data from device memory"""
  122. self.send_cmd(0x11)
  123. self.send(address.to_bytes(4, 'big'), add_checksum=True)
  124. self.wait_ack()
  125. self.send((count - 1).to_bytes(1, 'big'), add_checksum=True)
  126. self.wait_ack()
  127. return self.read(count)
  128. def write_mem(self, address: int, buf: bytes):
  129. """Write data to device memory"""
  130. self.send_cmd(0x31)
  131. self.send(address.to_bytes(4, 'big'), add_checksum=True)
  132. self.wait_ack()
  133. self.send(bytes((len(buf) - 1, )) + buf, add_checksum=True)
  134. self.wait_ack()
  135. def get_flash_size(self) -> int:
  136. """Read flash size"""
  137. return int.from_bytes(self.read_mem(0x1FFFF7E0, 2), 'little') & 0x0000FFFF
  138. def get_uid(self) -> bytes:
  139. """Read unique chip ID"""
  140. return self.read_mem(0x1FFFF7E8, 12)
  141. # buf = bytearray()
  142. # x = 3
  143. # for i in range(12 // x):
  144. # buf.extend(self.read_mem(0x1FFFF7E8 + i * x, 1 * x))
  145. # return bytes(buf)
  146. def get_uid_str(self) -> str:
  147. """Get unique chip ID as hex string"""
  148. return ''.join(f'{b:02X}' for b in self.get_uid())
  149. def read_flash(self, address: int, size: int) -> bytes:
  150. """Read all content of flash memory"""
  151. chunks, last_bytes_count = divmod(size, self.CHUNK_SIZE)
  152. res = bytearray()
  153. for i in range(chunks):
  154. res.extend(self.read_mem(address + i * self.CHUNK_SIZE, self.CHUNK_SIZE))
  155. if last_bytes_count != 0:
  156. res.extend(self.read_mem(address + chunks * self.CHUNK_SIZE, last_bytes_count))
  157. return bytes(res)
  158. def read_flash_to_file(self, address: int, size: int, path: str):
  159. """Read all content of flash memory to file"""
  160. with open(path, 'wb') as f:
  161. f.write(self.read_flash(address, size))
  162. def erase_flash(self):
  163. """Do mass erase"""
  164. self.send_cmd(0x44)
  165. self.send((0xFF, 0xFF), add_checksum=True)
  166. self.wait_ack()
  167. def write_flash(self, address: int, buf: bytes):
  168. """Write binary data to flash"""
  169. flash_full_chunks, last_chunk_size = divmod(len(buf), self.CHUNK_SIZE)
  170. for i in range(flash_full_chunks):
  171. self.write_mem(address + i * self.CHUNK_SIZE, buf[i * self.CHUNK_SIZE:(i + 1) * self.CHUNK_SIZE])
  172. if last_chunk_size > 0:
  173. self.write_mem(address + flash_full_chunks * self.CHUNK_SIZE,
  174. buf[flash_full_chunks * self.CHUNK_SIZE:])
  175. def write_file_to_flash(self, address: int, path: str):
  176. """write binary file to flash"""
  177. with open(path, 'rb') as f:
  178. self.write_flash(address, f.read())
  179. if __name__ == '__main__':
  180. # Копируем бинарники и добавляем CRC в fw.bin
  181. # print("Копирование bin файлов")
  182. # shutil.copy('../../output/iap.bin', 'bin/iap.bin')
  183. # shutil.copy('../../output/fw.bin', 'bin/fw.bin')
  184. # shutil.copy('../../output/crc_ewarm.exe', 'bin/crc_ewarm.exe')
  185. # print('Добавление CRC в fw.bin')
  186. # os.startfile('bin\crc_ewarm.exe')
  187. start_time = time.time()
  188. d = FlashAT32('COM3', 115200 * 1)
  189. d.DEBUG_PRINT = True
  190. try:
  191. d.connect()
  192. except Exception as e:
  193. print(e)
  194. print(d.get_flash_size())
  195. print(d.get_uid_str())
  196. """
  197. d.erase_flash()
  198. iap_path = '../output/iap.bin'
  199. fw_path = '../output/fw.bin'
  200. iap_path_r = '../output/m3_artery_iap.bin'
  201. fw_path_r = '../output/m3_artery_fw.bin'
  202. d.write_file_to_flash(0x08000000, iap_path)
  203. d.write_file_to_flash(0x08021000, fw_path)
  204. d.read_flash_to_file(0x08000000, os.path.getsize(iap_path), iap_path_r)
  205. d.read_flash_to_file(0x08021000, os.path.getsize(fw_path), fw_path_r)
  206. os.system(f'diff {iap_path} {iap_path_r}')
  207. os.system(f'diff {fw_path} {fw_path_r}')
  208. os.remove(iap_path_r)
  209. os.remove(fw_path_r)
  210. print(time.time() - start_time)
  211. """