TelenkovDmitry 1 жил өмнө
parent
commit
da98c04bf0

+ 15 - 2
algorithm/galg.py

@@ -59,7 +59,7 @@ def quick_sort(arr):
         return quick_sort(less) + [pivot] + quick_sort(greater)
     
 
-print(quick_sort([5,3,6,4,7,1,3,2]))
+# print(quick_sort([5,3,6,4,7,1,3,2]))
 
 def arr_test(arr):
     return [i for i in arr[1:]]
@@ -68,8 +68,21 @@ def arr_test(arr):
 
 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+# Треугольник паскаля
 
+def triangle_pascale(n):
+    tr = []
+    for i in range (n + 1):
+        tr.append([1] + [0]*n)
 
-# Разное
+    for i in range(1, n + 1):
+        for j in range(1, i + 1):
+            tr[i][j] = tr[i-1][j] + tr[i-1][j-1]
 
+    for i in range(0, n + 1):
+        for j in range(0, i + 1):
+            print(tr[i][j], end=' ')
+        print()
+
+triangle_pascale(10)
 

+ 233 - 0
artery_loader.py

@@ -0,0 +1,233 @@
+import os
+import time
+from typing import Iterable, Union, Optional, Tuple
+from serial import Serial
+
+
+class ErrorAT32(IOError):
+	"""Common error for FlashAT32 util"""
+
+
+SendData = Union[str, bytes, bytearray, Iterable[int], int]
+
+
+class FlashAT32:
+	"""Artery AT32x flashing utility"""
+	CMD_SYNC = 0x7F
+	CMD_ACK = 0x79
+	CMD_NACK = 0x1F
+	ADDR_FLASH = 0x08000000
+	CHUNK_SIZE = 256
+	DEBUG_PRINT = False
+
+	# def __init__(self, tty: str, baudrate: int = 115200):
+	def __init__(self, tty: str, baudrate: int):
+		self.serial = Serial(port=tty, baudrate=baudrate, timeout=1, parity='E', xonxoff=False)
+
+	@staticmethod
+	def to_bytes(data: SendData) -> bytes:
+		"""Convert various types of data to bytes"""
+		if isinstance(data, str):
+			return data.encode()
+		elif isinstance(data, int):
+			return bytes((data, ))
+		else:
+			return bytes(data)
+
+	@staticmethod
+	def checksum(buf: bytes) -> int:
+		"""Calculate standard CRC-8 checksum"""
+		crc = 0 if len(buf) > 1 else 0xFF
+		for b in buf:
+			crc = crc.__xor__(b)
+		return crc
+
+	def send(self, data: SendData, add_checksum: bool = False):
+		"""Send data to device"""
+		buf = self.to_bytes(data)
+		if add_checksum:
+			buf += bytes((self.checksum(buf), ))
+		if self.DEBUG_PRINT:
+			print(f'> {buf}')
+		self.serial.write(buf)
+
+	def recv(self, count: int, timeout: float = 1) -> bytes:
+		"""Receive bytes from device"""
+		self.serial.timeout = timeout
+		buf = self.serial.read(count)
+		if self.DEBUG_PRINT:
+			print(f'< {buf}')
+		return buf
+
+	def read(self, count: int, timeout: float = 3) -> bytes:
+		"""Read count bytes from device or raise exception"""
+		buf = bytearray()
+		start_time = time.monotonic()
+		while time.monotonic() - start_time < timeout:
+			buf.extend(self.recv(count - len(buf)))
+			if len(buf) == count:
+				return bytes(buf)
+		raise ErrorAT32('Read timeout')
+
+	def receive_ack(self, timeout: float = 10) -> Optional[bool]:
+		"""Wait ACK byte from device, return True on ACN, False on NACK, None on timeout"""
+		start_time = time.monotonic()
+		while time.monotonic() - start_time < timeout:
+			buf = self.recv(1)
+			if buf == bytes((self.CMD_ACK, )):
+				return True
+			if buf == bytes((self.CMD_NACK, )):
+				return False
+		return None
+
+	def wait_ack(self, timeout: float = 10):
+		"""Wait ACK byte from device, raise exception on NACK or timeout"""
+		res = self.receive_ack(timeout)
+		if res is None:
+			raise ErrorAT32('ACK timeout')
+		if not res:
+			raise ErrorAT32('NACK received')
+
+	def send_cmd(self, code: int):
+		"""Send command and wait ACK"""
+		self.send((code, ~code & 0xFF))
+		self.wait_ack()
+
+	def set_isp(self):
+		"""Send host data to device"""
+		self.send((0xFA, 0x05))
+		ack = self.receive_ack()
+		if ack is None:
+			raise ErrorAT32('ACK timeout')
+		if not ack:
+			raise ErrorAT32('NACK received')
+			return
+		self.send((0x02, 0x03, 0x54, 0x41), add_checksum=True)
+		self.wait_ack()
+
+	def connect(self, timeout: float = 10):
+		"""Init connection with AT32 bootloader"""
+		start_time = time.monotonic()
+		while time.monotonic() - start_time < timeout:
+			self.send(self.CMD_SYNC)
+			if self.recv(1, 0.01) == bytes((self.CMD_ACK, )):
+				return self.set_isp()
+		raise ErrorAT32('Connection timeout')
+
+	def access_unprotect(self):
+		"""Disable access protection"""
+		self.send_cmd(0x92)
+		self.wait_ack()
+
+	def get_device_id(self) -> Tuple[int, int]:
+		"""Read some device info"""
+		self.send_cmd(0x02)
+		length = self.read(1)[0]
+		if length != 4:
+			raise ErrorAT32('Incorrect response length')
+		buf = self.read(length + 1)
+		project_code, device_code = buf[4], buf[1] | (buf[0] << 8) | (buf[3] << 16) | (buf[2] << 24)
+		if project_code not in {8, }:
+			raise ErrorAT32('Device not supported')
+		return project_code, device_code
+
+	def reset(self):
+		"""SW reboot for device"""
+		self.send_cmd(0xD4)
+		self.wait_ack()
+
+	def read_mem(self, address: int, count: int) -> bytes:
+		"""Read data from device memory"""
+		self.send_cmd(0x11)
+		self.send(address.to_bytes(4, 'big'), add_checksum=True)
+		self.wait_ack()
+		self.send((count - 1).to_bytes(1, 'big'), add_checksum=True)
+		self.wait_ack()
+		return self.read(count)
+
+	def write_mem(self, address: int, buf: bytes):
+		"""Write data to device memory"""
+		self.send_cmd(0x31)
+		self.send(address.to_bytes(4, 'big'), add_checksum=True)
+		self.wait_ack()
+		self.send(bytes((len(buf) - 1, )) + buf, add_checksum=True)
+		self.wait_ack()
+
+	def get_flash_size(self) -> int:
+		"""Read flash size"""
+		return int.from_bytes(self.read_mem(0x1FFFF7E0, 2), 'little') & 0x0000FFFF
+
+	def get_uid(self) -> bytes:
+		"""Read unique chip ID"""
+		return self.read_mem(0x1FFFF7E8, 12)
+		# buf = bytearray()
+		# x = 3
+		# for i in range(12 // x):
+		#	 buf.extend(self.read_mem(0x1FFFF7E8 + i * x, 1 * x))
+		# return bytes(buf)
+
+	def get_uid_str(self) -> str:
+		"""Get unique chip ID as hex string"""
+		return ''.join(f'{b:02X}' for b in self.get_uid())
+
+	def read_flash(self, address: int, size: int) -> bytes:
+		"""Read all content of flash memory"""
+		chunks, last_bytes_count = divmod(size, self.CHUNK_SIZE)
+		res = bytearray()
+		for i in range(chunks):
+			res.extend(self.read_mem(address + i * self.CHUNK_SIZE, self.CHUNK_SIZE))
+		if last_bytes_count != 0:
+			res.extend(self.read_mem(address + chunks * self.CHUNK_SIZE, last_bytes_count))
+		return bytes(res)
+
+	def read_flash_to_file(self, address: int, size: int, path: str):
+		"""Read all content of flash memory to file"""
+		with open(path, 'wb') as f:
+			f.write(self.read_flash(address, size))	
+
+	def erase_flash(self):
+		"""Do mass erase"""
+		self.send_cmd(0x44)
+		self.send((0xFF, 0xFF), add_checksum=True)
+		self.wait_ack()
+
+	def write_flash(self, address: int, buf: bytes):
+		"""Write binary data to flash"""
+		flash_full_chunks, last_chunk_size = divmod(len(buf), self.CHUNK_SIZE)
+		for i in range(flash_full_chunks):
+			self.write_mem(address + i * self.CHUNK_SIZE, buf[i * self.CHUNK_SIZE:(i + 1) * self.CHUNK_SIZE])
+		if last_chunk_size > 0:
+			self.write_mem(address + flash_full_chunks * self.CHUNK_SIZE,
+						   buf[flash_full_chunks * self.CHUNK_SIZE:])
+
+	def write_file_to_flash(self, address: int, path: str):
+		"""write binary file to flash"""
+		with open(path, 'rb') as f:
+			self.write_flash(address, f.read())
+
+
+if __name__ == '__main__':
+	start_time = time.time()
+	d = FlashAT32('/dev/ttyUSB0', 115200 * 8)
+	d.DEBUG_PRINT = True
+	try:
+		d.connect()
+	except Exception as e:
+		print(e)
+	print(d.get_flash_size())
+	print(d.get_uid_str())
+	d.erase_flash()
+	iap_path = '../output/iap.bin'
+	fw_path = '../output/fw.bin'
+	iap_path_r = '../output/m3_artery_iap.bin'
+	fw_path_r = '../output/m3_artery_fw.bin'
+	d.write_file_to_flash(0x08000000, iap_path)
+	d.write_file_to_flash(0x08021000, fw_path)
+	d.read_flash_to_file(0x08000000, os.path.getsize(iap_path), iap_path_r)
+	d.read_flash_to_file(0x08021000, os.path.getsize(fw_path), fw_path_r)
+	os.system(f'diff {iap_path} {iap_path_r}')
+	os.system(f'diff {fw_path} {fw_path_r}')
+	os.remove(iap_path_r)
+	os.remove(fw_path_r)
+
+	print(time.time() - start_time)

+ 146 - 0
courses/python_for_begginers/test.py

@@ -0,0 +1,146 @@
+# a, b, c = map(int, input().split())
+# print(a, b, c, sep=', ')
+
+# n = int(input())
+#n = 5000 # 121
+# n = 5000%8640
+# h = n//3600
+# m_d = n%3600//600
+# m = (n - h*3600 - m_d*600)//60
+# sec = n - h*3600 - m_d*600 - m*60
+# print(h, ":", m_d, m, ":", sec, sep="")
+# print(n//3600, ":", n%3600//600, n%3600//60, ":", n%3600%60//10, n%3600%60, sep="")
+
+# a, b, c = map(int, input().split())
+# # a = int(input())
+# print(a**2 + b**2 == c**2 or a**2 + c**2 == b**2 or b**2 + c**2 == a**2)
+
+# s = '1234567'
+# print(s[::-1])
+
+# s = input()
+# print(s[:3].upper() + s[3:len(s)-3] + s[len(s)-3:].upper())
+
+# print(input().title().swapcase())
+
+# my_list = [1] * 77
+# print(len(my_list))
+
+# my_list = list(input().split())
+# print('-'.join(my_list[0]).upper(), '-'.join(my_list[1]).upper())
+
+# my_list = list(input().split())
+# print(my_list[2], my_list[0][0].upper() + '.', my_list[0][1].upper() + '.')
+
+# my_list = list(input().split())
+# print('\n'.join(my_list))
+
+
+# s = input()
+# if s == 'Python':
+#     print('ДА')
+# else:
+#     print('НЕТ')
+
+
+# a, b, c = map(int, input().split())
+# print(a)
+
+
+# if (s := int(input())) > 10000:
+#     print(f'Ого! {s}! Куда вам столько? Мы заберем {s - 10000}')        
+# else:
+#     print(f'Сумма {s} не превышает лимит, проходите')
+
+
+# if 'walrus' in (my_list := input()):
+#     print('Нашли моржа')
+# else:
+#     print('Никаких моржей тут нет')
+
+
+# s = list(input().strip())
+# t = list(input().strip())
+# s.reverse()
+# if t == s:
+#     print('YES')
+# else:
+#     print('NO')
+
+# s = input()
+# new_s = s[-1::-1]
+# if s == new_s:
+#     print('YES')
+# else:
+#     print('NO')
+
+# n = input().zfill(6)
+# left_list = list(map(int, n[:3].strip()))
+# right_list = list(map(int, n[-1:-4:-1].strip()))
+# if sum(left_list) == sum(right_list):
+#     print('YES')
+# else:
+#     print('NO')
+
+
+# x1, y1 = map(ord, input())
+# x2, y2 = map(ord, input())
+# dx = x2 - x1
+# dy = y2 - y1
+# print(x1, y1)
+# print(x2, y2)
+# if dx%2 == 0 and dy%2 == 0:
+#     print('YES')
+# if dx%2 != 0 and dy%2 != 0:
+#     print('YES')    
+# else:
+#     print('NO')
+
+# n, m = map(int, input().split())
+
+# l1 = list(map(int, input().split()))
+# l2 = list(map(int, input().split()))
+# l3 = list()
+
+# i = 0
+# l = len(l1) + len(l2)
+
+# while i < l:
+#     if len(l1) != 0 and len(l2) != 0:
+#         if l1[0] <= l2[0]:
+#             l3.append(l1.pop(0))
+#         else:
+#             l3.append(l2.pop(0))
+#     elif len(l1) != 0 and len(l2) == 0:
+#         l3.append(l1.pop(0))
+#     else:
+#         l3.append(l2.pop(0))
+
+#     i += 1
+
+# print(*l3)
+
+
+# b_len = int(input())
+# b = list(map(int, input().split()))
+# g_len = int(input())
+# g = list(map(int, input().split()))
+
+# b = [3, 3, 5, 5]
+# g = [4, 4, 2, 2]
+
+# i = 0
+# p = 0
+# while i < len(b) and len(g):
+#     if b[i] in g:
+#         p += 1
+#         g.remove(b[i]) 
+#     elif b[i] - 1 in g:
+#         p += 1
+#         g.remove(b[i] - 1) 
+#     elif b[i] + 1 in g:
+#         p += 1
+#         g.remove(b[i] + 1) 
+#     i += 1
+
+# print(p)

+ 0 - 112
metrolog/metrolog_m3.py

@@ -1,112 +0,0 @@
-import subprocess
-import os
-import socket
-from datetime import datetime
-
-class AnnounceM3():
-    """Container and parser for M2 UDP announce data"""
-    # '{model};{serial};{mac};{sw_ver};{button_set};{button_mode};{stm32_id};{uptime};{production_date};{test_state};{button_box};'
-
-    def __init__(self, model, serial, mac, sw_ver, button_set, button_mod,
-                stm32_id, uptime, prod_date, test_state, button_box):
-        self.model = model 
-        self.serial = serial
-        self.mac = mac
-        self.sw_ver = sw_ver
-        self.button_set = button_set
-        self.button_mod = button_mod
-        self.stm32_id = stm32_id
-        self.uptime = uptime
-        self.prod_date = prod_date
-        self.test_state = test_state
-        self.button_box = button_box
-
-    @classmethod
-    def parse(cls, raw):
-        """Parse raw bytes from received datagram"""
-        if not isinstance(raw, str):
-            try:
-                raw = raw.decode()
-            except Exception as e:
-                return
-        sp = raw.split(';')
-        return cls(sp[0], sp[1], sp[2], sp[3], sp[4], sp[5], sp[6], sp[7], sp[8], sp[9], sp[10])
-
-    def print_members(self):
-        print(f'Модель: {self.model}')
-        print(f'Серийный номер: {self.serial}')
-        print(f'MAC: {self.mac}')
-        print(f'Версия сновной прошивки: {self.sw_ver}')
-        print(f'Кнопка "УСТАНОВКА": {self.button_set}')
-        print(f'Кнопка "РЕЖИМ": {self.button_mod}')
-        print(f'Тампер: {self.button_box}')
-        print(f'STM32_ID: {self.stm32_id}')
-        print(f'Время работы (uptime): {self.uptime}')
-        print(f'Дата производства: {self.prod_date}')
-        print(f'Статус тестирвоания: {self.test_state}')
-        
-
-
-class Metrolog_M3():
-    """Metrolog-M3 board service client"""
-    ANNOUNCE_TIMEOUT = 5
-    ANNOUNCE_PORT = 49049
-
-    def __init__(self, ip):
-        self.ip = ip
-
-    def ping(self):
-        """Test connection with device using ping"""
-        null = open(os.devnull)
-        try:
-            subprocess.check_call(['ping', '-n', '1',  self.ip],
-                                   timeout=10, stdout=null, stderr=null)
-            print("Ping - OK")
-            return True
-        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
-            return False
-
-    @staticmethod
-    def announce_socket():
-        """Create socket to receive announce messages"""
-        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        sock.settimeout(.5)
-        sock.bind(('', Metrolog_M3.ANNOUNCE_PORT))
-        return sock
-
-    def wait_announce(self, timeout=None, get_ip=False):
-        """Wait next UDP announce from device"""
-        if timeout is None:
-            self.timeout = self.ANNOUNCE_TIMEOUT
-        sock = self.announce_socket()
-        start = datetime.now()
-        while True:
-            try:
-                raw, address = sock.recvfrom(4096)
-            except socket.timeout:
-                pass
-            else:
-                announce = AnnounceM3.parse(raw)
-                if announce is not None:
-                    if address[0] == self.ip:
-                        print("Контроллер найден")
-                        return announce
-            finally:
-                if (datetime.now() - start).seconds > self.timeout:
-                    print("Превышено время ожидания запроса")
-                    return None
-            
-
-
-# b'\xd0\x9c\xd0\xb5\xd1\x82\xd1\x80\xd0\xbe\xd0\xbb\xd0\xbe\xd0\xb3 M3.4.3;7030000;B6-C1-EF-9C-FB-00;1.016t;false;false;5A0037001451313136393333;1221;00.00.00;;false;'
-
-metrolog = Metrolog_M3("192.168.25.6")
-# print(metrolog.wait_announce())
-ann = metrolog.wait_announce()
-ann.print_members()
-
-
-# announce = AnnounceM3()
-# metrolog.ping()
-# print(datetime.now())

+ 69 - 0
rim.py

@@ -0,0 +1,69 @@
+import time
+from serial import Serial
+from typing import Union
+
+SendData = Union[str, bytes, bytearray, int]
+
+class RIM_489:
+    DEBUG_PRINT = True
+
+    def __init__(self, tty: str, baudrate: int):
+        self.serial = Serial(port=tty, baudrate=baudrate, timeout=1, parity='N', xonxoff=False, stopbits=1, bytesize=8)
+
+    @staticmethod
+    def to_bytes(data: SendData) -> bytes:
+        if isinstance(data, str):
+            return data.encode()
+        elif isinstance(data, int):
+            return bytes((data, ))
+        else:
+            return bytes(data)
+
+    def send(self, data: SendData, add_checksum: bool = False):
+        buf = self.to_bytes(data)
+        if add_checksum:
+            pass
+        if self.DEBUG_PRINT:
+            print(f'> {buf.hex()}')
+        self.serial.write(buf)
+
+    def recv(self, count: int, timeout: float = 1) -> bytes:
+        self.serial.timeout = timeout
+        buf = self.serial.read(count)
+        if self.DEBUG_PRINT:
+            print(f'< {buf.hex()}')
+            # print(f'< {buf}')
+        return buf
+
+    def read(self, count: int, timeout: float = 3) -> bytes:
+        buf = bytearray()
+        start_time = time.monotonic()
+        while time.monotonic() - start_time < timeout:
+            buf.extend(self.recv(count - len(buf)))
+            if len(buf) == count:
+                return bytes(buf)
+        raise print('Read timeout')
+
+    def test(self):
+        self.serial.write('this is test string\r\n'.encode())
+        time.sleep(0.5)
+        while self.serial.in_waiting() > 0:
+            out += self.serial.read(1)
+        print(out)
+        self.serial.close()
+
+
+if __name__ == '__main__':    
+    rim = RIM_489('COM13', 9600)
+    rim.send(b'\x3f')
+    rim.read(1)
+
+
+    rim.send(b'\x00\x02\x80\x71')
+    # rim.send(b'\x0f\x03\x10\x40\xff')
+    # rim.read(12)
+    rim.read(20)
+
+
+
+