rim.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import time
  2. from serial import Serial
  3. from typing import Union
  4. SendData = Union[str, bytes, bytearray, int]
  5. class RIM_489:
  6. DEBUG_PRINT = True
  7. def __init__(self, tty: str, baudrate: int):
  8. self.serial = Serial(port=tty, baudrate=baudrate, timeout=1, parity='N', xonxoff=False, stopbits=1, bytesize=8)
  9. @staticmethod
  10. def to_bytes(data: SendData) -> bytes:
  11. if isinstance(data, str):
  12. return data.encode()
  13. elif isinstance(data, int):
  14. return bytes((data, ))
  15. else:
  16. return bytes(data)
  17. def send(self, data: SendData, add_checksum: bool = False):
  18. buf = self.to_bytes(data)
  19. if add_checksum:
  20. pass
  21. if self.DEBUG_PRINT:
  22. print(f'> {buf.hex()}')
  23. self.serial.write(buf)
  24. def recv(self, count: int, timeout: float = 1) -> bytes:
  25. self.serial.timeout = timeout
  26. buf = self.serial.read(count)
  27. if self.DEBUG_PRINT:
  28. print(f'< {buf.hex()}')
  29. # print(f'< {buf}')
  30. return buf
  31. def read(self, count: int, timeout: float = 3) -> bytes:
  32. buf = bytearray()
  33. start_time = time.monotonic()
  34. while time.monotonic() - start_time < timeout:
  35. buf.extend(self.recv(count - len(buf)))
  36. if len(buf) == count:
  37. return bytes(buf)
  38. raise print('Read timeout')
  39. def test(self):
  40. self.serial.write('this is test string\r\n'.encode())
  41. time.sleep(0.5)
  42. while self.serial.in_waiting() > 0:
  43. out += self.serial.read(1)
  44. print(out)
  45. self.serial.close()
  46. if __name__ == '__main__':
  47. rim = RIM_489('COM13', 9600)
  48. rim.send(b'\x3f')
  49. rim.read(1)
  50. rim.send(b'\x00\x02\x80\x71')
  51. # rim.send(b'\x0f\x03\x10\x40\xff')
  52. # rim.read(12)
  53. rim.read(20)