Kafka消费者、生产者之前都有写文章描述过,然后在Broker这翻了车,我真的有点乏了,趁着心有不甘再学习总结下。唉,越来越没心气儿了,所以也懒得长篇大论,就以问答的形式来做个总结记录,想看细节的话,可以看看相关引用的原文,大部分是官方文档,基本上没毛病。特别推荐一下confluent官网的教程,图文并茂还有视频,真的不错。

还是说个大概吧

数据副本的单元是主题分片(Topic Partition),每个Partition拥有1个及以上的的Replica,其中包括1个Leader Replica和0~N个Follower Replica,副本数量称为Replication Factor。副本分散在各个Kafka Broker上(尽可能均匀),在高可用的场景下,同一个Partition的Replica一般分布在不同的Broker上以达到高可用的目的。作为同一份数据的副本,Follower和Leader在数据上保持一致(当然Leader可能会包含部分未同步到Follower上的数据),Kafka使用顺序Log的形式保存数据,所以副本同步本质上是一个全序广播。

Follower拉取数据还是Leader推送数据

Followers consume messages from the leader just as a normal Kafka consumer would and apply them to their own log. Having the followers pull from the leader has the nice property of allowing the follower to naturally batch together log entries they are applying to their log.

所以是Follower从Leader处拉取数据,就好像一个普通的Consumer,当然具体细节上有所差别。

ISR是什么,如何判断和维护

In-Sync Replica,表面意思同步的副本。离线、数据落后过多的副本都不能被称为ISR,与之相反,满足这两个条件的副本就被认为是ISR。离线的判断条件是:1. Zookeeper管理下的集群,节点与zk保持连接和心跳,如果心跳超时(zookeeper.session.timeout.ms)则zk上相关联的节点被删除,表示离线。 2. KRaft协议下的集群,节点与Controller保持连接和心跳,如果心跳超时(broker.session.timeout.ms)则集群认为节点离线;数据落后过多的判断条件是Follower对于Leader数据的延迟时间(replica.lag.time.max.ms)是否超限,旧的Kafka版本会采用延迟的数据量来判断,因这种判断条件在突发的消息流量出现时可能会引发频繁的ISR变动,后被延迟时间所取代。

对于每个Patition,由Leader来判断和管理当前的ISR列表,Leader根据Follower发送的Fetch请求来追踪Follower的状况。ISR是动态变化的,在Kafka 2.7版本之前,Leader直接更新Zookeeper中的znode(/brokers/topics/[topic]/partitions/[partitionId]/state ),保存当前的副本情况,而在该版本之后,根据KIP-497,当ISR集合发生变化时,Leader发送AlterISR Request到Controller,由Controller负责ISR状态的变更保存。

Kafka属于AP还是CP

有两种常见的副本同步策略:主备复制和仲裁复制。 这两种情况下都会采取指定一个副本为Leader,其余副本为Follower的策略。 所有的写请求都通过Leader执行,然后Leader将写操作传播给Follower。

在主备复制中,Leader等待组内每个副本上的写操作完成后才向客户端确认。如果其中一个副本出现故障,Leader会将其从当前组中移除,继续向其余副本进行写操作。如果一个失效的副本恢复并赶上Leader的进度,则允许它重新加入组。在具有 f 个副本的情况下,主备复制可以容忍 f-1 个副本故障。

在仲裁复制中,Leader等待大多数副本上的写操作完成。即使一些副本处于故障状态,整个副本组的大小是保持不变的。如果有 2f+1个副本,则仲裁复制可以容忍 f 个副本故障。如果领导者失败,则需要至少 f+1个副本来选举新的Leader。

Kafka在设计上偏向于AP的实现(CAP中的AP),因而选择了主备复制的策略,维护ISR作为决策成员,数据的提交确认以及选主操作基本上都根据ISR成员来进行判断处理。主备复制可以容忍更多的失效节点,在两个节点的情况下也能够正常工作,但是可以看到,kafka也允许在无Follower的情况下继续工作(可用min.insync.replicas限制),并且Kafka也允许配置不严谨的Leader选举(unclean.leader.election.enable),在所有ISR都失效的情况下,选择非ISR副本成为新的Leader,但这会有数据丢失的风险。

什么是HW和LEO

  • HW:High Water Mark,它指的是最后一个被成功commit(复制到所有ISR)的消息的偏移量。
  • LEO:Log End Offset,它指的是最后一条写入消息的偏移量。

显然,LEO通常会大于HW但最终保持一致,随着副本同步数据的进行,Leader上的HW会逐渐递增更新,HW更新时表示HW之前的数据已经被复制到所有的ISR,而Consumer只能消费HW之前的数据,这也是Kafka对于数据一致性保证的基础。

Consumer是否可以从Follower拉取数据

在Kafka 2.4版本之前,数据的写入和读取完全由Leader负责处理,Follower只做数据同步,而在这个版本之后,根据KIP-392,考虑到多数据中心的情况下,跨区流量的费用、延迟都比较高,Kafka允许配置Consumer从最近的replica读取数据。这就有可能出现从非ISR副本读取数据、偏移量不存在等状况,通常Consumer会收到异常然后重试,具体情况请查看提到的KIP详情。

Follower如何更新HW

Follower也需要知道当前的HW是多少,以便在选主、consumer拉取数据等操作时做出正确的处理。Leader的HW更新是异步的,同样,Follower的HW更是异步加异步。Leader依靠Follower的FetchRequest来判断更新HW,而HW的传输是依靠相对应的FetchResponse,在对Follower的同步请求响应中会包含HW的信息。因此Follower的HW一般会略有落后于Leader

发送数据时Leader失效会出现什么情况

这个问题要分类讨论:

  1. 在消息写入Leader的log之前Leader失效。此时消息可能还没送达,或者是送达了但还没写入,那么根据Producer的配置
    1. ack=0:Producer不考虑消息是否接收成功,这种配置下消息丢失
    2. ack=1/all:Producer接收响应超时触发回调,一般会做消息重发,那么消息不会丢
  2. 消息写入了Leader的log,Producer没有收到响应(包括没有返回和返回丢失的情况)
    1. 新的Leader没有写入该消息:Producer接收响应超时重发,保持正常
    2. 新的Leader已经写入了该消息(包括消息已经commit、被同步到所有ISR的情况):Producer接收响应超时重发,如果没有幂等处理则消息重复,否则一切正常
  3. 允许不严谨的Leader选主,所有ISR都失效,从非ISR中选出了Leader。Producer接收响应超时重发,这条消息仍会写入,但是之前的消息可能有丢失风险

Partition Leader失效是否丢消息主要看Producer的重试处理,以及是否做了幂等。

重选Leader的大致过程是怎么样的

这里指的Leader是Partition Leader。这部分的操作逻辑很少有文档直接描述,因此看源码(v2.4)进行分析。

以Zookeeper管理集群下,Leader所在Broker宕机为例,Controller(Broker管理者)监听Zk收到节点变更的通知,触发BrokerFailure(kafka.controller.KafkaController#onBrokerFailure),然后内部级联触发副本下线(kafka.controller.KafkaController#onReplicasBecomeOffline),然后通过PartitionStateMachine进行一系列状态变更操作,触发Leader选举逻辑(kafka.controller.ZkPartitionStateMachine#electLeaderForPartitions),根据策略选出新的Leader后,向zk更新Leader和ISR的状态信息(zkClient.updateLeaderAndIsr),并且向这个Partition replica所在的所有broker发送 LeaderAndIsrRequest(controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers),同步当前的分片状态。总的来说就是Controller处理重新选主,然后更新、同步元数据

Kafka也抽象出了控制平面(Control Plane)的概念,可以是Zk的实现,也可以是KRaft的实现,大致上应该是一致的。

重选Leader过后,会进行数据恢复,新的Leader数据log保持不变,但是HW可能会有回退的情况(因为HW的同步是异步的),剩下的Follower向新的Leader发送数据同步请求(FetchRequest),Leader根据请求的信息和自身数据log的情况返回响应(FetchResponse),Follower会截断自身log中与新Leader数据不一致的部分,最终保持和新Leader的数据一致,随着Follower的同步,新Leader的HW开始逐步增加恢复。

如果一条数据已经commit,那就意味着已经同步到所有的ISR中,所以重新选主并不会导致已经commit的数据发生变动,同时消费者只能消费已经commit的数据,这就是Kafka对数据一致的保证。

Broker端丢数据的情况有哪些

在Kafka中,丢数据不是一个简单的场景、说法,取决于各种各样的配置和环境。

  • 对于单个Broker来说,由于Kafka的数据写入没有提供刷盘的选项,因此数据log是会暂留在PageCache中的,那么断电会导致这部分数据丢失。
  • Kafka可以配置允许在只有一个副本的情况下正常运行,因此如果当前只有Leader在正常处理,没有Follower或者Follower全部被踢出ISR,此时Leader宕机就可能会导致真正的数据丢失。
  • Kafka可以配置允许从非ISR副本中选举出Leader,那么已经commit的数据就有可能发生丢失。

如何增强数据保障

说句实话我是没碰到过丢的情况,而且是在消息量10W/s级别的场景,真的那么会丢也不可能作为一款著名的、成熟的、生产级别的消息中间件。但是纯理论上来说,Kafka确实有可能丢数据,我之前确实是大意了。那么保障数据不丢的一些基本操作包括:

  • 配置replication.factor={val >= 3}:3个副本是一个相对合理的值

  • 配置min.insync.replicas={val > 1}:限制ISR的最小数量,如果ISR小于该值则拒绝数据写入,保证提交的数据至少写入了2个副本

  • 配置 unclean.leader.election.enable=false,禁止从非ISR副本中选主

  • Producer端配置ack=-1/all以及重试,回调函数记录发送失败的情况进行针对性处理,如有必要可以开启幂等发送,排除重复发送的问题

  • 采用At Least Once语义,Consumer端严格按照拉取数据、业务操作、手动提交offset的顺序进行消费,业务失败时进行重试或者记录,做好幂等性处理

References

[1] Kafka documentation:https://kafka.apache.org/documentation.html#replication

[2] Kafka Replication:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication#KafkaReplication-Failurescenarios

[3] KIP-392: Allow consumers to fetch from closet replica:https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica

[4] KIP-497: Add inter-broker API to alter ISR:https://cwiki.apache.org/confluence/display/KAFKA/KIP-497%3A+Add+inter-broker+API+to+alter+ISR

[5] Data replication:https://developer.confluent.io/learn-kafka/architecture/data-replication/

[6] Kafka 源码解析之副本状态机与分区状态机(十七):https://matt33.com/2018/06/16/controller-state-machine/#PartitionStateMachine

[7] Kafka 到底会不会丢数据?:https://mp.weixin.qq.com/s?__biz=Mzg3MTcxMDgxNA==&mid=2247490489&idx=1&sn=17817f6d9837ad6a8823362d5ed38687