# transport.py (1.8 KB)
  1. import socket
  2. class PortgwTcp():
  3. def __init__(self, ip, port):
  4. self.ip = ip
  5. self.port = port
  6. def open(self):
  7. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  8. try:
  9. self.sock.connect((self.ip, self.port))
  10. except socket.error:
  11. print(f'Connection to {self.ip} failed. Exit')
  12. return False
  13. return True
  14. def close(self):
  15. self.sock.close()
  16. def read(self, nbytes, timeout=1):
  17. data = ''
  18. self.sock.settimeout(timeout)
  19. while len(data) < nbytes:
  20. try:
  21. ret = self.sock.recv(nbytes)
  22. except socket.error:
  23. print("Read socker error!")
  24. break
  25. data += str(ret)
  26. return data
  27. def write(self, data):
  28. send = self.sock.send(data)
  29. if send == 0:
  30. raise RuntimeError('Socker write failed')
  31. return send
  32. class PortgwBase():
  33. def __init__(self, transport):
  34. self.open = transport.open
  35. self.close = transport.close
  36. self.write = transport.write
  37. self.read = transport.read
  38. class PortgwBaseTcp(PortgwBase):
  39. def __init__(self, ip, port):
  40. transport = PortgwTcp(ip, port)
  41. super(PortgwBaseTcp, self).__init__(transport)
  42. def pgw_test():
  43. pgw_1 = PortgwTcp('192.168.31.230', 1002)
  44. pgw_1.open()
  45. test_data = b"test data string"
  46. print('Socker writed data:', pgw_1.write(test_data))
  47. pgw_1.read(len(test_data))
  48. pgw_1.close()
  49. pgw_2 = PortgwTcp('192.168.31.230', 1003)
  50. pgw_2.open()
  51. test_data = b"test data string"
  52. print('Socker writed data:', pgw_2.write(test_data))
  53. pgw_2.read(len(test_data))
  54. pgw_2.close()
  55. if __name__ == "__main__":
  56. pgw_test()