Having written about the consumer, it's time to cover the producer; maybe the network model and the broker side deserve their own posts too, so the coverage doesn't stay lopsided (I've paid for that before). As with the post analyzing how the consumer pulls data, this one stays focused on a single point: how data gets sent, because understanding the thread model is always the most important part. Transactions are out of scope; the plan is to show the overall flow and then pick a few core methods to explain.

The version discussed here is the Java client, v2.8.

Runtime overview

Compared with the consumer, where there is a choice between poll and push styles, the producer's basic mode is much more straightforward: it always pushes data to the brokers. Still, the Kafka producer does not issue one network request per API call; a lot happens inside in the name of performance. Let's start with the overall flow of a running producer:

In terms of threads, there are three players: the calling thread that invokes send, the Sender thread, and the network I/O thread (not shown in the figure). The whole arrangement is itself a producer-consumer model: the calling thread is the "producer", the Sender thread is the "consumer", and they exchange data through the Accumulator, an object that holds N queues. The core idea is batching: larger batches mean higher throughput and better efficiency. A generic sketch of this hand-off follows.
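To make the hand-off concrete, here is a minimal, generic sketch of that shape. It is not Kafka's code; the class MiniAccumulator, the per-partition Deque<String> and the drain loop are purely illustrative.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: one deque per "partition", filled by caller threads and drained
// by a single background thread, roughly mirroring the Accumulator/Sender split.
class MiniAccumulator {
    private final ConcurrentMap<Integer, Deque<String>> queues = new ConcurrentHashMap<>();

    // Runs on the user's thread (the "producer" side of the model).
    void append(int partition, String record) {
        Deque<String> dq = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            dq.addLast(record);
        }
    }

    // Runs on the background thread (the "consumer" side): drain everything as batches.
    Map<Integer, List<String>> drainAll() {
        Map<Integer, List<String>> batches = new HashMap<>();
        queues.forEach((partition, dq) -> {
            synchronized (dq) {
                if (!dq.isEmpty()) {
                    batches.put(partition, new ArrayList<>(dq));
                    dq.clear();
                }
            }
        });
        return batches;
    }
}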

The send method

From the application's point of view, sending data means calling the producer's send method. It is the facade the application interacts with; everything else described in this post is machinery running behind it. Its signature:

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

This is obviously an asynchronous call, which is closely tied to Kafka's network model; it can of course be turned into a synchronous one simply by calling get on the returned Future. The API is very lean and the code is, as usual, quite good. A short usage sketch follows, and then we'll go through the core method roughly line by line:
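A minimal usage sketch showing both the callback style and the blocking Future.get() style. The bootstrap address, topic name and serializers are illustrative assumptions, not values from this post.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");

            // Asynchronous: the callback fires once the broker acknowledges (or the send fails).
            producer.send(record, (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();
                else
                    System.out.printf("acked %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
            });

            // Synchronous: turn the async call into a blocking one via Future.get().
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("sync send offset: " + metadata.offset());
        }
    }
}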

/**
* Implementation of asynchronously send a record to a topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// check first whether the producer has been closed
throwIfProducerClosed();
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();

// cluster metadata handling, e.g. the topic's partition layout and node addresses
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;

// serialize the key (if one was given) and the value
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}

// determine which partition this record should go to, either as specified or via the partitioner
// the default logic: if a partition is explicitly specified, use it directly
// if a key is given, hash the key and take the modulo to pick a partition
// otherwise use the sticky strategy, rotating whole batches across the partitions
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);

setReadOnly(record.headers());
Header[] headers = record.headers().toArray();

// estimate the size of this record and check it against the upper limit
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
if (log.isTraceEnabled()) {
log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
}

// wrap the callback so the producer interceptors get invoked as well
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

if (transactionManager != null && transactionManager.isTransactional()) {
transactionManager.failIfNotReadyForSend();
}

// the core call: append the record into the buffer queues
// the result is checked and the append possibly redone below, mainly to accommodate the sticky partitioner
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}

if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);

// wake up the Sender thread, which is blocked inside KafkaClient.poll
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}

// return the future object created while appending to the buffer queues
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
}

The logic in this method is not particularly heavy (or rather, the heavy parts are elided or deferred) and is fairly direct: serialize the record, decide which partition it should be sent to, and finally hand it to the Accumulator intermediary for further processing.
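As a side note, the default partition choice described in the comments above boils down to roughly the following decision. This is a simplified sketch based on KafkaProducer.partition and the DefaultPartitioner of this version, not a verbatim copy; choosePartition is a made-up name.

// Simplified sketch of the default partition choice (illustrative, not the exact source).
private int choosePartition(ProducerRecord<K, V> record, byte[] keyBytes, Cluster cluster) {
    // 1. An explicitly specified partition always wins.
    if (record.partition() != null)
        return record.partition();

    // 2. If a key is present, hash it with murmur2 and take the modulo of the partition count.
    if (keyBytes != null) {
        int numPartitions = cluster.partitionsForTopic(record.topic()).size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    // 3. No key: the sticky strategy keeps one partition until a new batch is created,
    //    then switches, so whole batches rotate across partitions over time.
    return stickyPartitionCache.partition(record.topic(), cluster);
}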

The Sender thread

The send method does not itself trigger, or even stage, the actual transmission of messages; that work is done by the Sender thread. KafkaProducer has two related members, the Sender and the ioThread that runs it; the thread is started in the producer's constructor and, in the close method, is either waited on to finish or forcibly shut down.
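Roughly, the constructor wires these up like this (a simplified sketch based on the KafkaProducer constructor; field and constant names may differ slightly from the actual source):

// Simplified: the Sender is a Runnable, and the ioThread is a daemon KafkaThread that runs it.
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

With that context, let's look at the key code of the Sender's run loop: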

@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

// ... shutdown handling omitted ...
}

/**
* Run a single iteration of sending
*
*/
void runOnce() {
if (transactionManager != null) {
// ... transaction handling omitted ...
}

long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
}

// the core logic of each iteration
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();

// ask the accumulator for the nodes that are ready, the delay until the next check, and topics whose leader is unknown
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

// if any topic partition has an unknown leader, force a cluster metadata update
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);

log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}

// ask the network client whether each node is ready (no pending metadata update request,
// connection established, in-flight request limit not exceeded) and remove the nodes that are not
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}

// the core call: based on the ready nodes and the max request size, drain data out of the
// accumulator grouped by node; if message order must be guaranteed, the drained partitions
// are muted, and that mark affects both the ready check and what later drains may fetch
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}

// batches have an expiry; handle every batch that expired before being sent, mainly by completing the associated futures and cleaning up memory
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch);
}
}

// metrics
sensors.updateProduceRequestMetrics(batches);

// compute pollTimeout; quite a few factors feed into this, described separately below
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
// time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
// sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
// that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
}

// loop over the nodes and send the corresponding requests
sendProduceRequests(batches, now);
return pollTimeout;
}

The two core operations are both method calls on the Accumulator, ready and drain. Put plainly: first work out which nodes can be sent data, then pull the relevant data out of the accumulator and send a request to each node (asynchronously).

Under normal conditions the Sender thread is an infinite loop, and the waiting between iterations happens mostly in NetworkClient.poll(timeout, currentMs). Computing that timeout value is slightly involved:

  • Across the batches still waiting to be sent, take the minimum of lingerMs minus the time each batch has already waited; call this nextReadyCheckDelayMs.
  • Across all nodes that are not ready, take the minimum of each node's pollDelayMs; call this notReadyTimeout.
  • The accumulator has a numeric field nextExpiryTimeMs, which should be the expiry time of the soonest-expiring batch; subtracting the current time gives nextExpiryDurationMs.
  • pollTimeout is the smallest of nextReadyCheckDelayMs, notReadyTimeout and nextExpiryDurationMs (floored at 0, and forced to 0 when some node already has sendable data).

So, for the most part, the configured linger.ms acts as the buffering window for messages; a small worked example follows.
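For instance, with purely illustrative numbers: linger.ms = 10, a head batch that has already waited 4 ms, one not-ready node with pollDelayMs = 50, and the nearest batch expiry 2000 ms away:

// Illustrative numbers only, mirroring the computation in sendProducerData above.
long nextReadyCheckDelayMs = 10 - 4;   // lingerMs - waitedTimeMs = 6
long notReadyTimeout       = 50;       // min pollDelayMs over the not-ready nodes
long nextExpiryDurationMs  = 2000;     // accumulator.nextExpiryTimeMs() - now
long pollTimeout = Math.max(0, Math.min(nextReadyCheckDelayMs, Math.min(notReadyTimeout, nextExpiryDurationMs))); // = 6
// If some node already had sendable data ready, pollTimeout would instead be forced to 0.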

Accumulator

As noted above, the Accumulator is the central medium through which batched sending is implemented. Here is the class-level comment:

/**
* This class acts as a queue that accumulates records into {@link MemoryRecords}
* instances to be sent to the server.
* <p>
* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
* this behavior is explicitly disabled.
*/

Two things are at its core: the buffer queues and memory allocation. Setting memory allocation aside for now, the buffering is implemented as one queue per TopicPartition, held in a ConcurrentMap; the queue itself is an ArrayDeque whose elements are of type ProducerBatch, which, as the name suggests, represents a batch of records. All work on a queue is wrapped in synchronized blocks, a convenient way to handle concurrency. The rough shape is sketched below, followed by the three main methods mentioned earlier, one by one.
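The layout and the get-or-create pattern look roughly like this. It is a fragment modeled on RecordAccumulator with approximate field names; the real class uses Kafka's own CopyOnWriteMap rather than ConcurrentHashMap.

// Approximate structure, not a verbatim copy of RecordAccumulator.
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new ConcurrentHashMap<>();

private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<ProducerBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    return previous != null ? previous : d;   // another thread may have created it first
}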

The append method

/**
* Add a record to the accumulator, return the append result
* <p>
* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
* <p>
*
* @param tp The topic/partition to which this record is being sent
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param headers the Headers for the record
* @param callback The user-supplied callback to execute when the request is complete
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
* @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
* running the partitioner's onNewBatch method before trying to append again
* @param nowMs The current time, in milliseconds
*/
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// getOrCreate the deque for this topic-partition via the concurrent map
// check if we have an in-progress batch
Deque<ProducerBatch> dq = getOrCreateDeque(tp);

// synchronized on the deque
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
// call tryAppend to try writing the record into the batch at the tail of the deque
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
return appendResult;
}

// to cooperate with the sticky partitioning logic, the tryAppend above never creates a new batch;
// if the tail batch is full or already closed, execution falls through to here
// we don't have an in-progress record batch try to allocate a new batch
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true);
}

// recompute the record size information
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
// allocate the memory needed for a new batch
buffer = free.allocate(size, maxTimeToBlock);

// Update the current time in case the buffer allocation blocked above.
nowMs = time.milliseconds();
// synchronized on the deque again
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");

// try once more to append to an existing tail batch; there can be a race here,
// so if another calling thread happens to have created a new batch and enqueued it already,
// just use that batch, and the buffer allocated above is released in the finally block
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}

// still failed to append: create a new batch, write the record into it, then add the batch to the tail of the deque
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callback, nowMs));

dq.addLast(batch);
incomplete.add(batch);

// prevent the allocated buffer from being released
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}

Inside ProducerBatch the work is mostly data conversion, writing and some validation, which is not covered in detail here. In short, append writes the record into a batch in the queue; if the tail batch is full or cannot accept the write, a new batch is created, the record is written into it, and the batch is appended to the queue. Creating a batch involves memory allocation, which keeps the buffer from growing without bound and running out of memory.
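The bounded-memory idea can be illustrated with a tiny sketch. This is not Kafka's BufferPool (which also recycles fixed-size buffers and uses condition queues for fairness); it only shows the block-until-memory-is-available-or-time-out behavior, with made-up class and method names.

import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.errors.TimeoutException;

// Illustrative only: total memory is capped, and allocation blocks until enough is freed or the timeout expires.
class TinyBufferPool {
    private final Semaphore availableBytes;

    TinyBufferPool(int totalMemory) {
        this.availableBytes = new Semaphore(totalMemory);
    }

    ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (!availableBytes.tryAcquire(size, maxTimeToBlockMs, TimeUnit.MILLISECONDS))
            throw new TimeoutException("Failed to allocate " + size + " bytes within " + maxTimeToBlockMs + " ms");
        return ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buffer) {
        availableBytes.release(buffer.capacity());
    }
}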

The ready method

/**
* Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
* partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
* partition batches.
* <p>
* A destination node is ready to send data if:
* <ol>
* <li>There is at least one partition that is not backing off its send
* <li><b>and</b> those partitions are not muted (to prevent reordering if
* {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
* is set to one)</li>
* <li><b>and <i>any</i></b> of the following are true</li>
* <ul>
* <li>The record set is full</li>
* <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
* <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
* are immediately considered ready).</li>
* <li>The accumulator has been closed</li>
* </ul>
* </ol>
*/
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();

boolean exhausted = this.free.queued() > 0;
// iterate over every partition's deque
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
Deque<ProducerBatch> deque = entry.getValue();
// synchronized on the deque
synchronized (deque) {
// peek at the batch at the head of the deque
// When producing to a large number of partitions, this path is hot and deques are often empty.
// We check whether a batch exists first to avoid the more expensive checks whenever possible.
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
// look up the leader node for this partition
TopicPartition part = entry.getKey();
Node leader = cluster.leaderFor(part);
if (leader == null) {
// the leader is unknown: record the topic so a metadata update can be triggered later
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part)) {
// how long the batch has been waiting since it was created or last retried
long waitedTimeMs = batch.waitedTimeMs(nowMs);
// whether the batch is backing off after a failed attempt
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
// normally wait lingerMs; when backing off, wait retryBackoffMs instead
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
// the sendability check for this partition's data
boolean sendable = full || expired || exhausted || closed || flushInProgress();
// if sendable, add the leader to the readyNodes set;
// otherwise update the delay until the next ready check
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
// wrap the result: the ready nodes, the delay until the next ready check, and topics with unknown partition leaders
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

This method returns three things:

  1. The nodes that, according to the application-level logic, can be sent data. Not every node is in a sendable state, and these nodes are still checked again afterwards at the network-connection level.
  2. The delay until the next ready check. This value feeds into the computation of the Sender loop's pollTimeout.
  3. The set of topics with unknown partition leaders. This is used to refresh the cluster metadata, since data cannot be sent to a partition whose leader is unknown.

The drain method

/**
* Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
* size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
*
* @param cluster The current cluster metadata
* @param nodes The list of node to drain
* @param maxSize The maximum number of bytes to drain
* @param now The current unix time in milliseconds
* @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
*/
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();

// iterate over the ready nodes produced by the earlier steps and collect the list of batches to send to each node
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}

// collect the batch list for a single node
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
int size = 0;
// from the cluster metadata, get the partitions whose leader lives on this node
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<ProducerBatch> ready = new ArrayList<>();
// drainIndex is a field; starting from it avoids always iterating from the first partition, which would risk starving the others
/* to make starvation less likely this loop doesn't start at 0 */
int start = drainIndex = drainIndex % parts.size();
// loop over the deque of each partition
do {
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
this.drainIndex = (this.drainIndex + 1) % parts.size();

// Only proceed if the partition has no in-flight batches.
if (isMuted(tp))
continue;

Deque<ProducerBatch> deque = getDeque(tp);
if (deque == null)
continue;

// all operations on the deque happen inside the synchronized block
synchronized (deque) {
// examine the batch at the head of the deque
// invariant: !isMuted(tp,now) && deque != null
ProducerBatch first = deque.peekFirst();
if (first == null)
continue;

// first != null
boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
// Only drain the batch if it is not during backoff period.
if (backoff)
continue;

if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
// the request size limit would be exceeded, so break out of the loop
// there is a rare case that a single batch size is larger than the request size due to
// compression; in this case we will still eventually send this batch in a single request
break;
} else {
// transaction-related checks
if (shouldStopDrainBatchesForPartition(first, tp))
break;

boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
ProducerIdAndEpoch producerIdAndEpoch =
transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
// poll the head batch off the deque
ProducerBatch batch = deque.pollFirst();
// transaction-related handling
if (producerIdAndEpoch != null && !batch.hasSequence()) {
// ... sequence assignment omitted ...
}
// close the batch so no further records can be written into it
batch.close();
size += batch.records().sizeInBytes();
ready.add(batch);

// record the time at which the batch was polled out of the deque
batch.drained(now);
}
}
} while (start != drainIndex);
return ready;
}

This method shows how the Sender pulls the data to be sent out of the accumulator. In short: once the sendable nodes have been determined, take the partitions whose leader lives on each node, pull at most one batch from each of those partitions' deques, and assemble the list of batches to send to each node.

These batches are then wrapped into request objects and sent asynchronously to the corresponding nodes, which concludes one round of Sender logic; the thread then waits in the network client's poll for up to pollTimeout before entering the next iteration.

Related parameters

The core parameters can be found in the Accumulator's constructor; the main ones are listed below, with a small example configuration after the list:

  • linger.ms: as described above, this value affects how fast the Sender thread loops. The larger it is, the longer each iteration may wait, which lets more records be merged into a single request to the broker, reducing network I/O and increasing throughput. Note, though, that the Sender's wait is not a fixed interval and can be cut short by a wakeup call.
  • retry.backoff.ms: how long to wait before retrying after a failure.
  • delivery.timeout.ms: since send does not transmit immediately but first writes into the buffer queues, this timeout guarantees that data does not sit unsent forever; expired batches are dropped from sending and the failure is reported through the Future returned by send.
  • batch.size: caps the amount of data in each batch; when a batch fills up during appends, a new one is created. Setting it too small produces more batches and therefore more requests, lowering throughput; setting it too large wastes memory, because each new batch allocates a buffer of this size up front.
  • buffer.memory: the maximum amount of memory the buffer may use, protecting against out-of-memory from an unbounded buffer.
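To tie these together, a sketch of how they might be set; the values are illustrative, not recommendations.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);                    // wait up to 10 ms to fill batches
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);            // back off 100 ms between retries
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);     // fail batches not delivered within 2 minutes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);            // 32 KB per batch
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024); // 64 MB of buffer in total

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send records as in the earlier sketch
        }
    }
}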