consumer

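The consumer mainly relies on ThreadlessExecutor to implement full-duplex communication. For each request the consumer creates a requestId, which is used to match the response to its request.
A request is then sent and its result received in the following steps:
Register a slot: before sending the message, store the executor that will receive the result in a map
Send the message: call netty to send it
Suspend the thread and wait for the response
Receive the result asynchronously: once the netty thread receives the result, it wakes the previously suspended thread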
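
The four steps above can be sketched in plain Java. This is only an illustration under simplifying assumptions, not Dubbo's code: the class RequestSlotSketch, the PENDING map, fakeSend and onResponse are all hypothetical names, a CompletableFuture stands in for the executor stored in the slot, and a throwaway thread plays the role of the netty IO thread.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; none of these names are Dubbo's.
final class RequestSlotSketch {
    // Pending result slots, keyed by requestId.
    static final Map<Long, CompletableFuture<String>> PENDING = new ConcurrentHashMap<>();
    static final AtomicLong NEXT_ID = new AtomicLong();

    // Stand-in for the network and the remote side: replies from another thread.
    static void fakeSend(long requestId, String payload) {
        new Thread(() -> onResponse(requestId, "echo:" + payload), "fake-io").start();
    }

    // Step 4: the IO thread finds the slot by requestId and completes it.
    static void onResponse(long requestId, String result) {
        CompletableFuture<String> slot = PENDING.remove(requestId);
        if (slot != null) {
            slot.complete(result);
        }
    }

    static String call(String payload) {
        long requestId = NEXT_ID.getAndIncrement();
        CompletableFuture<String> slot = new CompletableFuture<>();
        PENDING.put(requestId, slot);   // step 1: register the slot before sending
        fakeSend(requestId, payload);   // step 2: send
        return slot.join();             // step 3: block until step 4 completes the slot
    }

    public static void main(String[] args) {
        System.out.println(call("hello")); // prints "echo:hello"
    }
}
```

Registering the slot before sending matters: if the order were reversed, a fast response could arrive before there is anything to match it against.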

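A ThreadlessExecutor is created and stored in DefaultFuture.FUTURES, keyed by requestId. When the response comes back, the ThreadlessExecutor is looked up in DefaultFuture.FUTURES and used to set the result.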

Building the ThreadlessExecutor

<init>:56, ThreadlessExecutor (org.apache.dubbo.common.threadpool)
getCallbackExecutor:190, AbstractInvoker (org.apache.dubbo.rpc.protocol)
doInvoke:103, DubboInvoker (org.apache.dubbo.rpc.protocol.dubbo)
invoke:163, AbstractInvoker (org.apache.dubbo.rpc.protocol)
invoke:52, AsyncToSyncInvoker (org.apache.dubbo.rpc.protocol)
invoke:89, MonitorFilter (org.apache.dubbo.monitor.support)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:51, FutureFilter (org.apache.dubbo.rpc.protocol.dubbo.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:69, ConsumerContextFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:78, ListenerInvokerWrapper (org.apache.dubbo.rpc.listener)
invoke:56, InvokerWrapper (org.apache.dubbo.rpc.protocol)
doInvoke:82, FailoverClusterInvoker (org.apache.dubbo.rpc.cluster.support)
invoke:259, AbstractClusterInvoker (org.apache.dubbo.rpc.cluster.support)
intercept:47, ClusterInterceptor (org.apache.dubbo.rpc.cluster.interceptor)
invoke:92, AbstractCluster$InterceptorInvokerNode (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:82, MockClusterInvoker (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:74, InvokerInvocationHandler (org.apache.dubbo.rpc.proxy)
sayHello:-1, proxy0 (org.apache.dubbo.common.bytecode)
main:43, Consumer (org.apache.dubbo.samples.mock)

Storing it in DefaultFuture.FUTURES

<init>:85, DefaultFuture (org.apache.dubbo.remoting.exchange.support)
newFuture:108, DefaultFuture (org.apache.dubbo.remoting.exchange.support)
request:133, HeaderExchangeChannel (org.apache.dubbo.remoting.exchange.support.header)
request:95, HeaderExchangeClient (org.apache.dubbo.remoting.exchange.support.header)
request:91, ReferenceCountExchangeClient (org.apache.dubbo.rpc.protocol.dubbo)
doInvoke:105, DubboInvoker (org.apache.dubbo.rpc.protocol.dubbo)
invoke:163, AbstractInvoker (org.apache.dubbo.rpc.protocol)
invoke:52, AsyncToSyncInvoker (org.apache.dubbo.rpc.protocol)
invoke:89, MonitorFilter (org.apache.dubbo.monitor.support)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:51, FutureFilter (org.apache.dubbo.rpc.protocol.dubbo.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:69, ConsumerContextFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:78, ListenerInvokerWrapper (org.apache.dubbo.rpc.listener)
invoke:56, InvokerWrapper (org.apache.dubbo.rpc.protocol)
doInvoke:82, FailoverClusterInvoker (org.apache.dubbo.rpc.cluster.support)
invoke:259, AbstractClusterInvoker (org.apache.dubbo.rpc.cluster.support)
intercept:47, ClusterInterceptor (org.apache.dubbo.rpc.cluster.interceptor)
invoke:92, AbstractCluster$InterceptorInvokerNode (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:82, MockClusterInvoker (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:74, InvokerInvocationHandler (org.apache.dubbo.rpc.proxy)
sayHello:-1, proxy0 (org.apache.dubbo.common.bytecode)
main:43, Consumer (org.apache.dubbo.samples.mock)

Sending the message

The business thread does not write to the socket itself; it submits the message to the NioEventLoop task queue.

offerTask:330, SingleThreadEventExecutor (io.netty.util.concurrent)
addTask:321, SingleThreadEventExecutor (io.netty.util.concurrent)
execute:765, SingleThreadEventExecutor (io.netty.util.concurrent)
safeExecute:1007, AbstractChannelHandlerContext (io.netty.channel)
write:825, AbstractChannelHandlerContext (io.netty.channel)
writeAndFlush:794, AbstractChannelHandlerContext (io.netty.channel)
writeAndFlush:831, AbstractChannelHandlerContext (io.netty.channel)
writeAndFlush:1071, DefaultChannelPipeline (io.netty.channel)
writeAndFlush:300, AbstractChannel (io.netty.channel)
send:162, NettyChannel (org.apache.dubbo.remoting.transport.netty4)
send:178, AbstractClient (org.apache.dubbo.remoting.transport)
send:53, AbstractPeer (org.apache.dubbo.remoting.transport)
request:135, HeaderExchangeChannel (org.apache.dubbo.remoting.exchange.support.header)
request:95, HeaderExchangeClient (org.apache.dubbo.remoting.exchange.support.header)
request:91, ReferenceCountExchangeClient (org.apache.dubbo.rpc.protocol.dubbo)
doInvoke:105, DubboInvoker (org.apache.dubbo.rpc.protocol.dubbo)
invoke:163, AbstractInvoker (org.apache.dubbo.rpc.protocol)
invoke:52, AsyncToSyncInvoker (org.apache.dubbo.rpc.protocol)
invoke:89, MonitorFilter (org.apache.dubbo.monitor.support)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:51, FutureFilter (org.apache.dubbo.rpc.protocol.dubbo.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:69, ConsumerContextFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:78, ListenerInvokerWrapper (org.apache.dubbo.rpc.listener)
invoke:56, InvokerWrapper (org.apache.dubbo.rpc.protocol)
doInvoke:82, FailoverClusterInvoker (org.apache.dubbo.rpc.cluster.support)
invoke:259, AbstractClusterInvoker (org.apache.dubbo.rpc.cluster.support)
intercept:47, ClusterInterceptor (org.apache.dubbo.rpc.cluster.interceptor)
invoke:92, AbstractCluster$InterceptorInvokerNode (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:82, MockClusterInvoker (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:74, InvokerInvocationHandler (org.apache.dubbo.rpc.proxy)
sayHello:-1, proxy0 (org.apache.dubbo.common.bytecode)
main:43, Consumer (org.apache.dubbo.samples.mock)
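
The hand-off visible at the top of this trace (safeExecute, execute, addTask, offerTask) can be imitated with a plain single-thread executor. This is a rough sketch, not netty code: EventLoopHandOffSketch and the thread name fake-event-loop are hypothetical, and the executor merely stands in for the NioEventLoop and its task queue.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; a single-thread executor plays the NioEventLoop.
final class EventLoopHandOffSketch {
    // Returns a description of which thread performed the "write".
    static String writeFromCallerThread(String message) {
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-event-loop"));
        try {
            // The caller only enqueues the task; the event loop thread runs it.
            return eventLoop.submit(() -> {
                return Thread.currentThread().getName() + " wrote " + message;
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(writeFromCallerThread("ping")); // prints "fake-event-loop wrote ping"
    }
}
```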

Suspending the thread

waitAndDrain:89, ThreadlessExecutor (org.apache.dubbo.common.threadpool)
get:179, AsyncRpcResult (org.apache.dubbo.rpc)
invoke:61, AsyncToSyncInvoker (org.apache.dubbo.rpc.protocol)
invoke:89, MonitorFilter (org.apache.dubbo.monitor.support)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:51, FutureFilter (org.apache.dubbo.rpc.protocol.dubbo.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:69, ConsumerContextFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:78, ListenerInvokerWrapper (org.apache.dubbo.rpc.listener)
invoke:56, InvokerWrapper (org.apache.dubbo.rpc.protocol)
doInvoke:82, FailoverClusterInvoker (org.apache.dubbo.rpc.cluster.support)
invoke:259, AbstractClusterInvoker (org.apache.dubbo.rpc.cluster.support)
intercept:47, ClusterInterceptor (org.apache.dubbo.rpc.cluster.interceptor)
invoke:92, AbstractCluster$InterceptorInvokerNode (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:82, MockClusterInvoker (org.apache.dubbo.rpc.cluster.support.wrapper)
invoke:74, InvokerInvocationHandler (org.apache.dubbo.rpc.proxy)
sayHello:-1, proxy0 (org.apache.dubbo.common.bytecode)
main:43, Consumer (org.apache.dubbo.samples.mock)

Receiving the result asynchronously

The netty thread receives the result and wakes the business thread that was blocked earlier.

execute:138, ThreadlessExecutor (org.apache.dubbo.common.threadpool)
received:62, AllChannelHandler (org.apache.dubbo.remoting.transport.dispatcher.all)
received:90, HeartbeatHandler (org.apache.dubbo.remoting.exchange.support.header)
received:43, MultiMessageHandler (org.apache.dubbo.remoting.transport)
received:147, AbstractPeer (org.apache.dubbo.remoting.transport)
channelRead:83, NettyClientHandler (org.apache.dubbo.remoting.transport.netty4)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
channelRead:286, IdleStateHandler (io.netty.handler.timeout)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:310, ByteToMessageDecoder (io.netty.handler.codec)
channelRead:284, ByteToMessageDecoder (io.netty.handler.codec)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
channelRead:1434, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:965, DefaultChannelPipeline (io.netty.channel)
read:163, AbstractNioByteChannel$NioByteUnsafe (io.netty.channel.nio)
processSelectedKey:647, NioEventLoop (io.netty.channel.nio)
processSelectedKeysOptimized:582, NioEventLoop (io.netty.channel.nio)
processSelectedKeys:499, NioEventLoop (io.netty.channel.nio)
run:461, NioEventLoop (io.netty.channel.nio)
run:884, SingleThreadEventExecutor$5 (io.netty.util.concurrent)
run:30, FastThreadLocalRunnable (io.netty.util.concurrent)
run:748, Thread (java.lang)

provider

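Receive the request: a netty thread receives the request
Switch threads: the request is handed over to a business thread
Return the result: after processing, the business thread calls channel.write to return the result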
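
A minimal sketch of these three provider steps, assuming two single-thread executors stand in for the netty IO thread and the business thread pool. ProviderDispatchSketch, handle and the thread names are hypothetical, and completing a CompletableFuture stands in for channel.write.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; not Dubbo's dispatcher.
final class ProviderDispatchSketch {
    static String handle(String request) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io"));
        ExecutorService business = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-biz"));
        CompletableFuture<String> written = new CompletableFuture<>();
        try {
            // Step 1: the IO thread receives the request.
            io.execute(() -> {
                String receivedOn = Thread.currentThread().getName();
                // Step 2: it hands the request to the business pool instead of processing it.
                business.execute(() -> {
                    String result = "Hello " + request;
                    // Step 3: the business thread writes the result (stand-in for channel.write).
                    written.complete(receivedOn + " -> "
                            + Thread.currentThread().getName() + ": " + result);
                });
            });
            return written.join();
        } finally {
            io.shutdown();
            business.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(handle("world")); // prints "fake-io -> fake-biz: Hello world"
    }
}
```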

Receiving the request

received:62, AllChannelHandler (org.apache.dubbo.remoting.transport.dispatcher.all)
received:90, HeartbeatHandler (org.apache.dubbo.remoting.exchange.support.header)
received:43, MultiMessageHandler (org.apache.dubbo.remoting.transport)
received:147, AbstractPeer (org.apache.dubbo.remoting.transport)
channelRead:98, NettyServerHandler (org.apache.dubbo.remoting.transport.netty4)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
channelRead:286, IdleStateHandler (io.netty.handler.timeout)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:310, ByteToMessageDecoder (io.netty.handler.codec)
channelRead:284, ByteToMessageDecoder (io.netty.handler.codec)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:340, AbstractChannelHandlerContext (io.netty.channel)
channelRead:1434, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeChannelRead:362, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelRead:348, AbstractChannelHandlerContext (io.netty.channel)
fireChannelRead:965, DefaultChannelPipeline (io.netty.channel)
read:163, AbstractNioByteChannel$NioByteUnsafe (io.netty.channel.nio)
processSelectedKey:647, NioEventLoop (io.netty.channel.nio)
processSelectedKeysOptimized:582, NioEventLoop (io.netty.channel.nio)
processSelectedKeys:499, NioEventLoop (io.netty.channel.nio)
run:461, NioEventLoop (io.netty.channel.nio)
run:884, SingleThreadEventExecutor$5 (io.netty.util.concurrent)
run:30, FastThreadLocalRunnable (io.netty.util.concurrent)
run:748, Thread (java.lang)

Switching threads

sayHello:31, DemoServiceImpl (org.apache.dubbo.samples.mock.impl)
invokeMethod:-1, Wrapper1 (org.apache.dubbo.common.bytecode)
doInvoke:47, JavassistProxyFactory$1 (org.apache.dubbo.rpc.proxy.javassist)
invoke:84, AbstractProxyInvoker (org.apache.dubbo.rpc.proxy)
invoke:56, DelegateProviderMetaDataInvoker (org.apache.dubbo.config.invoker)
invoke:56, InvokerWrapper (org.apache.dubbo.rpc.protocol)
invoke:52, ExceptionFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:89, MonitorFilter (org.apache.dubbo.monitor.support)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:46, TimeoutFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:77, TraceFilter (org.apache.dubbo.rpc.protocol.dubbo.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:129, ContextFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:152, GenericFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:38, ClassLoaderFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
invoke:41, EchoFilter (org.apache.dubbo.rpc.filter)
invoke:81, ProtocolFilterWrapper$1 (org.apache.dubbo.rpc.protocol)
reply:145, DubboProtocol$1 (org.apache.dubbo.rpc.protocol.dubbo)
handleRequest:100, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:175, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:51, DecodeHandler (org.apache.dubbo.remoting.transport)
run:57, ChannelEventRunnable (org.apache.dubbo.remoting.transport.dispatcher)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)

Returning the result

offerTask:327, SingleThreadEventExecutor (io.netty.util.concurrent)
addTask:321, SingleThreadEventExecutor (io.netty.util.concurrent)
execute:765, SingleThreadEventExecutor (io.netty.util.concurrent)
safeExecute:1007, AbstractChannelHandlerContext (io.netty.channel)
write:825, AbstractChannelHandlerContext (io.netty.channel)
writeAndFlush:794, AbstractChannelHandlerContext (io.netty.channel)
writeAndFlush:831, AbstractChannelHandlerContext (io.netty.channel)
writeAndFlush:1071, DefaultChannelPipeline (io.netty.channel)
writeAndFlush:300, AbstractChannel (io.netty.channel)
send:162, NettyChannel (org.apache.dubbo.remoting.transport.netty4)
send:98, HeaderExchangeChannel (org.apache.dubbo.remoting.exchange.support.header)
send:87, HeaderExchangeChannel (org.apache.dubbo.remoting.exchange.support.header)
lambda$handleRequest$0:110, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
accept:-1, 195186633 (org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler$$Lambda$148)
uniWhenComplete:774, CompletableFuture (java.util.concurrent)
uniWhenCompleteStage:792, CompletableFuture (java.util.concurrent)
whenComplete:2153, CompletableFuture (java.util.concurrent)
whenComplete:110, CompletableFuture (java.util.concurrent)
handleRequest:101, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:175, HeaderExchangeHandler (org.apache.dubbo.remoting.exchange.support.header)
received:51, DecodeHandler (org.apache.dubbo.remoting.transport)
run:57, ChannelEventRunnable (org.apache.dubbo.remoting.transport.dispatcher)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)

Details

getPreferredExecutorService

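Both the consumer and the provider receive remote messages through AllChannelHandler#received. The difference is that the consumer uses a ThreadlessExecutor, whereas the provider uses a ThreadPoolExecutor.

The getPreferredExecutorService method called in AllChannelHandler#received decides which executor to use based on whether the received message is a response.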
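
The selection rule can be sketched as follows. This is not the Dubbo source: PreferredExecutorSketch, its nested Request and Response classes, and the choose method are hypothetical, and the fallback to the shared pool when no per-request executor is found is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

// Hypothetical sketch of "pick the executor by message type".
final class PreferredExecutorSketch {
    static final class Request  { final long id; Request(long id)  { this.id = id; } }
    static final class Response { final long id; Response(long id) { this.id = id; } }

    static Executor choose(Object message, Map<Long, Executor> perRequest, Executor shared) {
        if (message instanceof Response) {
            // A response goes to the executor registered for its requestId.
            Executor waiting = perRequest.get(((Response) message).id);
            // Assumption: fall back to the shared pool when nobody is waiting any more.
            return waiting != null ? waiting : shared;
        }
        // Anything else (for example a request on the provider side) uses the shared pool.
        return shared;
    }

    static String demo() {
        Executor threadless = new Executor() { public void execute(Runnable r) { r.run(); } };
        Executor shared = new Executor() { public void execute(Runnable r) { r.run(); } };
        Map<Long, Executor> perRequest = new HashMap<>();
        perRequest.put(7L, threadless);
        String a = choose(new Response(7), perRequest, shared) == threadless ? "threadless" : "shared";
        String b = choose(new Request(7), perRequest, shared) == threadless ? "threadless" : "shared";
        String c = choose(new Response(8), perRequest, shared) == threadless ? "threadless" : "shared";
        return a + "," + b + "," + c;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "threadless,shared,shared"
    }
}
```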

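                     ThreadlessExecutor             ThreadPoolExecutor
Purpose              consumer receives responses    provider receives requests
Initialized          before the request is sent     when a request is received
Stored in            DefaultFuture.FUTURES          DefaultExecutorRepository.data
Storage key          requestId                      port
Threads involved     business thread & IO thread    business thread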

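ThreadlessExecutor#execute is called by the IO thread; ThreadlessExecutor#waitAndDrain is called by the business thread.
For details on where the ThreadPoolExecutor is stored, see DefaultExecutorRepository#createExecutorIfAbsent.
requestId is the unique id that identifies each request; every time a Request instance is constructed, Request#newId generates a new one.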
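
One common way to hand out such per-instance ids is a shared atomic counter. The class below is a hypothetical illustration of that idea (RequestIdSketch and NEXT_ID are made-up names) and is not claimed to match the body of Request#newId.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: every new instance takes the next value of a shared counter.
final class RequestIdSketch {
    private static final AtomicLong NEXT_ID = new AtomicLong();

    final long id;

    RequestIdSketch() {
        // getAndIncrement is atomic, so concurrent constructors never share an id.
        this.id = NEXT_ID.getAndIncrement();
    }

    public static void main(String[] args) {
        RequestIdSketch first = new RequestIdSketch();
        RequestIdSketch second = new RequestIdSketch();
        System.out.println(first.id != second.id); // prints "true"
    }
}
```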

ThreadlessExecutor

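ThreadlessExecutor does not start any new thread; its main role is to turn the asynchronous remote call into a synchronous one, which is why AsyncToSyncInvoker appears in the call stack.
When the business thread calls waitAndDrain and the remote provider has not yet returned data, queue.take() suspends the thread.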

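After the netty thread receives the provider's return value it calls ThreadlessExecutor#execute, which adds the result to the queue. The thread that was suspended in queue.take() can then continue running.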
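
The interplay of execute and waitAndDrain can be reproduced with a small queue-backed executor. This is a simplified sketch, not the Dubbo class: ThreadlessExecutorSketch and its demo method are hypothetical, and timeouts, shutdown and error handling are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an executor that owns no thread of its own.
final class ThreadlessExecutorSketch implements Executor {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    // Called by the IO thread: only enqueue the task, never run it here.
    @Override
    public void execute(Runnable task) {
        queue.add(task);
    }

    // Called by the business thread: block until a task arrives, then run everything queued.
    public void waitAndDrain() {
        try {
            Runnable task = queue.take();
            task.run();
            while ((task = queue.poll()) != null) {
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns the name of the thread that actually ran the task.
    static String demo() {
        ThreadlessExecutorSketch executor = new ThreadlessExecutorSketch();
        String[] ranOn = new String[1];
        new Thread(() -> executor.execute(
                () -> ranOn[0] = Thread.currentThread().getName()), "fake-io").start();
        executor.waitAndDrain(); // suspended in take() until the fake IO thread enqueues
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "main"
    }
}
```

The task submitted by the fake IO thread ends up running on the caller of waitAndDrain, which is the sense in which no new thread is started.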

Original article: https://blog.csdn.net/m1f2c3/article/details/134710843
