Kafka生产者消息发送机制
写了消费者,那再写下生产者吧,兴许应该再写写网络模型和Broker端的内容,免得偏科,毕竟也吃过亏。虽然仍是聚焦于一点,但是生产者的内容也不少,与分析消费者如何拉取数据那篇一样,着重描述一下如何发送数据,理解线程模型始终是最重要的。本篇不涉及事务相关内容,就展示一下整体逻辑,然后挑几个核心方法进行说明。
本篇讨论的版本为Java客户端v2.8。
运行概览
相比于消费者有poll或者push的花样,在基本模式上生产者就直白很多,它肯定是往服务端去推数据。但是同样的,kafka生产者并不是调用一次API就发起一次网络请求,为了高性能其中做了很多操作。首先来看一下kafka生产者运行的大体逻辑:
从线程方面来说主要涉及发送调用线程、Sender线程以及图中没有展示的网络IO线程。整体运作也是一个生产者消费者模型,调用线程是“生产者”,Sender线程是“消费者”,它们通过Accumulator这个包含N个队列的类对象来进行数据交互。可以看到这套东西核心就是批处理,加大吞吐量,提高效率。
Send方法
对于应用程序来说,要发送数据就要调用kafka生产者的send方法,它是与应用程序交互的门面,本文所述的其他内容都是其背后执行的服务,该方法描述如下:
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
很明显这是一个异步形式的调用,这与kafka的网络模型息息相关,当然异步也可以转同步,只需要对返回的Future调用get方法即可。API非常简洁,代码也是一贯的不错,接下来对核心方法逐行看个大概:
1 | /** |
这个方法的逻辑并不是特别重,或者说重的逻辑被忽略或放到后面,比较直白,首先把数据序列化,然后确定要发送到哪个分区,最后把数据塞给中介Accumulator进行处理。
Sender线程
send方法并不会直接触发或者说准备触发消息的实际发送,这个工作由Sender线程来完成。KafkaProducer有两个相关的成员:ioThread
和Sender
,该线程在生产者的构造函数中启动,在close方法中被等待关闭或强制关闭。看一下重点代码:
1 |
|
其中两个核心操作都是Accumulator的方法调用:ready和drain,直白点来说就是先获取能够发送数据的节点,然后从accumulator中取出相关数据,给每个节点发送请求(异步)。
Sender线程正常情况下是一个无限循环,而每次循环之间的等待主要是调用NetClient.poll(timeout, currentMs)
,这个timeout值的计算略复杂:
所有待发送的batch计算后取最小:每个batch的计算值为 lingerMs 减去 batch已经等待的时间,这个值记作 nextReadyCheckDelayMs
对所有没有ready的节点计算后取最小:每个节点取 pollDelayMs ,这个值记作 notReadyTimeout
accumulator有个数值成员 nextExpiryTimeMs ,应该是最近的一个Batch的过期时间,这个值减去当前时间, 记作 nextExpiryDurationMs
nextReadyCheckDelayMs 、notReadyTimeout、nextExpiryDurationMs 三者取小值就得出了 pollTimeout 值
所以基本上是以配置的 linger.ms 作为消息的缓冲时间
Accumulator
如上所述,Accumulator是实现批量发送核心介质,看一下这个类的注释:
1 | /** |
核心两个点:缓冲队列和内存分配。内存分配暂不细说,在缓冲队列这一点上,它的实现方式是每个TopicPartition一个队列(ConcurrentMap),队列的具体实现为ArrayDeque,队列中的成员类型为ProducerBatch,正如其名称所示它代表了一批消息数据。在处理队列时均使用synchronized进行包装,以此便捷地解决并发问题。下面依次看一下上文提到过的三个主要方法。
append方法
1 | /** |
ProducerBatch内部操作主要是数据的转换和写入以及一些校验,这里不详细描述。简而言之append方法就是往队列里的Batch写入数据,如果队列尾部Batch写满了或者无法写入则创建新的batch写入数据后插入队列,创建batch时会涉及内存分配,防止无限膨胀内存溢出。
ready方法
1 | /** |
这个方法返回了三项内容:
- 根据应用逻辑得出的可以发送数据的节点。并不是所有节点都处于可发送的状态,并且后续还会对这些节点进行网络连接层面的检查。
- 下一次ready检查的延迟。这个值会影响sender线程循环的polltimeout计算。
- 存在未知分片leader的topic集合。这是为了更新集群元数据信息,分片leader未知情况下无法发送数据到对应的分片。
drain方法
1 | /** |
该方法展现了Sender是如何从accumulator中取出待发送的数据,可以简述为:在计算得出可发送的节点后,获取每个节点持有leader的分片,从这些分片对应的队列中各自至多取出一个batch,得出每个节点需要发送的batch列表。
这些数据随后会被封装为请求对象,异步发送到对应的节点,然后结束一轮sender逻辑,等待网络客户端polltimeout后进入下一轮循环。
相关参数
核心相关参数可以查看Accumulator的构造函数,主要有如下:
- linger.ms:如上所述,这个值会影响Sender线程的循环速度,该值越大循环等待的时间就可能更长,允许更多的消息合并到一个request中发送到服务端,减少网络IO,提高吞吐量。不过也要注意到Sender线程的循环等待时间并不是固定值,而且可以通过wake调用提前结束。
- retry.backoff.ms:重试情况下需要等待的时间
- delivery.timeout.ms:由于Send方法并不会立即发送数据,而是先写入缓冲队列,为了保证数据不会一直滞留不发送,可以配置这个超时时间,过期的batch会被取消发送,同时反馈失败给方法返回的Future对象
- batch.size:限制每个batch的数据大小,在做写入操作时batch写满了便会生成新的batch。如果这个值设的较小会生成更多的batch导致需要发送的请求变多,降低吞吐量;如果这个值设置的过大,则会浪费内存,因为每次生成batch时会直接分配该值指定的内存大小。
- buffer.memory:缓冲区能够使用的最大内存大小,防止缓冲区内存溢出。