"""Minimal JSON-RPC server/client demo built on ``jsonrpclib``.

Running this file as a script starts a ``SimpleJSONRPCServer`` on
``SERVER_ADDR`` in a background thread, calls ``ping`` through a client
proxy, prints the result and shuts the server down again.
"""
import threading
import time

import jsonrpclib
from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer

SERVER_ADDR = ("localhost", 7000)


def _server_url():
    """Return the HTTP URL that corresponds to ``SERVER_ADDR``."""
    return "http://{0}:{1}".format(SERVER_ADDR[0], SERVER_ADDR[1])


class JSONRPCServer:
    """JSON-RPC server that serves ``ExampleAggregateService`` from a thread.

    Attributes are plain instance attributes:
      * ``server``  -- the underlying ``SimpleJSONRPCServer``
      * ``service`` -- the ``ExampleAggregateService`` instance being exposed
      * ``thread``  -- daemon thread running ``server.serve_forever``
    """

    def __init__(self):
        """Create the server, register the service and start serving."""
        self.server = SimpleJSONRPCServer(SERVER_ADDR)
        self.service = ExampleAggregateService()

        # SECURITY NOTE: allow_dotted_names=True lets clients walk attribute
        # chains on the registered instance (e.g. ``sub_service.add``). The
        # stdlib xmlrpc.server docs warn that this option must not be used
        # on an open network; keep the server bound to localhost.
        self.server.register_instance(self.service, allow_dotted_names=True)

        # The same summation function is exposed under several aliases.
        self.server.register_function(self.service.sumation, 'sum')
        self.server.register_function(self.service.sumation, 'notify_sum')
        self.server.register_function(self.service.sumation, 'namespace.sum')

        # ``target`` must be passed by keyword: the first positional
        # parameter of Thread is ``group``, which is required to be None.
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.daemon = True
        # Expose ``stop`` on the thread object as well, so code holding only
        # the thread can shut the server down.
        self.thread.stop = self.stop
        self.thread.start()
        # NOTE(review): the listening socket is presumably bound by the
        # SimpleJSONRPCServer constructor, so early client connections should
        # queue rather than fail; call ``_wait_for_rpc_start()`` if an
        # explicit readiness check is needed.

    def _wait_for_rpc_start(self, timeout=None):
        """Block until the server answers a ``ping`` call.

        Args:
            timeout: Maximum number of seconds to wait. ``None`` (the
                default) waits indefinitely.

        Raises:
            TimeoutError: If ``timeout`` is given and elapses before the
                server responds.
        """
        rpc = jsonrpclib.Server(_server_url())
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            try:
                if rpc.ping():
                    print("Ping JSONRPCServer successfull")
                    return
            except Exception:
                # Server not reachable yet: deliberate best-effort retry.
                # ``Exception`` (not a bare except) keeps Ctrl-C working.
                pass
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    "JSON-RPC server did not answer ping at " + _server_url()
                )
            time.sleep(1)

    def stop(self):
        """Stop the serve loop and release the listening socket."""
        self.server.shutdown()
        self.server.server_close()


class JSONRPCClient:
    """Thin wrapper around a ``jsonrpclib.Server`` proxy for ``SERVER_ADDR``.

    The proxy is available as ``self.client``; any other public attribute
    looked up on the wrapper is forwarded to it, so ``wrapper.ping()`` is
    equivalent to ``wrapper.client.ping()``.
    """

    def __init__(self) -> None:
        """Create the proxy pointing at ``SERVER_ADDR``."""
        self.client = jsonrpclib.Server(_server_url())

    def __getattr__(self, name):
        """Forward unknown public attributes to the underlying proxy.

        Only invoked when normal attribute lookup fails. ``client`` itself
        and underscore-prefixed names are never forwarded: the former would
        recurse if ``__init__`` has not run, the latter would turn protocol
        lookups (copy, pickle, ...) into remote method calls.
        """
        if name == "client" or name.startswith("_"):
            raise AttributeError(name)
        return getattr(self.client, name)


class ExampleService:
    """Collection of stateless example procedures exposed over JSON-RPC."""

    @staticmethod
    def add(x, y):
        """Return ``x + y``."""
        return x + y

    @staticmethod
    def sumation(*args):
        """Return the sum of all positional arguments."""
        return sum(args)

    @staticmethod
    def get_data():
        """Return a fixed sample payload."""
        return ['hello', 5]

    @staticmethod
    def ping():
        """Return ``True``; used as a liveness check."""
        return True


class ExampleAggregateService(ExampleService):
    """``ExampleService`` that also carries a nested ``sub_service``."""

    def __init__(self) -> None:
        """Attach a nested ``ExampleService`` as ``sub_service``."""
        self.sub_service = ExampleService()


if __name__ == "__main__":
    server = JSONRPCServer()
    client = JSONRPCClient()
    try:
        print(client.ping())
    finally:
        # Always release the port, even if the call above fails.
        server.stop()