123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- import time
- from serial import Serial
- from typing import Union
- SendData = Union[str, bytes, bytearray, int]
class RIM_489:
    """Small wrapper around a pyserial port for exchanging raw bytes with a device.

    The port is opened in ``__init__`` with 8 data bits, no parity, 1 stop bit
    and software flow control disabled. Traffic can be echoed to stdout as hex
    by leaving ``DEBUG_PRINT`` enabled.
    """

    # When true, every buffer sent or received is printed to stdout as hex.
    DEBUG_PRINT = True

    def __init__(self, tty: str, baudrate: int):
        """Open the serial port.

        Args:
            tty: Port name handed to pyserial (e.g. ``'COM13'``).
            baudrate: Line speed in baud.
        """
        self.serial = Serial(
            port=tty,
            baudrate=baudrate,
            timeout=1,
            parity='N',
            xonxoff=False,
            stopbits=1,
            bytesize=8,
        )

    @staticmethod
    def to_bytes(data: SendData) -> bytes:
        """Convert ``data`` to an immutable ``bytes`` object.

        Args:
            data: A ``str`` (encoded with the default codec), an ``int``
                (turned into a single byte), or any bytes-like value.

        Returns:
            The converted payload.

        Raises:
            ValueError: If ``data`` is an ``int`` outside ``0..255``.
        """
        if isinstance(data, str):
            return data.encode()
        if isinstance(data, int):
            return bytes((data,))
        return bytes(data)

    def send(self, data: SendData, add_checksum: bool = False):
        """Write ``data`` to the port.

        Args:
            data: Payload; converted with :meth:`to_bytes`.
            add_checksum: Accepted for interface compatibility but currently
                has no effect; no checksum algorithm is implemented here.
        """
        buf = self.to_bytes(data)
        if add_checksum:
            # TODO: checksum generation is not implemented; payload is sent unchanged.
            pass
        if self.DEBUG_PRINT:
            print(f'> {buf.hex()}')
        self.serial.write(buf)

    def recv(self, count: int, timeout: float = 1) -> bytes:
        """Perform a single read of up to ``count`` bytes.

        Args:
            count: Maximum number of bytes to read.
            timeout: Value assigned to the port's read timeout, in seconds,
                before reading.

        Returns:
            Whatever the port returned; may be shorter than ``count``.
        """
        self.serial.timeout = timeout
        buf = self.serial.read(count)
        if self.DEBUG_PRINT:
            print(f'< {buf.hex()}')
        return buf

    def read(self, count: int, timeout: float = 3) -> bytes:
        """Read exactly ``count`` bytes, polling until ``timeout`` elapses.

        Args:
            count: Number of bytes required.
            timeout: Overall deadline in seconds.

        Returns:
            Exactly ``count`` bytes.

        Raises:
            TimeoutError: If fewer than ``count`` bytes arrived before the
                deadline. Bytes received so far are discarded.
        """
        buf = bytearray()
        deadline = time.monotonic() + timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Poll in slices of at most 1 s, but never wait past the overall deadline.
            buf.extend(self.recv(count - len(buf), timeout=min(1, remaining)))
            if len(buf) == count:
                return bytes(buf)
        raise TimeoutError('Read timeout')

    def test(self):
        """Send a fixed test string, print whatever comes back, then close the port.

        The port is closed even if writing or reading fails, so the instance
        cannot be used for further I/O after this call.
        """
        try:
            self.serial.write('this is test string\r\n'.encode())
            # Give the peer time to answer before draining the input buffer.
            time.sleep(0.5)
            out = bytearray()
            while True:
                # in_waiting is a property in pyserial, not a method.
                pending = self.serial.in_waiting
                if pending <= 0:
                    break
                out.extend(self.serial.read(pending))
            print(bytes(out))
        finally:
            self.serial.close()
if __name__ == '__main__':
    # Manual smoke run against a device on COM13 at 9600 baud.
    rim = RIM_489('COM13', 9600)
    try:
        # Send a single 0x3f byte ('?') and wait for a one-byte reply.
        rim.send(b'\x3f')
        rim.read(1)
        # Send a four-byte frame and wait for a 20-byte reply.
        rim.send(b'\x00\x02\x80\x71')
        # Alternative exchange kept for reference:
        # rim.send(b'\x0f\x03\x10\x40\xff')
        # rim.read(12)
        rim.read(20)
    finally:
        # Release the port even when a read fails, so the next run can reopen it.
        rim.serial.close()
|