1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- import socket
- class PortgwTcp():
- def __init__(self, ip, port):
- self.ip = ip
- self.port = port
- def open(self):
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- self.sock.connect((self.ip, self.port))
- except socket.error:
- print(f'Connection to {self.ip} failed. Exit')
- return False
- return True
-
- def close(self):
- self.sock.close()
- def read(self, nbytes, timeout=1):
- data = ''
- self.sock.settimeout(timeout)
- while len(data) < nbytes:
- try:
- ret = self.sock.recv(nbytes)
- except socket.error:
- print("Read socker error!")
- break
- data += str(ret)
- return data
- def write(self, data):
- send = self.sock.send(data)
- if send == 0:
- raise RuntimeError('Socker write failed')
- return send
- class PortgwBase():
- def __init__(self, transport):
- self.open = transport.open
- self.close = transport.close
- self.write = transport.write
- self.read = transport.read
- class PortgwBaseTcp(PortgwBase):
- def __init__(self, ip, port):
- transport = PortgwTcp(ip, port)
- super(PortgwBaseTcp, self).__init__(transport)
- def pgw_test():
- pgw_1 = PortgwTcp('192.168.31.230', 1002)
- pgw_1.open()
- test_data = b"test data string"
- print('Socker writed data:', pgw_1.write(test_data))
- pgw_1.read(len(test_data))
- pgw_1.close()
- pgw_2 = PortgwTcp('192.168.31.230', 1003)
- pgw_2.open()
- test_data = b"test data string"
- print('Socker writed data:', pgw_2.write(test_data))
- pgw_2.read(len(test_data))
- pgw_2.close()
- if __name__ == "__main__":
- pgw_test()
|