// 1. Subscription check: the consumer must have subscribed to topics or been assigned partitions
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
    throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}

do {
    client.maybeTriggerWakeup();

    // 2. Metadata update (covers rebalance, fetch-position updates, etc.)
    if (includeMetadataInTimeout) {
        // try to update assignment metadata BUT do not need to block on the timer for join group
        updateAssignmentMetadataIfNeeded(timer, false);
    } else {
        while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
            log.warn("Still waiting for metadata");
        }
    }

    // 3. Data fetch (the focus of this post)
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
    if (!records.isEmpty()) {
        // If we did fetch data, try to fire off the next round of fetch requests for efficiency
        // before returning the fetched records, we can send off the next round of fetches
        // and avoid block waiting for their responses to enable pipelining while the user
        // is handling the fetched records.
        //
        // NOTE: since the consumed position has already been updated, we must not allow
        // wakeups or any other errors to be triggered prior to returning the fetched records.
        if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
            client.transmitSends();
        }

        // 4. Run the interceptor chain and return
        return this.interceptors.onConsume(new ConsumerRecords<>(records));
    }
} while (timer.notExpired());
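To see step 4 from the caller's side: `this.interceptors.onConsume(...)` runs every registered `ConsumerInterceptor` right before `poll()` hands records back. Here is a minimal sketch against the public `org.apache.kafka.clients.consumer.ConsumerInterceptor` interface; the class name and the counting logic are made up for illustration:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical interceptor that just counts the records handed back by poll()
public class CountingInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        // Invoked by poll() right before the records are returned to the caller
        System.out.println("poll() returning " + records.count() + " records");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It is registered through the `interceptor.classes` config, e.g. `props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CountingInterceptor.class.getName());`.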
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure

// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
    pollTimeout = retryBackoffMs;
}
log.trace("Polling for fetches with timeout {}", pollTimeout);
/**
 * Send a fetch request to every node that has assigned partitions and no in-flight
 * request or pending fetch data.
 * @return the number of fetch requests sent
 */
public synchronized int sendFetches() {
    // Metrics bookkeeping
    // Update metrics in case there was an assignment change
    sensors.maybeUpdateAssignment(subscriptions);
log.debug("Sending {} {} to broker {}", isolationLevel, data, fetchTarget);
// 3. Send the request. Note that the Kafka client uses asynchronous I/O: this only
// queues the request for sending and does not block.
RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
// A node never has more than one fetch request in flight at a time. To enforce this,
// the node id is added to a set once a request is sent, and removed again when the
// response arrives (see the addListener part below).
// Note that the response handling is wrapped in synchronized.
// We add the node to the set of nodes with pending fetch requests before adding the
// listener because the future may have been fulfilled on another thread (e.g. during a
// disconnection being handled by the heartbeat thread) which will mean the listener
// will be invoked synchronously.
this.nodesWithPendingFetchRequests.add(entry.getKey().id());
// All of Kafka's network I/O uses this hand-rolled async callback style; the wrapping is elegant
future.addListener(new RequestFutureListener<ClientResponse>() {
    @Override
    public void onSuccess(ClientResponse resp) {
        synchronized (Fetcher.this) {
            try {
                @SuppressWarnings("unchecked")
                FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
                FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                if (handler == null) {
                    log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
                        fetchTarget.id());
                    return;
                }
                if (!handler.handleResponse(response)) {
                    return;
                }
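The ordering pitfall called out in that comment (add the node to the pending set before adding the listener) can be reproduced with plain java.util.concurrent types. A sketch using CompletableFuture in place of Kafka's internal RequestFuture, purely to illustrate the pattern (all names here are made up):

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class PendingRequestDemo {
    private final Set<Integer> nodesWithPendingRequests = ConcurrentHashMap.newKeySet();

    public void send(int nodeId, CompletableFuture<String> future) {
        // Mark the node as busy BEFORE attaching the callback: if the future is
        // already complete, whenComplete() runs synchronously on this thread,
        // and its remove() must observe the add() that came first.
        nodesWithPendingRequests.add(nodeId);
        future.whenComplete((resp, err) -> nodesWithPendingRequests.remove(nodeId));
    }
}

If the add happened after the listener was attached and the future was already completed (say, by the heartbeat thread handling a disconnect), the callback would run synchronously, its remove() would fire before the add(), and a stale node id would be left in the set, blocking all future fetches to that node. That is exactly the leak the Kafka comment guards against.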
/**
 * Return the fetched records, clearing the record buffer and updating the consumed position.
 *
 * NOTE: returning an empty set of records guarantees the consumed position is not updated.
 *
 * @return the fetched records per partition
 * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
 *         the defaultResetPolicy is NONE
 * @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
 */
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
    Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
    // Cap on the number of records returned, set by the classic max.poll.records config
    int recordsRemaining = maxPollRecords;
try {
    // Loop, draining completed fetches from the receive buffer
    while (recordsRemaining > 0) {
        if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
            CompletedFetch records = completedFetches.peek();
            if (records == null) break;
            if (records.notInitialized()) {
                try {
                    // Process the response: update the high watermark, the preferred read
                    // replica, handle errors, etc.
                    nextInLineFetch = initializeCompletedFetch(records);
                } catch (Exception e) {
                    // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
                    // (2) there are no fetched records with actual content preceding this exception.
                    // The first condition ensures that the completedFetches is not stuck with the same completedFetch
                    // in cases such as the TopicAuthorizationException, and the second condition ensures that no
                    // potential data loss due to an exception in a following record.
                    FetchResponse.PartitionData<Records> partition = records.partitionData;
                    if (fetched.isEmpty() && (partition.records() == null || partition.records().sizeInBytes() == 0)) {
                        completedFetches.poll();
                    }
                    throw e;
                }
            } else {
                nextInLineFetch = records;
            }
            completedFetches.poll();
        } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
            // when the partition is paused we add the records back to the completedFetches queue instead of draining
            // them so that they can be returned on a subsequent poll if the partition is resumed at that time
            log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
            pausedCompletedFetches.add(nextInLineFetch);
            nextInLineFetch = null;
        } else {
            // Core record-parsing method: the response may contain more records than we still
            // need, in which case only part of it is drained here. Also performs decoding and
            // the related metrics updates; worth a closer look if you have the time
            List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
            // Merge the result into the per-partition map
            if (!records.isEmpty()) {
                TopicPartition partition = nextInLineFetch.partition;
                List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                if (currentRecords == null) {
                    fetched.put(partition, records);
                } else {
                    // this case shouldn't usually happen because we only send one fetch at a time per partition,
                    // but it might conceivably happen in some rare cases (such as partition leader changes).
                    // we have to copy to a new list because the old one may be immutable
                    // Per the comment above: to cope with immutable lists, a fresh list is built
                    // every time and re-put into the result map
                    List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                    newRecords.addAll(currentRecords);
                    newRecords.addAll(records);
                    fetched.put(partition, newRecords);
                }
                // Update the remaining record budget
                recordsRemaining -= records.size();
            }
        }
    }
} catch (KafkaException e) {
    if (fetched.isEmpty())
        throw e;
} finally {
    // add any polled completed fetches for paused partitions back to the completed fetches queue to be
    // re-evaluated in the next poll
    completedFetches.addAll(pausedCompletedFetches);
}
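The pausedCompletedFetches handling above is what makes KafkaConsumer.pause() non-destructive: data already fetched for a paused partition is parked and re-queued rather than dropped, so nothing has to be re-fetched after resume(). A usage sketch against the public API, assuming an already-configured consumer (the topic name and timings are made up):

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseResumeDemo {
    static void throttle(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("demo-topic", 0);

        // pause(): already-fetched data for tp is parked in completedFetches, not dropped
        consumer.pause(Collections.singleton(tp));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
        // ... drain other partitions / catch up on processing ...

        // resume(): the parked CompletedFetches become eligible again on the next poll
        consumer.resume(Collections.singleton(tp));
    }
}

Note also that the recordsRemaining budget at the top of the method comes from max.poll.records (default 500), so a single poll() never drains more than that many records no matter how much data the fetch responses contain.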