import time
from typing import Union

from serial import Serial

# Anything `RIM_489.to_bytes` knows how to turn into raw bytes for the wire.
SendData = Union[str, bytes, bytearray, int]


class RIM_489:
    """Thin wrapper around a pySerial port with hex-dump debug tracing.

    The wire protocol itself is not defined here; this class only provides
    raw send/receive helpers on top of a ``serial.Serial`` instance.
    """

    # When true, every sent/received buffer is printed as hex.
    DEBUG_PRINT = True

    def __init__(self, tty: str, baudrate: int):
        """Open the serial port ``tty`` at ``baudrate`` (8 data bits, no parity, 1 stop bit)."""
        self.serial = Serial(
            port=tty,
            baudrate=baudrate,
            timeout=1,
            parity='N',
            xonxoff=False,
            stopbits=1,
            bytesize=8,
        )

    @staticmethod
    def to_bytes(data: SendData) -> bytes:
        """Convert ``data`` to ``bytes``.

        * ``str`` is encoded with the default codec (UTF-8).
        * ``int`` becomes a single byte; values outside 0..255 raise
          ``ValueError``.
        * anything else is passed to ``bytes()``.
        """
        if isinstance(data, str):
            return data.encode()
        if isinstance(data, int):
            return bytes((data,))
        return bytes(data)

    def send(self, data: SendData, add_checksum: bool = False) -> None:
        """Write ``data`` to the port, tracing it as hex when ``DEBUG_PRINT`` is set.

        ``add_checksum`` is accepted but currently has no effect.
        """
        buf = self.to_bytes(data)
        if add_checksum:
            # TODO: checksum calculation is not implemented yet; the data is
            # sent unchanged.
            pass
        if self.DEBUG_PRINT:
            print(f'> {buf.hex()}')
        self.serial.write(buf)

    def recv(self, count: int, timeout: float = 1) -> bytes:
        """Read up to ``count`` bytes, waiting at most ``timeout`` seconds.

        May return fewer than ``count`` bytes (possibly none) on timeout.
        Note that this overwrites the port's ``timeout`` setting.
        """
        self.serial.timeout = timeout
        buf = self.serial.read(count)
        if self.DEBUG_PRINT:
            print(f'< {buf.hex()}')
        return buf

    def read(self, count: int, timeout: float = 3) -> bytes:
        """Read exactly ``count`` bytes or fail.

        Repeatedly calls ``recv`` until ``count`` bytes have been collected.

        Raises:
            TimeoutError: if ``count`` bytes did not arrive within
                ``timeout`` seconds.
        """
        buf = bytearray()
        deadline = time.monotonic() + timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Cap each wait by the time left so the overall call does not
            # overshoot the requested timeout.
            buf.extend(self.recv(count - len(buf), timeout=min(1.0, remaining)))
            if len(buf) == count:
                return bytes(buf)
        raise TimeoutError('Read timeout')

    def test(self) -> None:
        """Send a test string, print whatever arrives within 0.5 s, then close the port."""
        try:
            self.serial.write('this is test string\r\n'.encode())
            time.sleep(0.5)
            out = bytearray()
            # `in_waiting` is a property in pySerial 3.x, not a method.
            pending = self.serial.in_waiting
            while pending > 0:
                out.extend(self.serial.read(pending))
                pending = self.serial.in_waiting
            print(bytes(out))
        finally:
            self.serial.close()


if __name__ == '__main__':
    rim = RIM_489('COM13', 9600)
    try:
        rim.send(b'\x3f')
        rim.read(1)
        rim.send(b'\x00\x02\x80\x71')
        # rim.send(b'\x0f\x03\x10\x40\xff')
        # rim.read(12)
        rim.read(20)
    finally:
        # Release the port even if a read times out.
        rim.serial.close()