此番重新整理一下Dubbo消费端线程模型,做这件事情的原因是在对负责的应用进行梳理的过程中重新发现了之前遗漏的问题——空闲Dubbo线程过多,为了针对性地进行解决和优化,有必要对此进行进一步学习梳理,做到心中有数,防止改出问题。同时,我个人是一直认为线程模型是极其重要的一块内容(虽然我到现在才理了这个东西,学了不用是真记不住),尤其是在使用第三方库的时候,搞不清楚线程模型极容易翻车或者做出错误实践,时刻保持清楚你的代码运行在什么线程上是一件重要的事情。

Dubbo作为一款经典的国产中间件这几年也是经过曲折重新繁荣发展,到目前也演进到了3.0版本,云原生、dubbo-go等等花头不少,话说Dubbo的历史比我的开发史都要长的多的多,作为一款成熟的RPC工具复杂度也挺高了,我也就是窥得沧海一粟,在特定的版本下分析特定的问题。本篇针对的版本是2.7.8,只分析消费端线程模型。

版本变动

说到这个就不得不提一下Dubbo 2.7.5版本对消费端线程池模型的优化:官网博文,还是要贴一下经典的两张对比图,虽然我觉得光看这个图还是差点意思,但是理解之后确实还就得这么画:

老的线程池模型:

  1. 业务线程发出请求,拿到一个 Future 实例。
  2. 业务线程紧接着调用 future.get 阻塞等待业务结果返回。
  3. 当业务数据返回后,交由独立的 Consumer 端线程池进行反序列化等处理,并调用 future.set 将反序列化后的业务结果置回。
  4. 业务线程拿到结果直接返回

新的线程池模型:

  1. 业务线程发出请求,拿到一个 Future 实例。
  2. 在调用 future.get() 之前,先调用 ThreadlessExecutor.wait(),wait 会使业务线程在一个阻塞队列上等待,直到队列中被加入元素。
  3. 当业务数据返回后,生成一个 Runnable Task 并放入 ThreadlessExecutor 队列
  4. 业务线程将 Task 取出并在本线程中执行:反序列化业务数据并 set 到 Future。
  5. 业务线程拿到结果直接返回

图中白色方框表示单个线程,黑色框表示线程池。

发送请求

将dubbo调用拆解一下分为发送请求和等待响应两个部分。想要了解这个最方便的手段就是本地做一个demo然后debug跟着代码走一遍,这里简要总结一下,抛开较为复杂的封装、扩展性。

对于一个典型的调用(双向、同步、非泛化、Netty通信、dubbo协议)来说,发起请求的过程可以描述如下:

  1. 获取到一个服务调用的接口代理对象
  2. 代理会将调用信息封装为一个RpcInvocation,它包括了方法名、接口名、参数等
  3. mock处理、retry处理、其他处理
  4. 根据LB策略取到一个调用对象Invoker
  5. Invoker做一些准备操作:比如处理RpcContext等等
  6. 经过一系列操作Invoker最终会调用到网络传输层的NettyClient
  7. 获取到client线程池,生成一个requestId(AtomicLong),生成一个DefaultFuture(继承自CompletableFuture,引用client线程池),然后把<id, DefaultFuture>存入静态map里
  8. 在时间轮中提交一个超时任务,调用Netty发送网络请求(在Netty中注册的encoder会进行编码操作)返回一个包装好的AsyncResult
  9. 调用AsyncResult.get阻塞等待,这个get里面包含了ThreadlessExecutor.waitAndDrain调用

上述这些操作都是在业务线程中单线程执行,也就是图中所示的 Biz Thread。再回过头来看看Dubbo的框架分层和服务调用链示意图,其实都是对得上号的。

等待响应

请求发出去以后provider端接收到请求,执行相关代码,取得结果后同样通过Netty返回响应信息。

  1. Netty的worker thread收到数据触发channelRead
  2. 进入Dubbo注册在Netty中的handler:NettyServerHandler
  3. 经过filter调用链处理MultiMessage、心跳等
  4. 进入WrappedChannelHandler的具体类,这里就是dispatcher配置项all、message、execution、direct等产生作用的地方,配置项默认为all。但是针对收到数据的received方法来说,这几个配置大致都是相同的。
  5. 根据reqeustId从静态的map取到DefaultFuture,然后判断获取线程池
    1. 如果没有取到DefaultFuture,那么返回共享线程池(前缀就是DubboClientHandler-xxx)
    2. 如果取到了,那么就使用DefaultFuture关联引用的线程池
  6. 取到线程池之后,向这个线程池中提交一个任务对象:ChannelEventRunnable
  7. 这个ChannelEventRunnable执行的内容为:header转发、参数decode、把结果set到DefaultFuture里面,最后在静态map中清理掉requestId
  8. 这样执行完后,业务线程就能够通过DefaultFuture(CompletableFuture)获取到结果

这部分涉及到两个线程:Netty的worker线程(io线程)和执行响应处理任务使用的线程。

ThreadlessExecutor

两版线程模型的执行过程基本都是一样的如上面所述,区别就在于获取的客户端线程池的具体实现。新版本引入了名为ThreadlessExecutor的线程池,它也实现了ExecutorService接口,所以对其他部分代码造成的改动就会比较小。简单来说,它就是把提交的任务存到一个阻塞队列等着调用方来取(waitAndDrain),并且让调用线程在取到后自己执行这个任务。但是注意这个行为只会执行一次,如果已经取完了或者还没开始等待,那么提交的任务还是会交给delegate线程池去处理,这个delegate线程池就是原来旧版本使用的客户端线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class ThreadlessExecutor extends AbstractExecutorService {
private static final Logger logger = LoggerFactory.getLogger(ThreadlessExecutor.class.getName());

private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

private ExecutorService sharedExecutor;

private CompletableFuture<?> waitingFuture;

private boolean finished = false;

private volatile boolean waiting = true;

private final Object lock = new Object();

/* .................... blablabla .................... */

public void waitAndDrain() throws InterruptedException {
// 只处理一次
if (finished) {
return;
}

// take做阻塞等待
Runnable runnable = queue.take();

synchronized (lock) {
waiting = false;
// 调用waitAndDrain的线程自己执行runnable
runnable.run();
}

runnable = queue.poll();
while (runnable != null) {
try {
runnable.run();
} catch (Throwable t) {
logger.info(t);

}
runnable = queue.poll();
}
// mark the status of ThreadlessExecutor as finished.
finished = true;
}

/* .................... blablabla .................... */

@Override
public void execute(Runnable runnable) {
synchronized (lock) {
if (!waiting) {
sharedExecutor.execute(runnable);
} else {
// 把runnable放入阻塞队列
queue.add(runnable);
}
}
}

/* .................... blablabla .................... */
}

编解码

收到响应数据以后在什么地方做反序列化/解码是消费者线程模型最重要的特征,对于编解码的处理是一个绕不开的问题。计算机数据本质来说是一串二进制,为了知道这一串二进制代表的内容有多长、是什么,我们需要规则来“读懂”它,这个规则也就是所谓的协议。为了让数据有更高的灵活度和自描述性,数据通常会被分为header和body。

对于Dubbo来说,那就是dubbo的协议(如上图所示),注意Dubbo里的protocol接口、配置和这里的描述不是一码事,这里所说的协议对应的是DubboCodec类。

Header解析通常是比较轻量的,因为一般都是由固定长度的基本类型组成,但是Body就不一定,可能会比较重,对于RPC来说尤其如此,Body里面主要内容就是方法调用的参数,对于Dubbo来说就是RpcInvocation对象。所以数据的解码可能会分成两步:Header解析和Body解析,Dubbo也是这么干的。对于Body解析来说就是基本就是Java对象的反序列化,Dubbo支持多种序列化框架就体现在这个地方,默认使用的是Hessian2,参数序列化对于一个RPC框架的性能是相当重要的,可以参考这篇替换序列化框架的官方博文

因为Header解析比较轻,所以Dubbo选择直接在底层通信框架的IO线程上执行处理,而Body解析比较重,因此提供了更多的可配置性,可能就涉及到消费线程。根据Dubbo的设计,有三种形式:

  1. 直接在IO线程上处理:这样可能会造成IO线程阻塞,一般不会这么做。
  2. 在consumer线程池中处理:这种就是旧的线程模型以及异步调用时执行的方式。
  3. 业务调用线程自己处理:这是ThreadlessExecutor起作用时的形式。

Dubbo-thread前缀

Consumer线程池的线程名称前缀默认是DubboClientHandler-ip:port-thread- ,但是在做threaddump的时候有可能你会发现没有这个前缀的线程,反而是有一些前缀为Dubbo-thread-的线程,这是因为线程池被关闭了然后重新创建,可以参考下这篇博文。我这里简单总结并贴下源码,在创建consumer线程池时做了默认配置,包括默认的名称前缀和默认的线程池类型(cached),但是重新创建的时候并没有做这些操作,导致线程池的配置是全局的默认值(fixed)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package org.apache.dubbo.remoting.transport;

public abstract class AbstractClient extends AbstractEndpoint implements Client {

protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";

/* .................... blablabla .................... */

private void initExecutor(URL url) {
// DubboClientHandler-<address>-thread-
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
// DEFAULT_CLIENT_THREADPOOL = cached
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
executor = executorRepository.createExecutorIfAbsent(url);
}

/* .................... blablabla .................... */
}

总结

  • 究其根本,RPC的consumer在做同步调用时主要就是干了这么几个步骤:

​ 调用代理/存根方法 => 调用信息序列化/编码 => 发送网络请求 => 阻塞等待结果返回 <= 收到响应后反序列化/解码

  • 底层的通信框架如果是异步的,就要做异步转同步。在连接复用的异步网络调用下为了让请求和响应对的上号,也就是说要在收到响应的时候知道这是哪个请求的响应,并且同时要处理好调用线程的阻塞等待,dubbo的解法是静态并发map + requestId + CompletableFuture
  • 客户端线程池在新版本也做了优化,原来是每个tcp链接对应一个线程池,现在是每个server端口对应一个线程池(可以说是每个provider应用对应一个线程池)。在某个服务的连接全部断开时,这个线程池会被shutdown,后续重新建立连接时会进行重建。线程池在首次创建时会添加一些默认配置但是在重建时并没有处理,这就导致重建后线程池的配置和线程前缀名称发生变化。全局的线程池默认配置下,类型为Fixed,线程池大小为200,这样对于一个依赖大量服务方的应用,就有可能造成产生一堆空闲线程的问题,解决方法就是显式配置consumer线程池。
  • 客户端线程池的主要动作就是调用数据的反序列化,但是在新的线程模型下,阻塞调用时这个操作改由ThreadlessExecutor移交到调用线程来处理。这种情况下这个consumer线程池基本上不会执行什么任务,只会处理连接建立、连接断开和心跳(取决于dispatcher配置,见WrappedChannelHandler子类)以及双方超时配置不一致导致的超时响应处理。所以这个线程池可以配一个很小的核心线程数。
  • 双方超时配置不一致导致的处理是指:举个例子,对于一个接口consumer配了3s超时,而provider配了5s超时,如果provider执行耗时4s返回了响应,但是consumer端已经3s超时,静态map中已经没有了DefaultFuture,这个时候还是会使用consumer线程池处理,不过处理内容基本就只是打一条日志。
  • 特别注意,上述结论仅限于同步调用的情况。Dubbo到这个版本已经支持method返回Future类型来做异步调用,当然原来的RpcContext形式也是兼容的。在异步调用情况下,还是需要这个consumer线程池来承担响应内容的一系列处理操作。
火焰图与JOL
Kafka消费者消息拉取机制