"""Small JSON-RPC playground: a demo server, a demo client and example services.

Running the module builds a :class:`JSONRPCClient` against its default
endpoint and issues one ``get_changes`` call, printing the raw request and
response recorded by ``jsonrpclib.history``.
"""

import threading
import time

import jsonrpclib
from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer

# (host, port) the demo server binds to and that ``_wait_for_rpc_start`` pings.
SERVER_ADDR = ("localhost", 7000)


class JSONRPCServer:
    """Serve an ``ExampleAggregateService`` over JSON-RPC on a background thread."""

    def __init__(self):
        """Create the server, register the service and start serving.

        The serving thread is started immediately; call :meth:`stop` to end it.
        """
        self.server = SimpleJSONRPCServer(SERVER_ADDR)
        self.service = ExampleAggregateService()
        self.server.register_instance(self.service, allow_dotted_names=True)
        # The same callable is published under three different RPC names.
        self.server.register_function(self.service.sumation, 'sum')
        self.server.register_function(self.service.sumation, 'notify_sum')
        self.server.register_function(self.service.sumation, 'namespace.sum')
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.start()

    def _wait_for_rpc_start(self):
        """Block until a ``ping`` RPC against ``SERVER_ADDR`` returns a truthy value.

        Retries once per second.  Failures of an individual attempt are
        ignored on purpose (the server may simply not be up yet).
        """
        rpc = jsonrpclib.Server(
            "http://{0}:{1}".format(SERVER_ADDR[0], SERVER_ADDR[1])
        )
        while True:
            try:
                if rpc.ping():
                    print("Ping JSONRPCServer successfull")
                    return
            except Exception:
                # Best-effort retry.  ``Exception`` rather than a bare
                # ``except`` so Ctrl-C / SystemExit can still break the loop.
                pass
            time.sleep(1)

    def stop(self):
        """Stop serving, release the listening socket and wait for the thread."""
        self.server.shutdown()
        # shutdown() only ends serve_forever(); the socket must be closed
        # explicitly, otherwise the port stays bound.
        self.server.server_close()
        self.thread.join()


class JSONRPCClient:
    """Thin wrapper around a ``jsonrpclib.Server`` proxy used for manual testing."""

    def __init__(self, host="192.168.31.226", port=888):
        """Create the RPC proxy.

        Args:
            host: Host name or IP address of the JSON-RPC endpoint.
            port: TCP port of the JSON-RPC endpoint.
        """
        # To talk to the local demo server instead, pass
        # host=SERVER_ADDR[0], port=SERVER_ADDR[1].
        self.client = jsonrpclib.Server("http://{0}:{1}".format(host, port))
        # self.thread = threading.Thread(target=self.test)
        # self.thread.start()

    def test(self):
        """Issue one ``get_changes`` call and print the recorded request/response."""
        # Polling variant kept for reference:
        # while True:
        #     # print(self.client.get_device_info())
        #     self.client.get_changes()
        #     print(jsonrpclib.history.request)
        #     print(jsonrpclib.history.response)
        #     time.sleep(3)
        self.client.get_changes()
        # self.client.get_opc_config()
        print(jsonrpclib.history.request)
        print(jsonrpclib.history.response)


class ExampleService:
    """Stateless example methods exposed over RPC."""

    @staticmethod
    def add(x, y):
        """Return ``x + y``."""
        return x + y

    @staticmethod
    def sumation(*args):
        """Return the sum of all positional arguments (0 when none are given)."""
        return sum(args)

    @staticmethod
    def get_data():
        """Return a fixed sample payload."""
        return ['hello', 5]

    @staticmethod
    def ping():
        """Return ``True``; used as a liveness probe.

        Declared static so it can be called on an instance as well as on the
        class -- without the decorator an instance call raises ``TypeError``
        because the function takes no ``self``.
        """
        return True


class ExampleAggregateService(ExampleService):
    """``ExampleService`` that also carries a nested ``ExampleService``."""

    def __init__(self):
        """Attach a nested service instance as ``sub_service``."""
        self.sub_service = ExampleService()


# server = JSONRPCServer()
client = JSONRPCClient()
client.test()
# print(client.ping())