# artery_loader.py
  1. import os
  2. import time
  3. from typing import Iterable, Union, Optional, Tuple
  4. from serial import Serial
  5. import shutil
  6. import subprocess
  7. import re
  8. import inquirer
  9. from tqdm import tqdm
  10. class ErrorAT32(IOError):
  11. """Common error for FlashAT32 util"""
  12. SendData = Union[str, bytes, bytearray, Iterable[int], int]
  13. class FlashAT32:
  14. """Artery AT32x flashing utility"""
  15. CMD_SYNC = 0x7F
  16. CMD_ACK = 0x79
  17. CMD_NACK = 0x1F
  18. ADDR_FLASH = 0x08000000
  19. CHUNK_SIZE = 256
  20. DEBUG_PRINT = False
  21. # def __init__(self, tty: str, baudrate: int = 115200):
  22. def __init__(self, tty: str, baudrate: int):
  23. self.serial = Serial(port=tty, baudrate=baudrate, timeout=1, parity='E', xonxoff=False)
  24. @staticmethod
  25. def to_bytes(data: SendData) -> bytes:
  26. """Convert various types of data to bytes"""
  27. if isinstance(data, str):
  28. return data.encode()
  29. elif isinstance(data, int):
  30. return bytes((data, ))
  31. else:
  32. return bytes(data)
  33. @staticmethod
  34. def checksum(buf: bytes) -> int:
  35. """Calculate standard CRC-8 checksum"""
  36. crc = 0 if len(buf) > 1 else 0xFF
  37. for b in buf:
  38. crc = crc.__xor__(b)
  39. return crc
  40. def send(self, data: SendData, add_checksum: bool = False):
  41. """Send data to device"""
  42. buf = self.to_bytes(data)
  43. if add_checksum:
  44. buf += bytes((self.checksum(buf), ))
  45. if self.DEBUG_PRINT:
  46. print(f'> {buf}')
  47. self.serial.write(buf)
  48. def recv(self, count: int, timeout: float = 1) -> bytes:
  49. """Receive bytes from device"""
  50. self.serial.timeout = timeout
  51. buf = self.serial.read(count)
  52. if self.DEBUG_PRINT:
  53. print(f'< {buf}')
  54. return buf
  55. def read(self, count: int, timeout: float = 3) -> bytes:
  56. """Read count bytes from device or raise exception"""
  57. buf = bytearray()
  58. start_time = time.monotonic()
  59. while time.monotonic() - start_time < timeout:
  60. buf.extend(self.recv(count - len(buf)))
  61. if len(buf) == count:
  62. return bytes(buf)
  63. raise ErrorAT32('Read timeout')
  64. def receive_ack(self, timeout: float = 10) -> Optional[bool]:
  65. """Wait ACK byte from device, return True on ACN, False on NACK, None on timeout"""
  66. start_time = time.monotonic()
  67. while time.monotonic() - start_time < timeout:
  68. buf = self.recv(1)
  69. if buf == bytes((self.CMD_ACK, )):
  70. return True
  71. if buf == bytes((self.CMD_NACK, )):
  72. return False
  73. return None
  74. def wait_ack(self, timeout: float = 10):
  75. """Wait ACK byte from device, raise exception on NACK or timeout"""
  76. res = self.receive_ack(timeout)
  77. if res is None:
  78. raise ErrorAT32('ACK timeout')
  79. if not res:
  80. raise ErrorAT32('NACK received')
  81. def send_cmd(self, code: int):
  82. """Send command and wait ACK"""
  83. self.send((code, ~code & 0xFF))
  84. self.wait_ack()
  85. def set_isp(self):
  86. """Send host data to device"""
  87. self.send((0xFA, 0x05))
  88. ack = self.receive_ack()
  89. if ack is None:
  90. raise ErrorAT32('ACK timeout')
  91. if not ack:
  92. raise ErrorAT32('NACK received')
  93. return
  94. self.send((0x02, 0x03, 0x54, 0x41), add_checksum=True)
  95. self.wait_ack()
  96. def connect(self, timeout: float = 10):
  97. """Init connection with AT32 bootloader"""
  98. start_time = time.monotonic()
  99. while time.monotonic() - start_time < timeout:
  100. self.send(self.CMD_SYNC)
  101. if self.recv(1, 0.01) == bytes((self.CMD_ACK, )):
  102. return self.set_isp()
  103. raise ErrorAT32('Connection timeout')
  104. def access_unprotect(self):
  105. """Disable access protection"""
  106. self.send_cmd(0x92)
  107. self.wait_ack()
  108. def get_device_id(self) -> Tuple[int, int]:
  109. """Read some device info"""
  110. self.send_cmd(0x02)
  111. length = self.read(1)[0]
  112. if length != 4:
  113. raise ErrorAT32('Incorrect response length')
  114. buf = self.read(length + 1)
  115. project_code, device_code = buf[4], buf[1] | (buf[0] << 8) | (buf[3] << 16) | (buf[2] << 24)
  116. if project_code not in {8, }:
  117. raise ErrorAT32('Device not supported')
  118. return project_code, device_code
  119. def reset(self):
  120. """SW reboot for device"""
  121. self.send_cmd(0xD4)
  122. self.wait_ack()
  123. def read_mem(self, address: int, count: int) -> bytes:
  124. """Read data from device memory"""
  125. self.send_cmd(0x11)
  126. self.send(address.to_bytes(4, 'big'), add_checksum=True)
  127. self.wait_ack()
  128. self.send((count - 1).to_bytes(1, 'big'), add_checksum=True)
  129. self.wait_ack()
  130. return self.read(count)
  131. def write_mem(self, address: int, buf: bytes):
  132. """Write data to device memory"""
  133. self.send_cmd(0x31)
  134. self.send(address.to_bytes(4, 'big'), add_checksum=True)
  135. self.wait_ack()
  136. self.send(bytes((len(buf) - 1, )) + buf, add_checksum=True)
  137. self.wait_ack()
  138. def get_flash_size(self) -> int:
  139. """Read flash size"""
  140. return int.from_bytes(self.read_mem(0x1FFFF7E0, 2), 'little') & 0x0000FFFF
  141. def get_uid(self) -> bytes:
  142. """Read unique chip ID"""
  143. return self.read_mem(0x1FFFF7E8, 12)
  144. # buf = bytearray()
  145. # x = 3
  146. # for i in range(12 // x):
  147. # buf.extend(self.read_mem(0x1FFFF7E8 + i * x, 1 * x))
  148. # return bytes(buf)
  149. def get_uid_str(self) -> str:
  150. """Get unique chip ID as hex string"""
  151. return ''.join(f'{b:02X}' for b in self.get_uid())
  152. def read_flash(self, address: int, size: int) -> bytes:
  153. """Read all content of flash memory"""
  154. chunks, last_bytes_count = divmod(size, self.CHUNK_SIZE)
  155. res = bytearray()
  156. for i in range(chunks):
  157. res.extend(self.read_mem(address + i * self.CHUNK_SIZE, self.CHUNK_SIZE))
  158. if last_bytes_count != 0:
  159. res.extend(self.read_mem(address + chunks * self.CHUNK_SIZE, last_bytes_count))
  160. return bytes(res)
  161. def read_flash_to_file(self, address: int, size: int, path: str):
  162. """Read all content of flash memory to file"""
  163. with open(path, 'wb') as f:
  164. f.write(self.read_flash(address, size))
  165. def erase_flash(self):
  166. """Do mass erase"""
  167. self.send_cmd(0x44)
  168. self.send((0xFF, 0xFF), add_checksum=True)
  169. self.wait_ack()
  170. def write_flash(self, address: int, buf: bytes):
  171. """Write binary data to flash"""
  172. flash_full_chunks, last_chunk_size = divmod(len(buf), self.CHUNK_SIZE)
  173. for i in tqdm(range(flash_full_chunks)):
  174. self.write_mem(address + i * self.CHUNK_SIZE, buf[i * self.CHUNK_SIZE:(i + 1) * self.CHUNK_SIZE])
  175. if last_chunk_size > 0:
  176. self.write_mem(address + flash_full_chunks * self.CHUNK_SIZE,
  177. buf[flash_full_chunks * self.CHUNK_SIZE:])
  178. def write_file_to_flash(self, address: int, path: str):
  179. """write binary file to flash"""
  180. with open(path, 'rb') as f:
  181. self.write_flash(address, f.read())
  182. def menu_test():
  183. questions = [
  184. inquirer.List('size',
  185. message="What size do you need?",
  186. choices=['Jumbo', 'Large', 'Standard', 'Medium', 'Small', 'Micro'],
  187. ),
  188. ]
  189. answers = inquirer.prompt(questions)
  190. print(answers)
  191. if __name__ == '__main__':
  192. # Копируем бинарники и добавляем CRC в fw.bin
  193. # print("Копирование bin файлов")
  194. # shutil.copy('../../output/iap.bin', 'bin/iap.bin')
  195. # shutil.copy('../../output/fw.bin', 'bin/fw.bin')
  196. # shutil.copy('../../output/crc_ewarm.exe', 'bin/crc_ewarm.exe')
  197. # print('Добавление CRC в fw.bin')
  198. # os.startfile('bin\crc_ewarm.exe')
  199. """
  200. start_time = time.time()
  201. d = FlashAT32('COM53', 115200 * 8)
  202. d.DEBUG_PRINT = True
  203. try:
  204. d.connect()
  205. except Exception as e:
  206. print(e)
  207. print(d.get_flash_size())
  208. print(d.get_uid_str())
  209. d.erase_flash()
  210. iap_path = 'bin/iap.bin'
  211. fw_path = 'bin/fw.bin'
  212. iap_path_r = 'bin/m3_artery_iap.bin'
  213. fw_path_r = 'bin/m3_artery_fw.bin'
  214. d.write_file_to_flash(0x08000000, iap_path)
  215. d.write_file_to_flash(0x08021000, fw_path)
  216. d.read_flash_to_file(0x08000000, os.path.getsize(iap_path), iap_path_r)
  217. d.read_flash_to_file(0x08021000, os.path.getsize(fw_path), fw_path_r)
  218. # os.system(f'diff {iap_path} {iap_path_r}')
  219. # os.system(f'diff {fw_path} {fw_path_r}')
  220. os.remove(iap_path_r)
  221. os.remove(fw_path_r)
  222. print(time.time() - start_time)
  223. """