企业宣传,产品推广,广告招商,广告投放联系seowdb

直接上代码! 自己手写RPC如何实现同步 单向调用 异步

很多好用的RPC框架都支持服务消费者以同步、异步和单向调用的方式与服务提供者进行交互,冰河你开发的这个RPC框架也可以吗?

在前面的章节中,实现了服务消费者屏蔽掉基于Netty连接服务提供者的实现细节的前提下,以异步转同步的方式调用服务提供者。在外部服务调用服务消费者向服务提供者发送数据的方法时,能够直接获取到服务提供者调用真实方法返回的结果数据。

那RPC框架只支持同步调用的话,在高并发环境下肯定会出现性能问题,我想让RPC框架支持同步、异步和单向调用,这也是很多优秀的RPC框架都支持的功能,这个有办法实现吗?

我:安排。。。

在服务提供者一端实现了按照自定义网络传输协议和数据编解码对接收到的数据进行解析,并且能够将解析到的数据作为参数调用真实方法,并接收真实方法返回的结果数据,通过自定义网络协议和数据编解码,将数据编码成二进制字节流,传输给服务消费者。

在服务消费者一端实现了按照自定义的网络传输协议和数据编解码,将数据编码成二进制字节流发送给服务提供者,能够接收到服务提供者响应回来的二进制字节流数据,并且能够根据自定义网络传输协议和数据编解码,将接收到的二进制字节流数据解码成对应的明文数据,接下来,进行进一步处理。

同时,服务消费者支持在屏蔽掉基于Netty连接服务提供者的实现细节的前提下,使得外部服务调用服务消费者向服务提供者发送数据的方法时,能够直接获取到服务提供者调用真实方法返回的结果数据。

做到这里,已经初步实现了RPC框架最基本的功能。这还远远不够,服务消费者除了能够以同步的方式调用服务提供者,也要支持异步调用和单向调用,看看人家Dubbo,做的是真特么牛逼。

好了,不羡慕人家,我们自己踏踏实实手撸吧,接下来,我们就实现服务消费者以同步、异步、单向调用的方式与服务提供者进行交互。

服务消费者与服务提供者之间基于同步、异步和单向调用的设计图分别如下图所示

通过上图可以看出:

(1)同步调用的方式,服务消费者发起数据请求后,会同步等待返回结果。

(2)异步调用的方式,服务消费者发起数据请求后,会立刻返回,后续会通过异步的方式获取数据。

(3)单向调用的方式,服务消费者发起数据请求后,会立刻返回,不必关注后续数据的处理结果。

可以看到,从设计上还是比较简单的,接下来,我们就一起实现它。

服务消费者与服务提供者之间基于同步、异步和单向调用的实现类关系如下图所示。

可以看到,核心类之间的实现关系还是比较清晰的。

RpcContext类位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.context.RpcContext,源码如下所示。

public class RpcContext {private RpcContext(){}/*** RpcContext实例*/private static final RpcContext AGENT = new RpcContext();/*** 存放RPCFuture的InheritableThreadLocal*/private static final InheritableThreadLocal<RPCFuture> RPC_FUTURE_INHERITABLE_THREAD_LOCAL = new InheritableThreadLocal<>();/*** 获取上下文* @return RPC服务的上下文信息*/public static RpcContext getContext(){return AGENT;}/*** 将RPCFuture保存到线程的上下文* @param rpcFuture*/public void setRPCFuture(RPCFuture rpcFuture){RPC_FUTURE_INHERITABLE_THREAD_LOCAL.set(rpcFuture);}/*** 获取RPCFuture*/public RPCFuture getRPCFuture(){return RPC_FUTURE_INHERITABLE_THREAD_LOCAL.get();}/*** 移除RPCFuture*/public void removeRPCFuture(){RPC_FUTURE_INHERITABLE_THREAD_LOCAL.remove();}}

可以看到,在RpcContext类中主要是通过InheritableThreadLocal在维护RPCFuture,并且每个线程维护RPCFuture时,都是相互隔离的。RpcContext类中维护的RPCFuture会在RPC框架全局有效。

RpcConsumerHandler类位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.handler.RpcConsumerHandler,具体修改步骤如下所示。

(1)新增sendRequestSync()方法

sendRequestSync()方法表示同步调用的方法,源码如下所示。

private RPCFuture sendRequestSync(RpcProtocol<RpcRequest> protocol) {RPCFuture rpcFuture = this.getRpcFuture(protocol);channel.writeAndFlush(protocol);return rpcFuture;}

可以看到,在sendRequestSync()方法中,调用channel的writeAndFlush()方法发送数据后,会返回RPCFuture对象。

(2)新增sendRequestAsync()方法

sendRequestAsync()方法表示异步调用的方法,源码如下所示。

private RPCFuture sendRequestAsync(RpcProtocol<RpcRequest> protocol) {RPCFuture rpcFuture = this.getRpcFuture(protocol);//如果是异步调用,则将RPCFuture放入RpcContextRpcContext.getContext().setRPCFuture(rpcFuture);channel.writeAndFlush(protocol);return null;}

可以看到,sendRequestAsync()方法中,会将RPCFuture对象放入RpcContext上下文中,最终返回null。外部服务调用服务消费者向服务提供者发送数据后,会通过RpcContext获取到RPCFuture对象,进而通过RPCFuture对象获取最终结果数据。

(3)新增sendRequestOneway()方法

sendRequestOneway()方法表示单向调用的方法,源码如下所示。

private RPCFuture sendRequestOneway(RpcProtocol<RpcRequest> protocol) {channel.writeAndFlush(protocol);return null;}

可以看到,单向调用方法并不关心返回结果。sendRequestOneway()方法直接调用channel的writeAndFlush()方法,并返回null。

(4)修改sendRequest()方法

在sendRequest()方法的参数中新增是否是异步调用的async参数和是否是单向调用的oneway参数,以这些参数来判断是执行同步调用、异步调用还是单向调用,源码如下所示。

public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol, boolean async, boolean oneway){logger.info("服务消费者发送的数据===>>>{}", JSONObject.toJSONString(protocol));return oneway ? this.sendRequestOneway(protocol) : async ?sendRequestAsync(protocol) : this.sendRequestSync(protocol);}

RpcConsumer类位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.RpcConsumer,主要是修改RpcConsumer类中的sendRequest()方法,调用RpcConsumerHandler处理器类的sendRequest()方法时,需要传递是否是异步调用async的标识和是否是单向调用oneway的标识,源码如下所示。

public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol) throws Exception {//################省略其他代码################RpcRequest request = protocol.getBody();return handler.sendRequest(protocol, request.getAsync(), request.getOneway());}

至此,整个实现就完毕了。实现起来是不是很简单呢?

整个测试过程不需要修改服务提供者的代码,所以,先启动服务提供者,启动bhrpc-test-provider工程下的io.binghe.rpc.test.provider.single.RpcSingleServerTest,输出的结果信息如下所示。

INFO BaseServer:82 - Server started on 127.0.0.1:27880

可以看到,服务提供者启动成功。

(1)修改同步调用的main()方法

修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,源码如下所示。

public static void main(String[] args) throws Exception {RpcConsumer consumer = RpcConsumer.getInstance();RPCFuture future = consumer.sendRequest(getRpcRequestProtocol());LOGGER.info("从服务消费者获取到的数据===>>>" + future.get());consumer.close();}

可以看到,同步调用时,会直接回去方法调用的结果数据。

(2)启动服务消费者

启动bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,输出的结果信息如下所示。

13:45:12,576INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.13:45:12,693INFO RpcConsumerHandler:90 - 服务消费者发送的数据===>>>{"body":{"async":false,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":false,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}13:45:12,868INFO RpcConsumerHandler:77 - 服务消费者接收到的数据===>>>{"body":{"async":false,"oneway":false,"result":"hello binghe"},"header":{"magic":16,"msgLen":211,"msgType":2,"requestId":1,"serializationType":"jdk","status":0}}13:45:12,869INFO RpcConsumerHandlerTest:38 - 从服务消费者获取到的数据===>>>hello binghe

可以看到,在服务消费者输出的信息中,除了向服务提供者发送的数据与接收服务提供者响应的数据外,还在RpcConsumerHandlerTest类的main()方法中打印出了通过自定义的RPCFuture对象获取的最终结果数据为hello binghe。符合预期的效果。

(3)再次查看服务提供者日志

再次查看服务提供者输出的日志信息,如下所示。

13:45:12,748INFO RpcProviderHandler:132 - use cglib reflect type invoke method...13:45:12,748INFO ProviderDemoServiceImpl:33 - 调用hello方法传入的参数为===>>>binghe

可以看到,服务提供者使用CGLib的方式调用了真实的方法。

(1)修改同步调用的main()方法

修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,源码如下所示。

public static void main(String[] args) throws Exception {RpcConsumer consumer = RpcConsumer.getInstance();consumer.sendRequest(getRpcRequestProtocol());RPCFuture future = RpcContext.getContext().getRPCFuture();LOGGER.info("从服务消费者获取到的数据===>>>" + future.get());consumer.close();}

可以看到,执行异步调用时,并没有从调用consumer的sendRequest()方法直接获取返回的RPCFuture结果数据,而是通过RpcContext上下文获取到RPCFuture对象,再由RPCFuture对象获取结果数据。

(2)修改构建RpcProtocol对象的方法

修改getRpcRequestProtocol()方法中构建RpcRequest的方法参数,将是否是异步调用的参数设置为true,源码如下所示。

private static RpcProtocol<RpcRequest> getRpcRequestProtocol(){//模拟发送数据RpcProtocol<RpcRequest> protocol = new RpcProtocol<RpcRequest>();//################省略其他代码##########################request.setAsync(true);request.setOneway(false);protocol.setBody(request);return protocol;}

(3)启动服务消费者

启动bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,输出的结果信息如下所示

13:47:55,800INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.13:47:55,905INFO RpcConsumerHandler:90 - 服务消费者发送的数据===>>>{"body":{"async":true,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":false,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}13:47:55,971INFO RpcConsumerHandler:77 - 服务消费者接收到的数据===>>>{"body":{"async":true,"oneway":false,"result":"hello binghe"},"header":{"magic":16,"msgLen":211,"msgType":2,"requestId":1,"serializationType":"jdk","status":0}}13:47:55,971INFO RpcConsumerHandlerTest:40 - 从服务消费者获取到的数据===>>>hello binghe

可以看到,在服务消费者输出的信息中,除了向服务提供者发送的数据与接收服务提供者响应的数据外,还在RpcConsumerHandlerTest类的main()方法中打印出了通过自定义的RPCFuture对象获取的最终结果数据为hello binghe。符合预期的效果。

(4)再次查看服务提供者日志

再次查看服务提供者输出的日志信息,如下所示。

13:47:55,948INFO RpcProviderHandler:132 - use cglib reflect type invoke method...13:47:55,948INFO ProviderDemoServiceImpl:33 - 调用hello方法传入的参数为===>>>binghe

可以看到,服务提供者使用CGLib的方式调用了真实的方法。

(1)修改同步调用的main()方法

修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,源码如下所示。

public static void main(String[] args) throws Exception {RpcConsumer consumer = RpcConsumer.getInstance();consumer.sendRequest(getRpcRequestProtocol());LOGGER.info("无需获取返回的结果数据");consumer.close();}

可以看到,在单向调用中,并没有获取返回结果。

(2)修改构建RpcProtocol对象的方法

修改getRpcRequestProtocol()方法中构建RpcRequest的方法参数,将是否是单向调用的参数设置为true,源码如下所示。

private static RpcProtocol<RpcRequest> getRpcRequestProtocol(){//模拟发送数据 //#############省略其他代码#################request.setAsync(false);request.setOneway(true);protocol.setBody(request);return protocol;}

(3)启动服务消费者

启动bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,输出的结果信息如下所示。

13:58:26,417INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.13:58:26,524INFO RpcConsumerHandler:90 - 服务消费者发送的数据===>>>{"body":{"async":false,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":true,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}13:58:26,531INFO RpcConsumerHandlerTest:39 - 无需获取返回的结果数据

可以看到,服务消费者向服务提供者发送数据后,并没有获取返回的结果数据。

(4)再次查看服务提供者日志

再次查看服务提供者输出的日志信息,如下所示。

13:58:26,565INFO RpcProviderHandler:132 - use cglib reflect type invoke method...13:58:26,566INFO ProviderDemoServiceImpl:33 - 调用hello方法传入的参数为===>>>binghe

可以看到,服务提供者使用CGLib的方式调用了真实的方法。

目前实现的RPC框架以Java原生进程的方式启动后,能够实现服务消费者以同步、异步和单向调用的方式与服务提供者之间进行数据交互。至此,我们写的RPC框架的功能又进一步得到了增强。

我们写的RPC框架正在一步步实现它该有的功能。

© 版权声明
评论 抢沙发
加载中~
每日一言
不怕万人阻挡,只怕自己投降
Not afraid of people blocking, I'm afraid their surrender