Kafka 入门学习


一句话摘要

Kafka 是基于发布订阅模式的分布式消息流平台,通过分区、副本、消费者组等机制实现高吞吐、低延迟、高可用的消息传递;本文系统梳理了其核心概念、集群配置参数、Producer/Consumer API 及偏移量管理。


核心知识点

1. 基本术语体系

消息(Message/Record):Kafka 中的基本数据单元,等价于数据库表中的一行记录。

批次(Batch):为提高效率,消息分批写入 Kafka,一个批次代表一组消息。

主题(Topic):消息的分类标签,等价于数据库表。

分区(Partition):Topic 被分为多个 Partition,可分布在不同 Broker 上,实现水平扩展。单个分区内有序,跨分区无序。

偏移量(Consumer Offset):不断递增的整数元数据,记录消费者重平衡时的消费位置,用于恢复消费进度。

Broker:一台 Kafka 服务器。接收生产者消息,为消息设置偏移量,持久化到磁盘。集群中有一个 Broker 担任控制器角色,由活跃成员自动选举产生。

副本(Replica):消息的备份。分为 Leader Replica(对外提供服务)和 Follower Replica(被动同步)两类。

重平衡(Rebalance):消费者组内某成员宕机时,其他消费者自动重新分配分区的过程,是消费者端高可用的核心手段。


2. Kafka 设计特性

  • 高吞吐低延迟:每秒处理数十万条消息,最低延迟仅几毫秒。
  • 高伸缩性:分区可分布在不同 Broker,通过增加节点扩展吞吐量。
  • 持久性与可靠性:消息持久化到磁盘,支持副本备份。
  • 容错性:允许集群中的节点失败,集群仍能正常运行。
  • 高并发:支持数千个客户端同时读写。

3. Kafka 高性能的底层原理

Kafka 速度快的核心原因有四点:

  1. 顺序读写:消息追加写入磁盘,避免随机寻址。
  2. 零拷贝(Zero-Copy):数据在内核态直接从磁盘传输到网络,避免内核与用户空间的数据拷贝切换。
  3. 消息压缩:批量数据压缩,减少 I/O 传输量。
  4. 分批发送:从 Producer 到文件系统再到 Consumer,全链路以批次为单位传输数据。

4. 四大核心 API

  • Producer API:向一个或多个 Topic 发送消息记录。
  • Consumer API:订阅一个或多个 Topic 并处理消息流。
  • Streams API:流处理器,消费输入流并生成输出流(Topic → Topic 转换)。
  • Connector API:连接 Kafka Topic 与外部系统,如关系型数据库的 CDC 连接器。

5. Broker 端重要配置

参数说明默认值/示例
broker.idBroker 唯一标识,集群内不可重复默认 0
port监听端口,1024 以下需 root 权限默认 9092
zookeeper.connectZK 地址,格式 zk1:2181,zk2:2181/kafka1无默认,必填
log.dirs消息日志存储路径,多路径逗号分隔,如 /home/kafka1,/home/kafka2无默认,必填
num.recovery.threads.per.data.dir每个日志目录的恢复线程数,崩溃重启时并行恢复可大幅缩短时间默认 1
auto.create.topics.enable**线上强烈建议设为 **false,否则任何写/读/元数据请求都会自动建 Topic,产生大量垃圾 Topic默认 true

主题级默认配置:

参数说明默认值
num.partitions自动创建 Topic 时的默认分区数,只能增不能减1
default.replication.factor副本数1
log.retention.hours / log.retention.ms消息保留时长,推荐用 ms 精度的版本168h(1 周)
log.retention.bytes按大小保留,作用于每个分区。8 分区 + 1GB = 最多保留 8GB-1(无限)
log.segment.bytes单个日志片段上限,到达后关闭当前片段、打开新片段1GB
message.max.bytesBroker 能接收的单条消息最大值需与消费端参数匹配

JVM 推荐配置:

  • JDK 版本:推荐 JDK 1.8。
  • 堆大小:直接设置为 6GB 以避免常见 Bug。
  • GC:Java 8 直接使用默认 G1,关键参数:
    • MaxGCPauseMillis:每次 GC 停顿目标,默认 200ms
    • InitiatingHeapOccupancyPercent:堆使用率超过该值触发 GC,默认 45%

6. Kafka Producer

创建生产者必填三项:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // 建议≥2个,保证HA
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

三种消息发送方式:

  1. Fire-and-Forget(直接发送,不关心结果):调用 send() 不处理返回值,不知道消息是否成功,适合允许丢失的场景。
  2. 同步发送:调用 send().get() 阻塞等待,服务器出错时抛出异常。生产者内部有两类错误:可重试错误(如连接断开、无主分区)会自动重试;不可重试错误(如消息过大)直接抛出。
  3. 异步发送(带回调):传入 Callback 对象,实现 onCompletion() 接口,错误时 exception 非空:
producer.send(record, (metadata, exception) -> {
    if (exception != null) { /* 处理错误 */ }
});

分区策略:

  • 轮询(Round-Robin):默认策略,均匀分配到各分区,吞吐最优。
  • 随机(Random):旧版本策略,新版已改为轮询,不推荐。
  • Key 路由(key-ordering):相同 Key 的消息始终进入同一分区,保证分区内顺序性:
// 两行代码实现 key-based 分区
int numPartitions = cluster.partitionCountForTopic(topic);
return key.hashCode() % numPartitions;
  • 自定义分区:实现 org.apache.kafka.clients.producer.Partitioner 接口,配置 Partitioner.class 参数。Partitioner 包含三个方法:partition()(核心逻辑)、close()onNewBatch()

压缩机制:

压缩在 Producer 侧开启,Consumer 侧自动解压,压缩类型随消息头传递:

props.put("compression.type", "gzip"); // 可选:snappy、gzip、lz4

Kafka 以消息集合(Message Set)为单位进行压缩,不对单条消息压缩,批量压缩效率更高。

Producer 重要参数:

参数说明
acks0=不等待确认(UDP模式);1=Leader 确认即可;all=所有副本确认,最安全但延迟最高
buffer.memory发送缓冲区大小,超出时 send() 阻塞或抛异常
retries临时错误重试次数,每次重试间隔由 retry.backoff.ms 控制(默认 100ms)
batch.size同一分区消息的批次内存上限,填满即发;不一定等满才发
max.in.flight.requests.per.connection服务器响应前可发送的最大消息数,设为 1 可保证严格顺序写入
compression.type压缩算法,默认不压缩
request.timeout.ms等待服务器响应的超时时间
max.block.ms缓冲区满或无元数据时 send() 的最大阻塞时间,超时抛 TimeoutException
max.request.size单次请求(或单条消息)的最大大小

7. Kafka Consumer

消费者组(Consumer Group)核心机制:

  • 组内消费者共享 Group ID,共同消费一个 Topic。
  • 每个分区只能被组内一个消费者消费,多余消费者闲置。
  • 消费者数量不应超过分区数,超出部分完全空闲。
  • 水平扩展消费能力:向消费者组增加消费者(不超过分区数)。

两种消费模式:

  • 点对点(消息队列):一个消费者组消费一个 Topic,消息被消费后即消失。
  • 发布-订阅:多个消费者组消费同一 Topic,每个组都能收到全量消息。若要应用读取全量消息,需为该应用单独创建消费者组。

重平衡(Rebalance):

  • 触发时机:消费者加入/离开组、消费者宕机(心跳超时)、Topic 分区数变化。
  • 重平衡期间消费者组完全不可用(类比 JVM 的 Stop-The-World)。
  • 消费者通过定期向 Group Coordinator(Kafka Broker)发送心跳维持成员资格。
  • session.timeout.ms(默认 3s)内未发心跳即被判定死亡,触发重平衡。
  • 重平衡是 Kafka 消费端的已知 Bug 级缺陷,社区至今无法完全修复其性能问题。

创建消费者示例:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("group.id", "myGroup");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅主题(支持正则,新建匹配Topic会立即触发重平衡)
consumer.subscribe(Collections.singletonList("customerTopic"));
// 订阅所有test相关主题:consumer.subscribe(Pattern.compile("test.*"));

// 轮询拉取
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息...
    }
}
// 关闭时立即触发重平衡,而非等待超时
consumer.close();

注意: 同一消费者组内,一个线程只能运行一个消费者;多消费者需多线程,可用 ExecutorService 管理。

Consumer 重要参数:

参数说明默认值
fetch.min.bytes每次 fetch 的最小数据量,未满则等待,降低低频场景的空轮询开销1 byte
fetch.max.wait.ms等待 fetch.min.bytes 的最长时间,与 fetch.min.bytes 共同决定延迟500ms
max.partition.fetch.bytes每个分区每次 poll 返回的最大字节数,必须大于 max.message.size1MB
session.timeout.ms心跳超时时间,超时触发重平衡3s
heartbeat.interval.ms心跳发送频率,必须小于 session.timeout.ms,通常为其 1/3
auto.offset.reset偏移量无效时的行为:latest(从最新)或 earliest(从头)latest
enable.auto.commit是否自动提交偏移量,生产环境建议设为 false 手动控制true
auto.commit.interval.ms自动提交间隔5s
partition.assignment.strategy分区分配策略:RangeRoundRobin
max.poll.records单次 poll() 返回的最大记录数

8. 偏移量提交

偏移量提交到 Kafka 内部特殊主题 _consumer_offset,用于重平衡后恢复消费位置。

偏移量不一致的两种后果:

  • 提交偏移量 < 最后处理偏移量 → 两者之间的消息被重复消费
  • 提交偏移量 > 最后处理偏移量 → 两者之间的消息丢失

三种提交方式:

  1. 自动提交enable.auto.commit=true,每隔 auto.commit.interval.ms(默认 5s)自动提交 poll 到的最大偏移量。简单但可能重复消费。
  2. **手动同步提交 **commitSync():处理完所有消息后调用,提交失败会抛出异常并一直重试。
consumer.commitSync(); // 提交当前 poll 返回的最新偏移量
  1. **手动异步提交 **commitAsync():不阻塞,不重试(即使失败)。
consumer.commitAsync((offsets, exception) -> { /* 处理失败 */ });

最佳实践:正常消费用 commitAsync(),在消费者关闭前(finally 块)用 commitSync() 确保最后一次提交成功:

try {
    while (true) { ...; consumer.commitAsync(); }
} finally {
    consumer.commitSync(); // 保证最后一次提交
    consumer.close();
}

commitSync()commitAsync() 均支持传入 Map<TopicPartition, OffsetAndMetadata> 提交特定分区的特定偏移量,实现更细粒度控制。


优缺点与局限性

优点与适用场景:

  • 大规模日志采集、用户行为追踪、监控指标聚合:高吞吐 + 持久化天然契合。
  • 流计算管道(配合 Flink/Spark Streaming):Streams API 提供原生流处理能力。
  • 解耦多个系统:发布订阅模式下多个消费者组可独立消费全量数据,互不干扰。
  • 削峰填谷:缓冲突发流量,消费者按自身节奏消费。

局限与踩坑点:

  • 重平衡性能问题:Rebalance 会导致消费者组完全停止消费(STW),且过程很慢,是目前 Kafka 未彻底解决的缺陷。频繁触发(如 JVM GC 停顿超过 session.timeout.ms)会严重影响消费延迟。
  • 消费者数不能超过分区数:超出的消费者永久闲置,是资源浪费。扩容时需提前规划分区数。
  • 分区数只能增不能减:创建 Topic 时 num.partitions 的选择需慎重,无法减少。
  • auto.create.topics.enable=true** 的线上陷阱**:任何客户端的读/写/元数据操作都会自动建 Topic,生产环境必须关闭。
  • acks** 配置影响可靠性**:acks=0 完全不保证送达;acks=1 在 Leader 宕机时仍可能丢消息;只有 acks=all 才能保证强一致,但延迟最高。
  • 自动提交偏移量的重复消费风险:5s 的自动提交间隔内若消费者崩溃,重启后会从上次提交的偏移量重新消费,产生重复数据。
  • max.partition.fetch.bytes** 需大于 **max.message.size:否则超大消息将永远无法被消费,陷入死循环。
  • log.retention.bytes** 按分区计算**:8 个分区 × 1GB = 整个 Topic 实际可保留 8GB,不是 1GB。

行动清单

  1. 搭建本地单节点 Kafka:配置 broker.idlog.dirszookeeper.connect,练习 Topic 创建、num.partitionsdefault.replication.factor 设置。
  2. 动手实现三种 Producer 发送方式:分别用 Fire-and-Forget、send().get() 同步、Callback 异步各发送 100 条消息,观察延迟差异和异常处理行为。
  3. 实验分区策略:分别实现轮询、随机、Key 路由三种策略,创建 4 个分区的 Topic,观察消息分布是否符合预期。
  4. 验证 Consumer Group 行为:创建 4 个分区的 Topic,依次启动 1、2、4、5 个消费者,观察分区分配和第 5 个消费者的闲置状态;再 Kill 一个消费者,观察重平衡触发。
  5. 实践偏移量管理:先用自动提交,模拟消费者崩溃后重复消费的场景;再改为手动异步+关闭前同步的组合方式,对比两者差异。
  6. 调优 acks 参数:分别将 acks 设为 01all,配合 Kafka 自带的 kafka-producer-perf-test.sh 工具测量吞吐量与延迟变化,感受可靠性与性能的权衡。
  7. 深入阅读:《Kafka 权威指南》(O’Reilly)+ 极客时间《Kafka 核心技术与实战》,重点补充 Exactly-Once 语义、ISR 机制、Controller 选举等进阶内容。

Kafka 基础入门

一句话摘要

Kafka 是面向大数据实时处理的分布式消息队列,核心价值是解耦、削峰、异步,架构围绕 Topic-Partition-Replica 三层模型展开。


核心知识点

一、为什么用消息队列

价值解决的问题
解耦生产者与消费者独立修改,互不影响
冗余消息持久化,处理失败不丢数据
削峰突发流量缓冲,避免系统崩溃
异步消息入队即返回,后续异步消费

二、核心概念速查

Producer: 消息生产者,向 Broker 发消息的客户端。

Consumer: 消息消费者,从 Broker 读消息的客户端。

Consumer Group: 消费者组,组内每个消费者消费不同 Partition,一个 Partition 只能被组内一个消费者消费,不同组之间互不影响。

Broker: 一台 Kafka 机器 = 一个 Broker,多个 Broker 组成集群,一个 Broker 可容纳多个 Topic。

Topic: 逻辑概念,消息分类容器,生产者和消费者面向同一个 Topic。

Partition: 物理概念,Topic 的分片单元,每个 Partition 是一个有序队列,对应一个 log 文件。一个 Topic 可分布在多个 Broker 上的多个 Partition 中。

Replica: 副本,每个 Partition 有一个 Leader 副本 + 若干 Follower 副本,Follower 不能与 Leader 在同一个 Broker 上。

Leader: 生产者写入和消费者读取的唯一对象。

Follower: 实时同步 Leader 数据,Leader 故障时选举为新 Leader。

Offset: 消费者消费位置的游标,崩溃恢复后从此位置继续消费。

ZooKeeper: 存储和管理集群元数据,新版本正在逐步去除依赖。


三、工作流程

Producer

   │ 发送消息(追加到 log 文件末尾)

Leader Partition(物理 log 文件)
   │  每条消息记录自身 Offset

   ├──→ Follower 主动同步数据


Consumer Group
   └─ 每个 Consumer 实时记录当前消费的 Offset
  • 消息以 Key + Value + 时间戳 三元组存储
  • Producer 只往 Leader Partition 写,Consumer 只从 Leader Partition
  • 数据顺序追加到 log 文件末尾,写入性能极高

四、存储机制(分片 + 索引)

问题: log 文件无限增大 → 数据定位效率低。

解决: 每个 Partition 切分为多个 Segment,每个 Segment 对应 4 个文件:

文件作用
.index索引文件,存储大量索引信息
.log数据文件,存储实际消息
.snapshot快照文件
.timeindex时间索引文件

文件命名规则: 以当前 Segment 第一条消息的 Offset 命名。

目录命名规则: topic名称-分区号

示例:heartbeat topic,3个分区对应:
  heartbeat-0/
  heartbeat-1/
  heartbeat-2/

.index 中的元数据 → 指向 .log 中对应消息的物理偏移量,实现快速定位。


五、副本机制

  • 每个 Partition 建议设置 2 个副本(设置 3 个也常见)
  • Producer 只写 Leader,Follower 主动拉取同步
  • Consumer 只读 Leader
  • Leader 故障 → Follower 选举新 Leader
Partition 0:  Leader(Broker1)  Follower(Broker2)  Follower(Broker3)
Partition 1:  Leader(Broker2)  Follower(Broker1)  Follower(Broker3)
Partition 2:  Leader(Broker3)  Follower(Broker1)  Follower(Broker2)

六、Controller

  • 集群中一台特殊的 Broker,承担额外的集群管理工作
  • 选举方式:公平竞选,最先在 ZooKeeper 创建临时节点 /controller 的 Broker 成为 Controller
  • 通常是第一台启动的 Broker,将自身信息写入 /controller 节点

七、Offset 维护

版本Offset 存储位置原因
0.9 之前ZooKeeper旧方案
0.9 之后Kafka 内置 Topic __consumer_offsets支持高并发读写

Consumer 崩溃恢复后,从 __consumer_offsets 读取上次 Offset,继续消费。


优缺点与局限性

特性优点限制 / 踩坑点
Partition 分区提高并发,支持横向扩展同一 Partition 内有序,跨 Partition 不保证全局顺序
顺序写 log写性能极高,O(1) 追加log 文件需配合 Segment 分片管理,否则定位慢
副本机制节点故障数据不丢Follower 同步有延迟,Leader 宕机瞬间可能丢少量数据
Consumer Group多消费者并行,提升消费能力一个 Partition 只能被组内一个 Consumer 消费,Consumer 数 > Partition 数时有 Consumer 空闲
ZooKeeper 依赖元数据管理成熟ZooKeeper 是额外运维负担,新版本正在去除
Offset 自管理灵活,支持重放消费需要正确提交 Offset,否则重复消费或丢消息

行动清单

  1. 本地搭建单节点 Kafka:用 Docker 启动 Kafka + ZooKeeper,创建 Topic,用命令行生产和消费消息验证基本流程
  2. 观察 Partition 文件结构:创建 Topic 后进入数据目录,查看 .index.log.timeindex 文件,理解 Segment 分片命名规则
  3. 模拟 Consumer Group:启动多个 Consumer 加入同一 Group,观察 Partition 分配和负载均衡行为
  4. 验证 Offset 机制:Consumer 消费一半后 kill 掉,重启后观察是否从上次 Offset 继续消费
  5. **查看 **__consumer_offsets:用 kafka-console-consumer 订阅该内置 Topic,观察 Offset 提交记录
  6. 下一篇预习:带着问题阅读 Kafka 三高(高性能、高可用、高并发)的设计,重点关注零拷贝、ISR 机制、分区再均衡

Kafka 三高架构设计

一句话摘要

Kafka 的高可用、高性能、高并发分别依赖 ISR + ACK 机制零拷贝 + 顺序写 + 内存池三层 Reactor 网络架构实现,三者协同支撑起工业级消息队列的稳定性。


核心知识点

一、高可用设计

Controller 选举

三类选举:控制器选举、Leader 选举、消费者选举。

Controller 启动流程:

步骤1:第一个启动的 Broker 在 ZooKeeper 创建临时节点 /controller,写入自身信息
步骤2:其他 Broker 尝试创建 /controller,因节点已存在抛出异常,放弃竞争
步骤3:其他 Broker 在 Controller 上注册监听器,监听各自节点状态变化

Controller 的职责:普通 Broker 功能 + 管理 Topic 分区和副本状态 + 执行 Partition 重分配。


副本机制

  • 每个 Partition 有多个副本,通过 replication-factor 参数指定副本数
  • 所有消息只写 Leader 副本,Follower 主动复制 Leader 数据
  • Leader 不可用时,从 Follower 中选举新 Leader
  • Broker 通过 broker.id 唯一标识自身

ISR 机制

ISR(In-Sync Replica): 维护所有同步可用副本的列表,Leader 副本必然在 ISR 中。

Follower 留在 ISR 的条件:

  1. 定时向 ZooKeeper 发送心跳
  2. 在规定时间内从 Leader 低延迟地获取过消息

关键参数:

replica.lag.time.max.ms = 10000(默认10秒)
含义:Follower 落后 Leader 不超过10秒,视为同步副本

ISR 存储位置:

ZooKeeper 节点:/brokers/topics/[topic]/partitions/[partition]/state
维护方:Controller(选举新 Leader 时更新)+ Leader(定期检测 Follower 状态)

ACK 机制

acks 参数在 KafkaProducer 端配置,三个可选值:

acks 值确认时机数据安全性性能适用场景
0不等待 Broker 响应,写入 Socket Buffer 即完成极低,可能丢失最高日志采集,允许丢数据
1Leader 写入本地日志后确认,不等 Follower中,Leader 宕机可能丢中(默认配置)平衡场景
all等待所有 ISR 副本确认后才返回最高,不丢数据最低金融、订单等高可靠场景

acks=all 配套参数:

min.insync.replicas = 2(默认1)
含义:ISR 中副本数 < 该值时,Leader 停止写入,向 Producer 抛出 NotEnoughReplicas 异常
效果:能容忍 min.insync.replicas - 1 个副本同时宕机

二、高性能设计

Reactor 多路复用模型

Kafka SocketServer 基于 Java NIO 实现 Reactor 模式,三种角色:

Acceptor(1个)
    │ 监听客户端连接请求(OP_ACCEPT)

Processor(N个)
    │ 负责读写数据,每个 Connection 对应一个 Processor
    │ 每个 Processor 拥有独立的 Selector

Handler(M个)
    │ 处理业务逻辑
    
各角色之间用队列缓冲请求

Java NIO 三核心组件:Channel(数据传输)、Buffer(数据缓冲)、Selector(单线程处理多 Channel)。


生产消息流程

消息 → 封装为 ProducerRecord
    → 序列化(Serializer)
    → Partitioner 分区器确定目标分区(从 Broker 获取元数据)
    → 写入 RecordAccumulator 缓存队列(批次 RecordBatch)
    → Sender 线程将多个 Batch 封装为 Request 发送
    → 通过 Selector 对应的 KafkaChannel 发往 Broker

关键参数:

batch.size = 16KB(写满立即发送)
linger.ms = 500ms(未写满超时也发送)

顺序写磁盘 + OS Cache

  • 写入时先写 os cache(Page Cache),由操作系统决定刷盘时机,大幅提升写效率
  • 数据顺序追加到文件末尾,不做随机写
  • 机械磁盘顺序写性能 ≈ 内存写入性能(避免磁盘寻址开销)

零拷贝(Zero-Copy)

传统消费数据路径(4次拷贝,多次上下文切换):

磁盘 → os cache → 应用程序缓存 → Socket 缓存 → 网卡 → 消费者
          ↑            ↑
      这两次拷贝是多余的,且伴随内核态/用户态切换

零拷贝路径:

磁盘 → os cache → 网卡 → 消费者
       (Socket 缓存只拷贝描述符,不拷贝数据)

两种零拷贝实现:

方式原理Kafka 对应实现
mmap内存映射文件,减少用户态拷贝MappedByteBuffer
sendfile数据直接从内核发到网卡FileChannel.transferTo()

Java NIO 零拷贝示例:

// transferTo() 内部调用操作系统 sendfile() 系统调用
fileChannel.transferTo(position, count, socketChannel);

压缩传输

  • 默认不开启压缩,开启后提升吞吐量、降低延迟、节省磁盘
  • 压缩发生在 Producer 端,Broker 端保持压缩状态存储,Consumer 端解压
  • 支持算法:lz4snappygzipZStandard(Kafka 2.1.0+ 新增,Facebook 开源,压缩比最高)
  • Producer、Broker、Consumer 必须使用相同压缩算法

服务端内存池设计

目的: 避免频繁内存申请与回收导致 Full GC,稳定生产者性能。

Buffer Pool 总大小:32MB
├── 内存队列:存放固定大小的内存块(每块 16KB)
└── 可用内存:动态分配区

数据结构 Batches:
  Key   → 消息 Topic 的分区
  Value → 对应分区的批次队列

流程:
消息到达 → 申请内存块(16KB)→ 封装 Batch → Sender 发送
→ 发送完毕 → 清空内存块 → 归还内存池 → 循环复用

三、高并发设计

三层网络架构

第一层:Acceptor 线程(1个)
    监听 OP_ACCEPT 事件,接收连接
    封装为 SocketChannel,轮询分发给 Processor

第二层:Processor 线程(默认3个,num.network.threads=3)
    连接队列存放 SocketChannel
    注册 OP_READ,解析二进制请求为 Request 对象
    将 Request 放入 RequestQueue
    监听 ResponseQueue,注册 OP_WRITE 返回响应

第三层:RequestHandler 线程池(默认8个,num.io.threads=8)
    从 RequestQueue 取出请求
    解析数据写入磁盘
    封装 Response 放入对应 Processor 的 ResponseQueue

容量估算与扩容:

默认:每个 RequestHandler 处理 2000 QPS
默认总量:2000 × 8 = 1.6万 QPS

扩容配置:
num.network.threads = 9
num.io.threads = 32(建议与 CPU 核数一致)
扩容后:2000 × 32 = 6.4万 QPS

优缺点与局限性汇总

技术适用场景限制踩坑点
acks=0日志采集、允许丢数据无法保证送达Retries 参数失效,Offset 返回 -1
acks=1通用场景(默认)Leader 宕机可能丢消息类似 MySQL 主从异步,存在数据差异
acks=all金融、订单性能最低需配合 min.insync.replicas≥2,否则单副本等于降级
ISR 机制副本同步管理Follower 可能短暂落后replica.lag.time.max.ms 设置过短导致频繁移出 ISR
零拷贝消费侧大吞吐依赖操作系统支持数据需要修改时零拷贝失效,需结合 copy-on-write
内存池高频生产消息总大小固定 32MB消息积压时内存池耗尽,生产者阻塞
顺序写写入性能优化日志文件持续增大需配合 Segment 分片 + 定期清理策略

行动清单

  1. ACK 参数实验:分别用 acks=0/1/all 发送消息,kill Leader 节点,对比三种配置下的数据丢失情况
  2. ISR 动态观察:用 kafka-topics.sh --describe 实时查看 ISR 列表,手动暂停 Follower 观察其被移出 ISR 的过程
  3. 零拷贝性能对比:用 kafka-producer-perf-test.shkafka-consumer-perf-test.sh 压测,对比开启/关闭零拷贝时的吞吐量差异
  4. 压缩算法对比:分别配置 snappylz4zstd,压测吞吐量、CPU 占用和磁盘占用,选出最适合业务的算法
  5. 网络线程调优:在压测环境中调整 num.network.threadsnum.io.threads,观察 QPS 变化,验证扩容公式
  6. 阅读源码:重点阅读 SocketServer.scala(Acceptor/Processor 实现)和 RecordAccumulator.java(内存池实现)

Kafka Producer 底层原理

一句话摘要

Kafka Producer 通过分区器 + 序列化 + 内存池 + Sender 线程 + NIO 网络层的流水线设计实现高吞吐低延迟,核心调优围绕批次大小、内存、压缩、重试四个维度展开。


核心知识点

一、Producer 初始化流程

13 个步骤按序执行:

producer = new KafkaProducer<>(props);

// 1. 分区器(支持自定义)
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

// 2. 重试时间间隔,默认 100ms
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

// 3. 序列化器
// ...

// 4. 拦截器
this.interceptors = new ProducerInterceptors<>(interceptorList);

// 5. 元数据,metadata.max.age.ms 默认 5 分钟
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), ...);

// 6. 单条消息最大大小,默认 1M,生产建议改为 10M
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);

// 7. 缓冲区大小,默认 32M
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);

// 8. 压缩格式
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

// 9. RecordAccumulator 缓冲区,32M
this.accumulator = new RecordAccumulator(
    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),  // 默认 16KB
    this.totalMemorySize,
    this.compressionType,
    config.getLong(ProducerConfig.LINGER_MS_CONFIG),  // 默认 0ms
    ...);

// 10. 初始化元数据(此时未真正拉取)
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

// 11. NetworkClient 关键参数:
//   connections.max.idle.ms = 9分钟(超时关闭空闲连接)
//   max.in.flight.requests.per.connection = 5(保证有序需设为1)
//   send.buffer.bytes = 128KB
//   receive.buffer.bytes = 32KB
NetworkClient client = new NetworkClient(...);

// 12. Sender 线程(acks: 0/1/-1,retries 重试次数)
this.sender = new Sender(client, this.metadata, this.accumulator, ...);

// 13. 启动 Sender 守护线程
this.ioThread.start();

二、元数据拉取流程

主线程调用 send()
    → 触发元数据拉取标识
    → 主线程 wait() 等待
    → 唤醒 Sender(KafkaThread)线程
    → NetworkClient 从 Broker 集群拉取元数据
    → Broker 返回元数据响应
    → 更新 version 版本号到 Metadata 组件
    → 唤醒主线程继续执行

三、Producer 发送流程

9 个阶段:

1. 初始化(加载配置,启动网络线程)
2. 拦截器预处理消息,封装 ProducerRecord
3. Serializer 序列化 key/value
4. Partitioner 分配目标分区号
5. 从 Broker 获取集群元数据 metadata
6. 消息写入 RecordAccumulator
   └─ 按目标分区找到对应 Deque<RecordBatch>
   └─ 没有则新建 RecordBatch 加入队列
7. 达到发送阈值 → 唤醒 Sender 线程
   └─ RecordBatch → Request
   └─ 按【Broker Id → List<Request>】归类
8. 与各 Broker 建立网络连接,发送对应消息列表
9. 触发发送的条件(满足其一即发):
   └─ 缓冲区数据达到 batch.size(默认16KB)
   └─ 等待时间达到 linger.ms(默认0ms)

四、内存池设计

目的: 复用内存块,避免频繁 GC,防止 Full GC 影响生产者性能。

结构:

Buffer Pool 总大小:32MB
├── 已分配内存(紫色区域):固定 16KB 的内存块队列,可被反复取用
└── 可用内存(粉色区域):未分配的剩余内存,初始 32MB

申请内存逻辑:

// 加锁保证线程安全
this.lock.lock();

// 情况1:申请大小超过整个缓存池 → 抛异常
// 情况2:申请的恰好是 16KB 且已分配队列非空 → 直接取出复用
if (size == poolableSize && !this.free.isEmpty())
    return this.free.pollFirst();

// 情况3:可用内存足够 → 从可用内存划出一块
// this.availableMemory + freeListSize >= size

释放内存逻辑:

释放大小 == 16KB → 归还到已分配内存队列(供下次复用)
释放大小 != 16KB → 归还到可用内存区域(等待 JVM GC 回收)

为何只有 16KB 才能进已分配队列:
Batch 发送触发条件是写满(16KB)或超时,若允许不同大小的 Batch(如 1MB),未写满就超时发送会导致内存利用率极低。固定 16KB 保证每个 Batch 大小一致,充分利用内存。


五、网络 IO 架构(三层)

第一层:Acceptor 线程(1个)
    监听 SelectionKey.OP_ACCEPT
    接收 TCP 连接 → 封装 SocketChannel → 轮询分发给 Processor

第二层:Processor 线程(num.network.threads 个,默认3)
    每个 Processor 维护三个队列:
    ├── newConnections:新连接信息
    ├── responseQueue:待返回给客户端的 Response(每个 Processor 独享)
    └── inflightResponses:临时队列,存放已发出但回调未执行的 Response
    职责:解析二进制请求 → 放入共享 RequestQueue
         从 responseQueue 取 Response → 注册 OP_WRITE → 返回客户端

第三层:RequestHandler 线程池(num.io.threads 个,默认8)
    从共享 RequestQueue 取请求 → 处理业务逻辑 → 写磁盘
    → 封装 Response → 放入对应 Processor 的 responseQueue

⚠️ RequestQueue 是共享的;ResponseQueue 是每个 Processor 独享的。


六、磁盘存储架构

LogManager(日志管理器)
    └─ Log 对象(每个 Replica 对应一个)
           │ 管理该分区所有 Segment 文件
           └─ LogSegment(日志分段)
                  ├── .log(数据文件,顺序追加)
                  ├── .index(offset 索引,绝对偏移量 + 相对偏移量)
                  └── .timeindex(时间索引)
  • Segment 大小到达阈值 → maybeRoll() 滚动新建 Segment
  • index 条目非每条消息创建,每隔 index.interval.bytes(默认 4096,即 4KB)创建一条
  • LogSegment 底层通过 mmap + FileChannel 与 OS Cache 交互,基于顺序写落盘

七、Producer 核心参数调优速查

参数默认值生产建议说明
acks1按场景选 0/1/all消息持久性保证
max.request.size1MB10MB超出则发送失败
retries03解决网络抖动、Leader 重选举
retry.backoff.ms100ms总重试时间 > 异常恢复时间两次重试间隔
connections.max.idle.ms540000(9分钟)默认够用空闲连接超时关闭
compression.typenonelz4(吞吐优先)/ zstd(压缩比优先)牺牲 CPU 换 IO
buffer.memory32MB适当调大Producer 内存池总大小
batch.size16KB32KB调大提吞吐,延时略增
linger.ms0ms100ms0 表示立即发,调大减少请求次数
request.timeout.ms30000(30s)高负载调到 60s等待 Broker 响应超时
max.in.flight.requests.per.connection51(防乱序)保证分区内消息有序

优缺点与局限性

机制优点限制 / 踩坑点
内存池(16KB固定块)复用内存,大幅降低 GC 频率消息大小远超 16KB 时,内存利用率下降
批次发送(Batch)减少网络请求,提升吞吐linger.ms=0 时批次效果等于无,吞吐量低
重试机制应对网络抖动、Leader 切换max.in.flight=5 + 重试 → 可能消息乱序
压缩显著节省网络和磁盘 IO增加 Producer 端 CPU 消耗
acks=all最强数据安全保证性能最低,需配合 min.insync.replicas≥2
acks=0最高吞吐Offset 返回 -1,消息丢失无感知,Retries 失效

行动清单

  1. 初始化参数实验:修改 max.request.sizebatch.sizelinger.ms,用 kafka-producer-perf-test.sh 对比吞吐量变化
  2. 复现消息乱序:将 max.in.flight.requests.per.connection 设为 5 并开启 retries,模拟网络抖动,观察分区内消息顺序是否被打乱
  3. 内存池源码阅读:阅读 BufferPool.java 中的 allocate()deallocate() 方法,对照本文 16KB 固定块逻辑理解设计意图
  4. 压缩算法压测:分别配置 lz4snappyzstd,压测 CPU 占用 + 磁盘占用 + 吞吐量,选出最适合业务的算法
  5. 元数据拉取追踪:在 Producer 初始化后打断点,观察 metadata.update() 的触发时机和 version 版本号变化
  6. 生产参数模板:基于调优表整理一份生产环境 Producer 配置模板,区分”高吞吐”和”高可靠”两种场景

Kafka Broker 内部机制

一句话摘要

Kafka Broker 通过 Controller 协调 + HW/LEO 副本同步 + LeaderEpoch 数据一致性 + 时间轮延迟任务四大机制,实现集群的高可用与高性能,其中 LeaderEpoch 是解决 HW 机制数据丢失的关键补丁。


核心知识点

一、Controller(控制器)机制

定义: 集群中唯一一台承担协调职责的 Broker,任意时刻有且只有一个。依赖 ZooKeeper 选举产生。

Controller 的五大职责

1. Topic 管理
负责 Topic 的创建、删除及分区增加,大部分后台工作由 Controller 完成。

2. 分区重分配(Reassignment)
新 Broker 加入集群时,不会自动承接已有 Topic 的负载,只对后续新增 Topic 生效。要让新 Broker 服务已有 Topic,必须手动触发分区重分配,将部分分区迁移到新 Broker 上。也用于解决多 Broker 之间存储负载不均衡问题。

3. Leader 选举

触发分区 Leader 选举的四种场景:

场景触发条件
Offline新建分区 或 分区失去现有 Leader
Reassign用户执行重分配操作
PreferredReplica将 Leader 迁回首选副本
ControlledShutdown现有 Leader 即将下线

选举后,Controller 向所有相关 Broker 发送更新请求(含最新 Leader/Follower 分配信息)。新 Leader 开始处理生产者和消费者请求,Follower 从新 Leader 复制消息。

4. 集群成员管理

依赖 ZooKeeper Watch 机制 + 临时节点组合实现:

  • 新 Broker 启动 → 在 /brokers/ids 下创建临时 znode
  • ZooKeeper 通过 Watch 推送通知给 Controller
  • Controller 自动感知 Broker 上下线

5. 提供数据服务
Controller 保存最全的集群元数据,其他 Broker 定期接收 Controller 发来的元数据更新请求并刷新本地缓存。


Controller 存储的数据分类

分类数据内容
Broker 相关存活 Broker 列表、正在关闭的 Broker 列表、每个 Broker 上的分区和副本
Topic 相关Topic 列表、每个 Topic 的所有分区和副本、每个分区的 Leader 和 ISR
运维任务正在进行 Preferred Leader 选举的分区、正在重分配的分区列表

Controller 故障转移(Failover)

Broker 0(Controller)宕机

ZooKeeper Watch 机制感知 → 删除 /controller 临时节点

所有存活 Broker 竞选新 Controller

Broker 3 抢先创建 /controller 节点,成为新 Controller

Broker 3 从 ZooKeeper 读取集群元数据并初始化缓存

Failover 完成,恢复正常工作

整个过程自动完成,无需人工干预


二、HW 和 LEO 机制

三个关键偏移量

概念全称含义
base offset起始位移副本中第一条消息的 offset
HWHigh Watermark(高水位)副本最新一条已提交消息的 offset
LEOLog End Offset(日志末端位移)副本下一条待写入消息的 offset

HW 的两大作用

  1. 标识可消费范围:HW 之前的消息对消费者可见(committed),HW 之后的消息不可见
  2. 协助副本数据同步:HW 推进依赖所有 Follower 的 LEO 更新

HW 更新规则

Leader 维护所有 Follower 的 LEO 值

HW = min(所有 Follower 的 LEO)
    (取最小值,保证任意 Follower 成为新 Leader 时数据完整)

Follower 自身 HW 的更新:
Follower HW = min(自身 LEO, Leader HW)

Broker 上的数据分布

  • Broker 0(Leader 所在):保存 Leader 副本的 HW + LEO,以及所有 Follower 副本的 LEO(但不保存 Follower 的 HW)
  • Broker 1(Follower 所在):只保存该 Follower 副本的 HW + LEO

Follower 同步数据时携带自身 LEO 上报给 Leader,Leader 据此更新各 Follower 的 LEO 记录并计算 HW。


三、Leader Epoch 机制

背景: HW 更新存在时间错配——Leader HW 更新后,Follower HW 更新需要额外一轮拉取才能完成。这个窗口期内连续宕机会导致数据丢失。Kafka 0.11 版本引入 Leader Epoch 解决此问题。

Leader Epoch 的组成

Leader Epoch = (Epoch, Start Offset)

Epoch:单调递增版本号,每次 Leader 变更时 +1
Start Offset:该 Epoch 对应的 Leader 写入的第一条消息的 offset

HW 机制导致数据丢失的场景

前提:min.insync.replicas = 1

初始状态:
  Leader A:消息 [0,1,2,3],HW=4
  Follower B:消息 [0,1,2,3],HW=3(尚未更新到4)

步骤1:Broker B 宕机重启
  → B 执行日志截断,LEO 截断至 HW=3
  → B 磁盘只剩消息 [0,1,2],位移3的消息被删除

步骤2:B 开始从 A 同步数据时,A 宕机
  → B 被选为新 Leader

步骤3:A 重启后作为 Follower
  → A 执行日志截断,HW 对齐 B 的 HW=3
  → 位移3的消息从 A 和 B 中永久消失 ✗

Leader Epoch 规避数据丢失的原理

Follower 重启后不再盲目截断到 HW,而是向 Leader 请求对应 Epoch 的 Start Offset,以此判断截断位置,避免误删已提交消息。每次 Leader 变更时 Epoch 递增,旧 Leader 恢复后自动识别为过期 Leader,不能行使 Leader 权力。


四、时间轮(TimingWheel)机制

背景: Kafka 中存在大量延迟操作(延迟生产、延迟拉取、延迟删除等)。JDK 的 Timer 和 DelayQueue 插入/删除时间复杂度为 O(n log n),无法满足高性能要求。时间轮将插入和删除复杂度降为 O(1)

数据结构

时间轮(环形数组)
    └─ 每个时间格 → TimerTaskList(环形双向链表)
                        └─ TimerTaskEntry(封装 TimerTask)

关键参数:
  tickMs:每格时间跨度(基本时间单位)
  wheelSize:总格数
  interval:总时间跨度 = tickMs × wheelSize
  currentTime:表盘指针(当前处理时间,为 tickMs 的整数倍)

单层时间轮示例

tickMs = 1ms,wheelSize = 20,interval = 20ms

任务1:定时 2ms → 放入时间格 2
任务2:定时 8ms → 放入时间格 10(currentTime=2时,8ms后到期,2+8=10)
任务3:定时 19ms → 复用已到期的时间格 1(环形复用)

currentTime 推进时,处理对应时间格的 TimerTaskList 中的所有任务

层级时间轮(解决超大延迟)

当任务到期时间超过当前时间轮总跨度时,加入上层时间轮:

第一层:tickMs=1ms,wheelSize=20,interval=20ms
第二层:tickMs=20ms,wheelSize=20,interval=400ms
第三层:tickMs=400ms,wheelSize=20,interval=8000ms

例:350ms 的任务 → 超过第一层(20ms)和第二层(400ms)?
  → 350ms < 400ms,放入第二层
  → 随着时间推进,任务从上层降级到下层,最终精确执行

优缺点与局限性

机制适用场景限制踩坑点
Controller 选举集群启动与故障恢复同一时刻只有一个 Controller,存在单点风险依赖 ZooKeeper,ZK 异常会影响 Controller 选举
HW 机制副本同步 + 消费可见性HW 更新存在时间错配min.insync.replicas=1 时连续宕机可能丢数据
Leader Epoch防止数据丢失/不一致0.11 版本之前不支持版本升级时需注意兼容性
分区重分配Broker 扩容后负载均衡新 Broker 不自动接管旧 Topic必须手动触发,重分配期间有性能损耗
单层时间轮小范围延迟任务超过 interval 的任务无法处理需配合层级时间轮使用
层级时间轮任意延迟范围的任务调度实现复杂度高任务从上层降级到下层有轻微调度开销

行动清单

  1. 模拟 Controller 切换:Docker 启动 3 节点 Kafka 集群,kill Controller 所在节点,观察 ZooKeeper /controller 节点变化和新 Controller 选举日志
  2. 观察 HW/LEO 变化:用 kafka-log-dirs.sh 实时查看分区的 HW 和 LEO 值,暂停 Follower 后观察 HW 停止推进的现象
  3. 复现数据丢失场景:设置 min.insync.replicas=1,按 HW 数据丢失步骤操作,验证位移 3 的消息是否消失,再开启 LeaderEpoch 对比
  4. 验证分区重分配:新增 Broker 节点后,用 kafka-reassign-partitions.sh 手动触发重分配,观察分区迁移过程
  5. 时间轮源码阅读:阅读 Kafka TimingWheel.scalaTimerTaskList.scala,对照本文结构理解环形数组 + 双向链表的实现
  6. 层级时间轮实践:手写一个简化版两层时间轮(Java/Go),支持插入任意延迟任务并按时触发,验证 O(1) 复杂度

Kafka Consumer 底层原理

一句话摘要

Kafka Consumer 采用 Pull 模式消费,通过 Consumer Group + 分区分配策略 + Rebalance + 位移提交 四大机制保障高并发消费和故障恢复,生产环境推荐 StickyAssignor + 混合提交模式。


核心知识点

一、消费方式:Pull vs Push

Kafka Consumer 采用 Pull 模式。

模式缺点Kafka 的处理
PushBroker 不清楚 Consumer 消费速度,推送速率不可控,易造成消息堆积甚至系统崩溃不采用
PullBroker 无消息时 Consumer 空轮询,浪费资源poll() 携带 timeout 参数,无消息时 Long Polling 阻塞等待,直到数据到达再返回

二、Consumer 初始化流程

四步完成初始化:

// 1. 构造配置对象
Properties props = new Properties();
props.put("bootstrap.servers", "...");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

// 2. 创建 KafkaConsumer 对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 3. 订阅 Topic 列表
consumer.subscribe(Arrays.asList("topic-name"));

// 4. 循环调用 poll() 拉取消息
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // 处理消息...
}

三、Consumer Group 机制

设计原因: 单个 Consumer 无法消费百万级数据,Consumer Group 提供可扩展且具容错性的并行消费能力。

核心特点:

  • 每个 Group 有一个或多个 Consumer
  • 每个 Group 有唯一的 Group ID
  • 同一 Partition 只能被组内一个 Consumer 消费
  • 不同 Consumer Group 之间互不影响,可同时消费同一 Topic

四、Partition 分配策略

1. RangeAssignor(默认策略)

按 Topic 维度分配:对每个 Topic,先对 Partition 排序,再对 Consumer 排序,按范围区段分配。

缺陷: 分区数不能被 Consumer 数整除时,排前面的 Consumer 分配更多分区,随订阅 Topic 增加,不均衡越来越严重。


2. RoundRobinAssignor

将所有 Topic 的 Partition 和所有 Consumer 排序后,轮询逐一分配。

  • 组内所有 Consumer 订阅相同 Topic → 分配均衡 ✅
  • 组内 Consumer 订阅不同 Topic → 分配可能倾斜 ❌

3. StickyAssignor(推荐生产使用)

从 Kafka 0.11 版本引入,通过 partition.assignment.strategy 参数配置。

两个目标(优先级从高到低):

  1. Partition 分配尽量均衡
  2. Rebalance 发生时,尽量与上次分配结果保持一致

Rebalance 后对比(C1 下线场景):

初始状态:
  C0: [T0P0, T1P0]
  C1: [T0P1, T1P1]
  C2: [T0P2, T1P2]

C1 下线后:

RoundRobinAssignor(完全重新分配):
  C0: [T0P0, T0P2, T1P1]
  C2: [T0P1, T1P0, T1P2]

StickyAssignor(最小调整):
  C0: [T0P0, T1P0, T0P1]  ← 仅接管 C1 的分区
  C2: [T0P2, T1P2, T1P1]  ← 仅接管 C1 的分区

结论: StickyAssignor 在 Rebalance 后迁移代价更小,生产环境推荐使用。


五、Rebalance 机制

触发条件(三种)

  1. Consumer Group 成员数量变化(加入、主动离组、故障下线)
  2. 订阅 Topic 数量变化
  3. 订阅 Topic 的分区数变化

通知机制

Consumer 端心跳线程定期向 Coordinator 发送心跳。Coordinator 决定开启 Rebalance 后,将 REBALANCE_IN_PROGRESS 封装进心跳响应,Consumer 收到后感知 Rebalance 开始。


五大协议

协议作用
HeartbeatConsumer 定期证明存活
LeaveGroup主动告知 Coordinator 离组
JoinGroup成员请求加入组
SyncGroupLeader Consumer 将分配方案告知所有成员
DescribeGroup展示组的全部信息(管理员使用)

Rebalance 主要用到前 4 种。


Consumer Group 五种状态

状态含义
Empty组内无成员,可能有未过期的已提交位移,只响应 JoinGroup
Dead组内无成员,元数据已被 Coordinator 移除,返回 UNKNOWN_MEMBER_ID
PreparingRebalance准备开始新 Rebalance,等待所有成员重新加入
CompletingRebalance所有成员已加入,等待分配方案(旧版本称 AwaitingSync)
StableRebalance 完成,Consumer 可正常消费

Rebalance 两阶段流程

阶段一:JoinGroup

所有 Consumer → 发送 JoinGroup 请求(携带订阅 Topic 信息)→ Coordinator
Coordinator → 选出 Leader Consumer(通常是第一个发请求的)
Coordinator → 将全量订阅信息发给 Leader

阶段二:SyncGroup

Leader Consumer → 制定分配方案 → 封装进 SyncGroup 请求 → Coordinator
其他 Consumer → 发送空内容的 SyncGroup 请求 → Coordinator
Coordinator → 将分配方案下发给所有组成员
各 Consumer → 知道自己消费哪些 Partition

四种 Rebalance 场景

场景触发方关键动作
新成员 C1 加入C1 发送 JoinGroup触发全组 Rebalance
C2 主动离组C2 发送 LeaveGroup触发全组 Rebalance
C2 超时被踢出心跳超时Coordinator 踢出,触发 Rebalance
C2 提交位移C2 提交 Offset不触发 Rebalance,仅更新位移

六、位移提交机制

概念区分:

  • Topic Partition 位移:消息在 Broker 端的存储偏移量
  • 消费者位移(Consumer Offset):Consumer Group 在某分区上的消费进度,记录下一条待消费消息的位移

位移提交按分区粒度进行,Consumer 为每个分配到的 Partition 单独提交 Offset。


自动提交

// 开启自动提交(默认 true)
props.put("enable.auto.commit", "true");
// 提交间隔,默认 5000ms
props.put("auto.commit.interval.ms", "5000");
  • 每次 poll() 调用时,自动提交上一批消息的位移
  • 缺陷: Rebalance 发生在自动提交间隔之间时,Offset 未提交 → Rebalance 完成后重复消费

手动提交

props.put("enable.auto.commit", "false");

同步提交:

// 阻塞等待直到提交成功,提交失败会抛异常
consumer.commitSync();
// 注意:必须在处理完 poll() 返回的所有消息后才调用,过早提交导致消息丢失

异步提交:

// 立即返回,不阻塞,不影响 TPS
// 失败不重试(重试时位移值可能已不是最新)
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // 记录日志或异常处理
    }
});

混合提交(生产推荐):

try {
    while (running) {
        ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
        process(records);
        consumer.commitAsync();  // 正常情况异步提交,不影响 TPS
    }
} catch (Exception e) {
    // 异常时同步提交,利用其自动重试能力
} finally {
    try {
        consumer.commitSync();   // 关闭前同步提交,确保最终一致
    } finally {
        consumer.close();
    }
}

七、__consumer_offsets 存储

版本演变:

版本Offset 存储位置原因
0.8 之前ZooKeeper旧方案
0.9 之后__consumer_offsets(Kafka 内置 Topic)ZooKeeper 不适合高频写,Kafka Topic 天然支持

消息格式(KV 结构):

Key:<Group ID, Topic 名称, 分区号>
Value:Offset 值 + 元数据(提交时间、过期时间等)

示例输出:
[order-group-1, topic-order, 0]::[OffsetMetadata[36672,NO_METADATA],
  CommitTime 1633694193000, ExpirationTime 1633866993000]

创建规则:

触发时机:第一个 Consumer 启动时自动创建
分区数:offsets.topic.num.partitions = 50(默认)
副本数:offsets.topic.replication.factor = 3(默认)
分区路由:abs(GroupId.hashCode()) % NumPartitions

常用查询命令:

# 查看所有消费者组
./bin/kafka-consumer-groups.sh --bootstrap-server <kafka-ip>:9092 --list

# 查看某消费者组消费情况
./bin/kafka-consumer-groups.sh --bootstrap-server <kafka-ip>:9092 \
  --group test-group-1 --describe

# 计算 group 对应的 __consumer_offsets 分区
abs(GroupId.hashCode()) % 50

# 查看指定分区的位移数据(Kafka 0.11+)
./bin/kafka-console-consumer.sh --bootstrap-server message-1:9092 \
  --topic __consumer_offsets \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --partition xx

八、消费进度监控(Consumer Lag)

Lag 定义: Producer 已生产消息数 - Consumer 已消费消息数

示例:
  Producer 生产:1000 万条
  Consumer 消费:900 万条
  Lag = 100 万条

Lag 的意义: Lag 值增大趋势 → 消费积压,可能拖慢下游处理速度。

四种监控方式:

  1. kafka-consumer-groups.sh 命令行工具(--describe
  2. Kafka Java Consumer API 编程获取
  3. Kafka 自带 JMX 监控指标
  4. 云产品自带监控功能

优缺点与局限性汇总

机制优点限制 / 踩坑点
Pull 模式Consumer 自控消费速度,支持延迟处理无消息时需 Long Polling,否则空轮询浪费 CPU
RangeAssignor实现简单,默认配置即用订阅 Topic 增加时不均衡问题加剧
RoundRobinAssignor相同 Topic 订阅时均衡不同 Topic 订阅时可能严重倾斜
StickyAssignorRebalance 后迁移最小,均衡性好实现最复杂,0.11 版本才引入
自动提交零代码,使用简单Rebalance 时可能重复消费
同步提交可靠,失败自动重试阻塞线程,影响 Consumer TPS
异步提交不阻塞,高 TPS失败不重试,可能丢失位移更新
混合提交兼顾 TPS 和可靠性代码逻辑相对复杂
__consumer_offsets高频写性能好,天然持久化默认 50 分区,过多 Group 时元数据管理复杂

行动清单

  1. Long Polling 验证:启动 Consumer 连接一个空 Topic,设置 poll(Duration.ofMillis(3000)),观察是否阻塞 3 秒后才返回
  2. 三种分配策略对比:创建含 3 个分区的 Topic,分别用三种策略启动 Consumer Group,关闭一个 Consumer,对比 Rebalance 后分配结果
  3. 复现 Rebalance 重复消费:开启自动提交(auto.commit.interval.ms=5000),在提交间隔内强制触发 Rebalance,观察消息重复消费现象
  4. 混合提交代码实现:实现生产级混合提交模板(commitAsync 正常流程 + commitSync 关闭保底),在 IDE 中验证异常场景
  5. **查看 **__consumer_offsets:对运行中的 Consumer Group 执行 kafka-consumer-groups.sh --describe,计算 GroupId 对应的分区号,再用 kafka-console-consumer.sh 直接读取原始位移数据
  6. Lag 监控搭建:用 kafka-consumer-groups.sh 脚本轮询 Lag 值,或接入 Prometheus + Grafana 实现可视化监控,设置 Lag 阈值告警

Kafka 基础概念及架构


一句话摘要

Kafka 是一个基于 Zookeeper 协调的分布式发布-订阅消息系统,核心解决高吞吐量场景下的消息持久化、分区有序传输与多消费者解耦问题。


二、核心知识点

2.1 Kafka 是什么

Kafka 是分布式、分区、多副本、多生产者、多订阅者的日志/消息系统,依赖 Zookeeper 进行集群协调。

设计目标:

  • 消息持久化时间复杂度为 O(1),对 TB 级数据保持常数访问性能
  • 单机支持每秒 100K 条消息传输
  • 支持分区内消息有序,分区间不保证全局顺序
  • 同时支持离线和实时数据处理
  • 支持在线水平扩展

消息传递模式: 仅支持发布-订阅模式,不支持点对点模式。消息只有拉取(Pull),无推送(Push),可通过轮询模拟推送。


2.2 四大核心 API

API作用
Producer API向一个或多个 Topic 发布消息流
Consumer API订阅一个或多个 Topic,处理消息流
Streams API充当流处理器,消费输入 Topic,生产输出 Topic(实现流转换)
Connector API构建可复用的生产者/消费者,将 Topic 连接到外部系统(如数据库变更捕获)

2.3 基本架构组件

消息(Message)与批次(Batch)

  • 消息是 Kafka 的最小数据单元,由字节数组组成,类比数据库的一行记录。
  • 消息包含键(Key,字节数组),用于决定写入哪个分区(对 Key 做散列取模)。
  • 消息可以批量写入,同一批次属于同一 Topic 和分区。批次越大,单位时间吞吐量越高,但单条延迟越长;批次数据会被压缩以提升传输和存储效率。
  • Kafka 使用 Apache Avro 作为序列化格式(强类型,优于 JSON/XML)。

主题(Topic)与分区(Partition)

  • Topic 是消息的逻辑分类,类比数据库的表或文件夹。
  • 一个 Topic 可划分为多个 Partition,实现横向扩展。
  • 消息以追加写入方式存储在分区中,以先进先出顺序读取。
  • Kafka 只保证单个 Partition 内消息顺序,无法保证整个 Topic 的全局顺序。
  • 分区提供数据冗余能力。

生产者(Producer)

生产者决定消息投递到哪个分区,有三种策略:

  1. 轮询(Round-Robin):默认策略,消息均匀分布到所有分区
  2. Key 散列:对消息 Key 计算散列值,映射到固定分区(相同 Key 必然进同一分区)
  3. 自定义分区器:按业务规则灵活路由

消费者(Consumer)与消费组(Consumer Group)

  • 消费者通过偏移量(Offset) 标记已读消息位置。Offset 是单调递增的整数,在给定分区内每条消息唯一。
  • Offset 存储在 Kafka 自身(早期存在 Zookeeper,现已迁移到 Kafka)。消费者重启不丢失读取状态。
  • 消费组保证每个分区只能被组内一个消费者使用,避免重复消费。
  • 某消费者失效时,触发再平衡(Rebalance),重新分配分区给组内其他消费者。

Broker 与集群

  • 单个 Broker 是一台独立的 Kafka 服务器,可轻松处理数千个分区每秒百万级消息
  • 集群中自动选举出一个 Broker 作为控制器(Controller)(通过 Zookeeper Master 选举),负责:分区分配给 Broker、监控 Broker 健康。
  • 每个分区归属于一个 Broker,该 Broker 称为该分区的首领(Leader)
  • 分区可复制到多个 Broker(副本),副本分区不处理读写请求,仅负责数据同步。

Topic 分区与 Broker 数量的关系:

场景分配规则
Partition 数 == Broker 数 (N)每个 Broker 存储 1 个 Partition
Partition 数 (N) < Broker 数 (N+M)N 个 Broker 各存 1 个,M 个 Broker 不存该 Topic
Partition 数 (N) > Broker 数部分 Broker 存多个 Partition(生产环境应避免,易造成负载不均)

2.4 副本机制(Replicas)

副本类型

  • 首领副本(Leader Replica):每个分区只有一个,所有生产者写入和消费者读取都必须经过它。
  • 跟随者副本(Follower Replica):不处理客户端请求,只从 Leader 拉取消息保持同步。Leader 崩溃时,某个 Follower 晋升为新 Leader。

副本集合分类

AR(Assigned Replicas)= ISR + OSR
  • ISR(In-Sync Replicas):与 Leader 保持一定同步程度的副本集合(含 Leader 本身)。只有 ISR 中的副本可以在 Leader 切换时被选举为新 Leader。
  • OSR(Out-of-Sync Replicas):与 Leader 同步滞后过多的副本集合(不含 Leader)。正常情况下 OSR 为空,即 AR = ISR。

关键水位线概念

  • HW(High Watermark,高水位):消费者只能消费到 HW 之前的消息,表示已完成多副本同步的最高 Offset。
  • LEO(Log End Offset):当前日志文件中下一条待写入消息的 Offset,表示分区日志的末尾位置。

2.5 偏移量(Offset)

  • 生产者 Offset:消息写入分区时,该分区当前最大的 Offset 即为生产者 Offset。
  • 消费者 Offset:消费者在各分区上最后消费到的位置。例如:生产者已写入到 Offset=12,Consumer A 消费到 Offset=9,Consumer B 可从不同位置独立消费,互不干扰。

2.6 性能优化手段

Kafka 高吞吐量的底层实现依赖:

  • 零拷贝(Zero-Copy):减少内核态与用户态之间的数据复制
  • 顺序读写(Sequential I/O):磁盘顺序访问速度接近内存随机访问
  • Linux 页缓存(Page Cache):利用操作系统缓存加速读写

三、优缺点与局限性

优势

  • 高吞吐:单机每秒处理几十到百万级消息,存储 TB 级数据性能稳定
  • 高可用:副本机制 + Partition Leader 自动切换,支持节点故障无感知
  • 持久化:消息持久化到磁盘,配合 replication 防止数据丢失
  • 易扩展:增加 Broker 无需停机,Producer/Consumer/Broker 均支持水平扩展

局限性与踩坑点

问题说明
无全局消息顺序只保证单 Partition 内有序,跨 Partition 无序。需要全局顺序时,必须将相关消息路由到同一 Partition(用同一个 Key)
仅支持发布-订阅不支持点对点(Queue)模式,若需求是一条消息只消费一次且有多个不同消费者竞争,需通过消费组设计实现
Partition 数 > Broker 数时有风险单 Broker 承载多 Partition,负载不均,生产环境应避免
消费者 Offset 管理早期 Offset 存 Zookeeper,高并发下 Zookeeper 成为瓶颈;现已改为存 Kafka 内部 Topic(__consumer_offsets
OSR 副本存在时可靠性下降若 ISR 集合缩小(大量副本进入 OSR),Leader 切换时选择范围受限,极端情况下可能数据丢失
HW 机制导致消费延迟消费者只能读到 HW 以前的数据,HW 推进依赖 Follower 同步进度,Follower 滞后会间接影响消费实时性

四、行动清单

  1. 搭建本地环境:使用 Docker Compose 启动单节点 Kafka + Zookeeper,练习创建 Topic、生产消息、消费消息的基本命令(kafka-topics.shkafka-console-producer.shkafka-console-consumer.sh)。
  2. 验证 Partition 顺序性:创建一个有 3 个 Partition 的 Topic,不指定 Key 发送消息,观察消息被分发到不同 Partition,验证跨 Partition 无序的特性;再用固定 Key 发送,验证同 Key 消息的有序性。
  3. 理解 Consumer Group 机制:启动 2 个消费者属于同一 Consumer Group、3 个属于不同 Group,观察消息分发差异,深入理解”同组内每个 Partition 只被一个消费者消费”的规则。
  4. 深入学习副本机制:重点研究 ISR/OSR 的动态变化条件(replica.lag.time.max.ms 参数控制滞后阈值),以及 min.insync.replicas 参数如何影响数据可靠性与可用性的权衡。
  5. 学习 Offset 管理:研究 auto.commit.enableenable.auto.commit 配置,理解自动提交 Offset 与手动提交的区别,以及消费失败时如何避免消息丢失或重复消费。
  6. 扩展阅读:进一步研究 Kafka 的 Streams APIConnector API,以及 Kafka 与 Spring Boot 的整合实践(参考原文作者的配套文章《Kafka原理解析及与Spring Boot整合步骤》)。

Kafka 存储架构深度笔记

一句话摘要

Kafka 存储架构的本质是顺序追加写日志 + 稀疏索引,通过「Topic + Partition + Replica + LogSegment + 索引」五层结构解决海量数据的高性能写入与高效检索问题。


核心知识点

一、存储方案选型

存储介质 IO 速度对比

层级(从快到慢):
CPU 寄存器 > CPU Cache > 内存 > SSD > 机械磁盘 > 网络

关键结论(反直觉):
  机械磁盘顺序 I/O:53.2M values/s
  内存随机 I/O:   36.7M values/s
  → 磁盘顺序写性能 > 内存随机读写性能

两种存储方向的权衡

方向实现方式代表系统缺陷
提高读速度维护索引MySQL(B+ Tree)大量写操作需维护索引,降低写效率
提高写速度顺序追加日志Kafka、大数据系统无索引,查询需遍历

Kafka 为何不用 B+ Tree

  • 每次写都要维护索引,成本高
  • 存在”数据页分裂”操作,对高并发系统太重
  • 写并发要求百万级 TPS,B+ Tree 无法满足

Kafka 为何不用哈希索引

  • 哈希索引需常驻内存
  • 每秒百万级消息写入,内存极易 OOM

Kafka 最终方案:稀疏索引

将 Offset 设计为有序字段,消息在 log 文件中有序存放,将消息划分成若干块,只索引每块第一条消息的 Offset:

查找流程(类似二分查找):
1. 根据目标 Offset 大小,在稀疏索引中定位到对应的块
2. 在块内顺序查找目标消息

优点:
- 无需全量内存存储索引
- 查询效率远优于全文遍历
- 写入只需顺序追加,不维护复杂索引

二、存储架构设计

五层结构:Topic → Partition → Replica → LogSegment → 索引文件

Topic(逻辑概念,消息分类)
  └─ Partition(物理存储单元,水平扩展)
       └─ Replica(副本,保证高可用)
            └─ LogSegment(日志分段,防止单文件过大)
                 ├─ .log(数据文件)
                 ├─ .index(偏移量索引)
                 ├─ .timeindex(时间戳索引)
                 └─ .snapshot(快照索引,可能存在)

日志目录布局示例:

Topic: topic-order,4个分区,对应磁盘目录:
  topic-order-0/
  topic-order-1/
  topic-order-2/
  topic-order-3/

每个目录内的文件(以 baseOffset 命名,固定20位数字,不足补0):
  00000000000000000000.log
  00000000000000000000.index
  00000000000000000000.timeindex
  00000000012768089.log         ← baseOffset=12768089,说明前面有12768089条消息
  00000000012768089.index
  00000000012768089.timeindex

activeSegment: 只有最后一个 LogSegment 可执行写入操作,称为 activeSegment。达到阈值后滚动创建新 activeSegment。


三、日志格式演变

V0 版本(Kafka 0.10.0 之前)

消息结构:
  [Offset(8B)] [MessageSize(4B)] [CRC32(4B)] [magic(1B)] [attributes(1B)]
  [key length(4B)] [key] [value length(4B)] [value]

字段说明:
  magic = 0
  attributes 低3位:0=NONE, 1=GZIP, 2=SNAPPY, 3=LZ4
  key length = -1 表示无 key
  value length = -1 表示消息为空

最小消息大小:14 字节

计算示例(key="hello", value="world"):
  4(CRC) + 1(magic) + 1(attributes) + 4(key length) + 5(key) + 4(value length) + 5(value)
  = 24 字节

V1 版本(Kafka 0.10.0 ~ 0.11.0)

相比 V0 新增:timestamp 字段(8B)

作用:
  对内:影响日志保存、切分策略
  对外:影响消息审计、端到端延迟计算等

最小消息大小:22 字节

同样示例消息大小:24 + 8 = 32 字节

V0/V1 的设计缺陷

  1. 空间使用率低:key/value 长度字段固定 4 字节,无论实际大小
  2. 消息总长度未保存:需实时计算,效率低
  3. 只保存最新消息位移
  4. 冗余 CRC 校验:批次发送时每条消息仍单独保存 CRC

V2 版本(Kafka 0.11.0.0+)

核心改进(引入 RecordBatch 批次结构):
1. CRC 从消息中移除,统一放到 RecordBatch 中
2. 增加 producer id、producer epoch、序列号(支持幂等性和事务消息)
3. 使用增量形式保存时间戳和位移(节省空间)
4. 使用可变长度类型(解决空间使用率低问题)
5. 批次最小 61 字节(比单条消息大,但批量发送时整体节省磁盘空间)

四、日志清理机制

通过 Broker 端参数配置清理策略:

log.cleanup.policy = delete    # 默认,日志删除
log.cleanup.policy = compact   # 日志压缩(需同时设置 log.cleaner.enable=true)
log.cleanup.policy = delete,compact  # 同时支持两种

检查周期:log.retention.check.interval.ms = 300000(默认5分钟)

日志删除(Log Retention)三种策略

策略一:基于时间

配置参数优先级:
  log.retention.ms > log.retention.minutes > log.retention.hours
  默认:log.retention.hours = 168(7天)

判断依据:
  非文件修改时间,而是日志段中最大时间戳 largestTimeStamp
  时间戳 > 0 则用时间戳,否则用文件 lastModifiedTime

删除步骤:
  1. 从日志段跳跃表中移除待删除段(确保无线程读取)
  2. 对应所有文件(含索引)添加 ".deleted" 后缀
  3. 延迟任务("delete-file",默认1分钟执行)删除 ".deleted" 文件
     可通过 file.delete.delay.ms 配置

策略二:基于日志大小

配置参数:
  log.retention.bytes = -1(默认,无穷大)  ← 所有日志文件总大小,非单个
  log.segment.bytes = 1GB(单个 LogSegment 大小)

删除步骤:
  1. 计算日志总大小 - retentionSize = 需删除的总大小
  2. 从第一个日志段开始查找 deletableSegments
  3. 执行删除

策略三:基于日志起始偏移量

判断依据:下一个日志段的 baseOffset <= logStartOffset 则可删除

删除示例:
  logStartOffset = 40
  日志段1:[0, 20)  → 下一段 baseOffset=20 < 40 → 加入 deletableSegments
  日志段2:[20, 35) → 下一段 baseOffset=35 < 40 → 加入 deletableSegments
  日志段3:[35, 50) → 下一段 baseOffset=50 > 40 → 停止,从此段开始不删除

日志压缩(Log Compaction)

规则:对相同 key 的不同 value,只保留最新版本
配置:log.cleanup.policy = compact + log.cleaner.enable = true

类比:类似 Redis RDB 持久化模式
场景:系统崩溃后快速恢复,只需恢复最新状态数据

五、磁盘数据存储机制

三大核心技术组合:

1. 顺序追加写日志
   → 写性能接近内存,避免磁盘寻址

2. PageCache(OS 页缓存)
   → 读:先查 PageCache,命中直接返回,未命中才读磁盘
   → 写:写入 PageCache(脏页),OS 在合适时机刷盘
   → Kafka 大量使用 PageCache,是高吞吐的重要因素之一

3. 零拷贝(Zero-Copy)
   → 数据从 PageCache 直接发到网卡,跳过应用层拷贝
   → 详见三高架构笔记

消息写入磁盘完整路径:

Producer 发送消息
  → 经过网络层(Acceptor → Processor → RequestHandler)
  → 写入 PageCache(os cache)
  → OS 异步刷盘到 LogSegment 的 .log 文件
  → 同时更新 .index 和 .timeindex 索引文件

优缺点与局限性汇总

技术适用场景限制踩坑点
稀疏索引顺序 Offset 查询非精确索引,块内仍需顺序扫描index.interval.bytes 设置过大导致块内扫描过多
V2 日志格式批量发送场景单条消息批次头开销 61 字节,比 V0/V1 大小消息场景单条发送反而更浪费空间
基于时间清理日常日志保留控制依赖 largestTimeStamp,时钟漂移可能误判不要依赖文件修改时间判断,需关注时间戳索引
基于大小清理磁盘容量管控log.retention.bytes 是全分区总大小,非单 Segment误以为是单 Segment 大小而设置过小导致频繁删除
Log Compaction状态存储、快速恢复只保留最新值,历史数据不可追溯与 delete 策略混用时需明确哪些 Topic 用哪种策略
PageCache读多写多场景依赖 OS 管理,宕机时未刷盘数据丢失生产环境需配合 acks=all + min.insync.replicas 保障数据安全

行动清单

  1. 观察实际日志目录:在 Kafka 安装目录下找到 log.dirs 配置的路径,查看 topic-partition 目录下的 .log.index.timeindex 文件,验证 baseOffset 命名规则
  2. 稀疏索引验证:用 kafka-dump-log.sh --files xxx.index 查看索引内容,观察索引条目的间隔(index.interval.bytes 默认 4096 字节)
  3. 日志格式对比:用 kafka-dump-log.sh --files xxx.log --print-data-log 查看 V2 格式 RecordBatch 结构,对照三个版本字段差异
  4. 三种清理策略实验:分别创建配置 delete/compact/delete,compact 的 Topic,写入数据后触发清理,观察文件变化
  5. PageCache 监控:在 Kafka Broker 机器上执行 cat /proc/meminfo | grep -i cache,对比生产者高负载时 PageCache 占用变化
  6. 稀疏索引源码:阅读 OffsetIndex.scala 中的 lookup() 方法,理解二分查找定位块的具体实现

Kafka 高可靠高性能原理笔记


一句话摘要

Kafka 通过 ACK 策略 + 副本机制 + Leader Epoch 保障消息不丢失,通过 PageCache 异步刷盘 + 零拷贝 + 批量压缩 + 稀疏索引 + 多 Reactor 模型实现高吞吐低延迟。


核心知识点

1. 整体架构

四大组件:

  • Producer:消息创建者,通过分区器路由到指定 Broker
  • Broker:服务实例,负责消息持久化与中转
  • Consumer:主动 Pull 消息,同一消费者组内每条消息只被一个消费者消费
  • ZooKeeper:管理 Broker/Consumer 集群元数据;Producer 不在 ZK 存数据,只监听 Broker 和 Topic 信息

核心数据概念:

概念说明
Topic消息分类单位,收发时只需指定 Topic
PartitionTopic 下的分区,分布在不同 Broker,提供并行处理和横向扩容能力
SegmentPartition 下的分段文件,含 .log(数据)、.index(位移索引)、.timeindex(时间戳索引),以该段最小 Offset 命名
Offset消息在分区内的唯一标识,单调递增,Kafka 只保证分区内有序,不保证 Topic 全局有序

2. 高可靠性:Producer → Broker 消息不丢失

2.1 ACK 策略

Request.required.acks = 0   # 发即认为成功,不等确认,适合日志等低可靠场景
Request.required.acks = 1   # Leader 写入成功即返回,有丢数据风险
Request.required.acks = -1  # ISR 中所有副本写完才返回,强可靠

强可靠组合配置(三件套):

Request.required.acks = -1
min.insync.replicas > 2
unclean.leader.election.enable = false   # 只允许 ISR 中的副本成为新 Leader

2.2 同步发送 vs 异步发送

  • 异步发送:消息写入 Input Channel 立即返回,Dispatcher 协程负责真正发送;发送失败只能从错误日志得知,无法自建兜底函数处理
  • 同步发送:本质也是异步的,通过 WaitGroup 阻塞等待 Broker 确认,可明确感知成功/失败;失败时可重试,保障 at-least-once 语义

2.3 幂等性与事务(exactly-once)

类型开启方式核心能力局限
幂等性 Producerenable.idempotence=trueBroker 用 PID + Seq number 去重仅限单分区、单会话
事务型 Producerenable.idempotence=true + Transcational.id + Consumer 侧 Isolation.level=read_committed跨分区原子写入,跨会话事务恢复对性能影响大;相同 Transaction ID 的旧 Producer 实例会失效

事务协调组件:TransactionCoordinator,事务状态持久化在内部 Topic __transaction_state


3. 高可靠性:Broker 数据持久化

3.1 异步刷盘机制

Broker 收到消息 → 写入 PageCache → 立即返回成功 → Linux Flusher 异步刷盘(触发条件:主动调用 sync/fsync、可用内存低于阈值、Dirty Data 时间达到阈值)。

风险:单机未刷盘时 Broker 宕机,数据丢失。解法:副本机制。

3.2 副本机制(Replica)

  • AR = ISR + OSR
  • ISR:与 Leader 同步延迟不超过 replica.lag.time.max.ms(默认 10s)的副本集合
  • Follower 每隔 500ms 向 Leader Fetch 一次数据
  • Leader 宕机时,从 ISR 中选举新 Leader(由 ZooKeeper 触发)

3.3 HW 和 LEO 机制

  • LEO(Log End Offset):当前 Log 文件下一条待写入消息的 Offset
  • HW(High Watermark):ISR 所有副本都已写入的最大 Offset,消费者只能消费 HW 之前的数据

计算规则:

Leader HW  = min(所有副本 LEO)
Follower HW = min(Follower 自身 LEO, Leader HW)

一次完整更新流程(1 个 Follower 场景):

  1. 初始:HW_L=0, LEO_L=0, HW_F=0, LEO_F=0
  2. Leader 收到消息 → LEO_L=1;Follower Fetch → Leader 记录 LEO_F=0HW_L=min=0,返回 HW_L=0, LEO_L=1;Follower 存消息 → LEO_F=1, HW_F=min(1,0)=0
  3. Follower 再次 Fetch → Leader 记录 LEO_F=1HW_L=min=1;Follower 更新 HW_F=min(1,1)=1

隐患:Follower 和 Leader 的 HW 更新存在时间差,这是数据丢失和数据错乱问题的根源。

3.4 KIP-101 问题及 Leader Epoch 解法

数据丢失场景:Follower A 重启后以 HW 为基准截断日志,若随后 Leader B 宕机,A 成为新 Leader,被截断的已提交消息就丢失了。

数据错乱场景:A 和 B 同时宕机,B 的消息因未刷盘丢失,B 却被选为新 Leader 并写入新消息 m3;A 恢复后不知道 Offset=1 的消息已不一致,直接开始同步,导致日志错乱。

Leader Epoch 解法(类似 Raft 任期号):

  • 每次选举新 Leader,Epoch(严格单调递增 ID)加 1
  • Follower 重启后不再以 HW 为准截断,而是先发 LeaderEpochRequest 询问当前 Epoch 的起始 Offset
  • 数据丢失解决:A 重启后得知 LEO=2,保留 m2,不再截断
  • 数据错乱解决:A 重启后得知 B 已是第 1 代,起始 Offset=1,A 截断自己 Offset=1 的消息,重新与 B 同步

4. 高可靠性:消费者侧

提交方式参数特点风险
自动提交enable.auto.commit=true + auto.commit.interval.ms后台自动提交,无需手动处理消费中途崩溃可能导致消息丢失
手动提交enable.auto.commit=false业务逻辑处理完再提交未提交时重启会重复消费,需消费侧做幂等

最佳实践:手动提交 + 幂等,实现 at-least-once + 去重 = 业务层 exactly-once。


5. 高性能:异步发送 + 批量发送

  • 异步发送本质:消息入 Channel 即返回,最大化吞吐
  • 批量发送两个关键参数:
Batch.size = 16KB(默认)    # 增大可提升吞吐,但增加延迟
linger.ms = 0(默认)        # 设为 10~100ms 可积攒更多消息一起发

触发发送:Batch.size 达到上限 linger.ms 到期,两者满足其一即发送。


6. 高性能:压缩技术

Compression.type = none/gzip/snappy/lz4/zstd  # 2.1.0 版本后支持 zstd
Compression.level = -1(默认)                  # 0-9,-1 表示使用默认级别

性能对比(均越高越好):

  • 吞吐量:LZ4 > Snappy > zstd ≈ GZIP
  • 压缩比:zstd > LZ4 > GZIP > Snappy

注意:Broker 与 Producer 压缩算法不同时,Broker 会先解压再重新压缩,有额外 CPU 开销;消息版本不兼容也会触发解压重压。


7. 高性能:PageCache + 顺序追加写

  • 消息写入 PageCache 即返回,避免同步刷盘的巨大开销
  • 所有消息顺序追加到 Partition 日志文件尾部,充分利用顺序 I/O,规避随机 I/O

8. 高性能:零拷贝

网络数据持久化(Producer → Broker):mmap

传统方式:4 次数据拷贝 + 4 次上下文切换 + 2 次系统调用

mmap 优化:Socket Buffer 的数据在内核空间直接落盘,无需拷贝到应用进程缓冲区,减少上下文切换和拷贝次数。

数据传输(Broker → Consumer):Sendfile

传统方式:4 次数据拷贝 + 4 次上下文切换

Sendfile 优化(NIO 的 transferTo/transferFrom):2 次内核数据拷贝 + 2 次上下文切换 + 1 次系统调用,消除 CPU 数据拷贝。


9. 高性能:稀疏索引

设计思想:不为每条消息建索引,每写入 log.index.interval.bytes(默认 4KB)才新增一个索引项,用空间换时间。

位移索引(.index)索引项结构:

  • 相对位移 = 消息 Offset - 文件起始 Offset(例:文件名 00000000000000000100.index,存 Offset 150 时,相对位移 = 50,用 4 字节存储,节省空间)
  • 文件物理位置:消息在 .log 文件中的字节偏移

查找流程示例(查找 Offset=3550 的消息):

  1. 二分查找索引文件,找到小于 3550 的最大索引项 [3528, 2310272]
  2. 从 .log 文件物理位置 2310272 顺序扫描,直到找到 Offset=3550 的消息

时间复杂度:O(log N)(二分)+ O(M)(顺序扫描,M 为索引间距内的消息数)。

时间戳索引(.timeindex):Kafka 0.10.0.0 引入,结构与位移索引类似,支持按时间戳快速查找消息。


10. 高性能:多 Reactor 多线程网络模型

Acceptor → 轮询分发 → Processor 线程 → RequestQueue → KafkaRequestHandlerPool(Worker 线程池)

                                               Response 返回给 Processor → 客户端
  • SocketServer:实现 Reactor 模式,处理多客户端并发请求
  • KafkaRequestHandlerPool:Worker 线程池,处理实际 I/O 逻辑
  • 充分利用多核 CPU,提升吞吐和响应速度

11. 负载均衡

生产者侧(DefaultPartitioner):

  • key 不为 null → 对 Key 做 Hash → 路由到固定分区(顺序消息实现的关键)
  • key 为 null → 轮询所有可用分区

消费者侧(通过 Partition.assignment.strategy 配置):

  • RangeAssignor:连续分区分配给同一消费者
  • RoundRobinAssignor:轮询分配
  • StickyAssignor(0.11.0.0+):在均衡前提下尽量保留原有分配结果,减少 Rebalance 开销

优缺点与局限性

技术点适用场景局限 / 踩坑点
acks=-1金融、支付等强可靠场景写入延迟最高,需配合 min.insync.replicas>2 才有意义
异步发送高吞吐、对延迟不敏感场景发送失败只能靠日志感知,无法自建回调兜底;不适合强一致业务
事务型 Producer跨分区原子写、consumer-transform-producer性能开销大,慎重使用;旧 Producer 实例在新实例启动后自动失效
幂等性 Producer单分区单会话去重不跨分区、不跨会话(重启即失效),不能替代事务
批量发送高吞吐场景Batch.size 过大会增加延迟,需结合 linger.ms 调优
压缩技术网络带宽紧张场景消耗 CPU;Broker 与 Producer 压缩算法不一致时有额外解压重压开销
PageCache 异步刷盘高吞吐写场景单机宕机时 PageCache 中未刷盘数据丢失,必须依赖副本机制弥补
零拷贝大流量数据传输依赖 OS 支持(Linux sendfile / mmap),不同 OS 实现效果有差异
稀疏索引海量消息检索log.index.interval.bytes 过大会增加顺序扫描范围,是重要调优参数
HW 机制副本一致性保障HW 更新存在时间差,需配合 Leader Epoch 才能彻底解决数据丢失/错乱
自动提交 Offset日志分析等低可靠场景消费中途崩溃会丢消息,不适合强可靠业务
StickyAssignor减少 Rebalance 开销0.11.0.0+ 才支持,老版本无此选项

行动清单

  1. 搭建实验环境:本地起一个 3-Broker 的 Kafka 集群,分别测试 acks=0/1/-1 下的吞吐和延迟差异,感受三种策略的权衡。
  2. 验证 Leader Epoch:模拟 Leader 宕机场景,观察 Follower 的日志截断行为,对比有无 Leader Epoch 时的表现(可通过旧版本 Kafka 对比)。
  3. 调优批量发送参数:在压测环境中调整 Batch.size(16KB / 64KB / 128KB)和 linger.ms(0 / 10 / 100ms)的组合,找到延迟与吞吐的最优平衡点。
  4. 压缩算法选型:对业务消息分别测试 lz4(高吞吐优先)和 zstd(高压缩比优先)在实际数据上的压缩效果,确认是否存在 Broker 端的解压重压开销。
  5. 实现手动提交 + 幂等消费:在项目中将 enable.auto.commit=false,在消息处理成功后手动提交,并在业务层(数据库唯一键或 Redis SETNX)实现幂等,验证重启后的重复消费场景。
  6. 阅读 KIP-101:直接阅读官方 Proposal(文章参考文献第 6 条),理解 Leader Epoch 的设计决策过程。
  7. 深入零拷贝:分别查阅 Linux mmapsendfile 系统调用文档,理解内核态/用户态切换的具体过程,再结合 Java NIO 的 transferTo 实现对照理解。
  8. 稀疏索引调优:调整 log.index.interval.bytes 参数,观察索引文件大小变化以及查询延迟变化,确定适合业务消息体积的最优间距。

Kafka 三高架构设计

一句话摘要

Kafka 三高架构的核心是:高可用靠 ISR + ACK高性能靠顺序写 + 零拷贝 + 内存池高并发靠三层 Reactor 网络模型,三者协同构建工业级消息队列。

📌 本篇为系列精简复习版,各模块已有独立深度笔记,此处重点梳理核心结论与参数速查


核心知识点

一、高可用:Leader 选举 + 副本 + ISR + ACK

Controller 选举(三步)

1. 第一个启动的 Broker → ZooKeeper 创建临时节点 /controller → 成为 Controller
2. 其他 Broker 尝试创建 /controller → 失败 → 放弃竞争
3. 其他 Broker 在 Controller 上注册监听器,监听节点状态变化

ISR 核心参数

replica.lag.time.max.ms = 10000(默认10秒)
含义:Follower 落后 Leader 超过10秒 → 移出 ISR

ISR 存储路径:
/brokers/topics/[topic]/partitions/[partition]/state

ISR 维护者:
- Controller:Leader 选举时更新
- Leader:定期线程检测 Follower 是否脱离 ISR

ACK 三种模式速查

acks确认时机数据安全性能适用场景
0写入 Socket Buffer 即返回最低,可能丢失最高日志采集
1Leader 写入本地日志后返回(默认)通用场景
all所有 ISR 副本确认后返回最高最低金融、订单
acks=all 配套参数:
min.insync.replicas = 2(默认1)
能容忍宕机副本数 = min.insync.replicas - 1
ISR 副本数 < min.insync.replicas → 抛出 NotEnoughReplicas,停止写入

二、高性能:Reactor + 顺序写 + 零拷贝 + 压缩 + 内存池

Reactor 多路复用

Java NIO 三核心:Channel(传输)、Buffer(缓冲)、Selector(单线程多路复用)

Kafka Reactor 三角色:
  Acceptor(1个)→ 接收连接
  Processor(N个)→ 读写数据,每个独立 Selector
  Handler(M个)→ 处理业务逻辑

各角色之间用队列缓冲,充分解耦

生产消息流程(7步)

消息 → ProducerRecord 封装
     → Serializer 序列化
     → Partitioner 确定分区(拉取集群元数据)
     → 写入 RecordAccumulator 缓存队列(批次 RecordBatch)
     → Sender 线程封装多批次为 Request
     → Selector 对应 KafkaChannel 发往 Broker
     → 内存池回收内存,避免 Full GC

关键参数:
  batch.size = 16KB(写满立即发)
  linger.ms = 500ms(未满超时发)

顺序写 + OS Cache

写入路径:消息 → OS PageCache(内存)→ 操作系统异步刷盘

关键结论:
  机械磁盘顺序写 ≈ 内存写入速度
  顺序追加到文件末尾,不做随机写,避免磁盘寻址开销

零拷贝

传统路径(4次拷贝):
  磁盘 → OS Cache → 应用缓存 → Socket 缓存 → 网卡

零拷贝路径(2次拷贝):
  磁盘 → OS Cache → 网卡(Socket 缓存只拷贝描述符)

Java 实现:
  fileChannel.transferTo()  →  底层调用 Linux sendfile() 系统调用
  MappedByteBuffer         →  mmap 内存映射

压缩传输

流程:Producer 端压缩 → Broker 端保持 → Consumer 端解压缩
中间传输全程不解压,节省网络和磁盘开销

支持算法:lz4 / snappy / gzip / ZStandard(Kafka 2.1.0+,压缩比最高)
Producer、Broker、Consumer 必须使用相同压缩算法

内存池设计

Buffer Pool 总大小:32MB
├── 已分配内存队列:固定 16KB 内存块,反复复用
└── 可用内存:未分配区域

数据结构 Batches:
  Key → 分区
  Value → 该分区的批次队列

申请 → 写数据 → Sender 发送 → 清空归还内存池 → 复用
避免频繁 GC,保证生产者稳定性

三、高并发:三层网络架构

第一层:Acceptor(1个)
  绑定 OP_ACCEPT,监听客户端连接
  封装为 SocketChannel,轮询分发给 Processor

第二层:Processor(默认3个,num.network.threads=3)
  连接队列存放 SocketChannel(轮询存放)
  注册 OP_READ → 接收请求 → 解析二进制 → 封装 Request → 放入 RequestQueue
  监听 ResponseQueue → 注册 OP_WRITE → 返回响应给客户端

中间层:RequestQueue(共享)+ ResponseQueue(每个 Processor 独享,共3个)

第三层:RequestHandler 线程池(默认8个,num.io.threads=8)
  从 RequestQueue 取请求 → 解析 → 写磁盘
  封装 Response → 放入对应 Processor 的 ResponseQueue

容量计算:

默认:每个 RequestHandler 处理 2000 QPS
  默认总量:2000 × 8 = 1.6万 QPS

扩容配置:
  num.network.threads = 9
  num.io.threads = 32(建议与 CPU 核数一致)
  扩容后:2000 × 32 = 6.4万 QPS

优缺点与局限性速查

机制核心价值限制 / 踩坑
ISR动态维护同步副本,保障数据安全replica.lag.time.max.ms 太小导致频繁移出 ISR
acks=all最强数据保障需配合 min.insync.replicas≥2,否则单副本等于没保障
acks=0最高吞吐Retries 失效,Offset 返回 -1,数据丢失无感知
零拷贝大幅减少 CPU 和上下文切换数据需修改时零拷贝失效
内存池(16KB固定)避免 Full GC消息远大于 16KB 时内存利用率下降
批次发送降低请求数,提升吞吐linger.ms=0 时批次失效,等于单条发送
顺序写 + PageCache写性能接近内存宕机时 PageCache 未刷盘数据丢失,需 acks 配合保障
Reactor 三层架构高并发请求处理默认参数适合中等负载,高负载需手动调整两个核心参数

行动清单

  1. 串联三高机制:画一张从 Producer 发消息到 Consumer 消费全链路图,标注每个环节对应哪个三高机制
  2. ACK 场景模拟:分别用 acks=0/1/all,kill Leader 节点,对比三种配置下数据丢失行为
  3. 零拷贝性能测试:用 kafka-consumer-perf-test.sh 压测,对比开启/关闭零拷贝的消费吞吐量差异
  4. 网络参数调优实验:将 num.network.threads=9num.io.threads=32 后重新压测,验证 QPS 提升是否符合 2000×线程数的估算
  5. 压缩算法选型:分别测试 lz4/zstd 的 CPU 占用、磁盘占用、吞吐量,制定业务选型决策标准
  6. 内存池源码:阅读 BufferPool.javaallocate()deallocate(),验证 16KB 固定块复用逻辑

Kafka 消息丢失全解析


一句话摘要

Kafka 消息丢失发生在生产者、Broker、消费者三个环节,通过对应的配置加固 + 幂等/事务机制,可实现从 At-Most-Once 到 Exactly-Once 的语义升级。


二、核心知识点

2.1 Kafka 基础概念速查

组件职责
BrokerKafka 服务器,存储消息
Producer发送消息到 Kafka
Consumer从 Kafka 拉取并处理消息
Topic消息的逻辑分类
PartitionTopic 的物理分片,分布在不同 Broker
ISRIn-Sync Replicas,与 Leader 保持同步的副本集合
HWHigh Watermark,ISR 全部复制到的最高偏移量,消费者只能看到 HW 之前的消息
LEOLog End Offset,副本本地日志末端位置;Leader LEO > HW 表示有未同步完的新消息

2.2 消息丢失的三大环节与根因

生产者端

场景原因
acks=0 (Fire-and-Forget)不等待 Broker 确认,网络抖动直接丢失
异步发送未处理回调发送失败无感知,程序继续执行
重试次数不足网络瞬断导致消息发送失败后不再重试

Broker 端

场景原因
replication.factor=1只有单副本,Broker 宕机数据彻底丢失
min.insync.replicas=1Leader 确认即返回,Follower 未同步时 Leader 宕机则丢失
unclean.leader.election.enable=true允许 LEO 落后的 Follower 选为新 Leader,造成数据截断

消费者端

场景原因
enable.auto.commit=true偏移量已提交但消息处理失败,重启后跳过该消息
先提交偏移量再处理消息处理过程异常,偏移量已记录为已消费

2.3 生产者端防丢失方案

关键配置:

props.put(ProducerConfig.ACKS_CONFIG, "all");           // 所有 ISR 副本确认才算成功
props.put(ProducerConfig.RETRIES_CONFIG, 10);           // 重试次数
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 重试间隔 300ms
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等,避免重复

异步发送 + 回调处理(推荐):

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // 发送失败,写本地存储等待重试
        saveToLocalStorage(record);
    }
});

同步发送(兜底方案):

RecordMetadata metadata = producer.send(record).get(); // 阻塞等待确认

2.4 Broker 端防丢失方案

关键配置(server.properties):

default.replication.factor=3       # 默认副本数
min.insync.replicas=2              # 至少 2 副本同步才返回成功
unclean.leader.election.enable=false  # 禁止落后副本成为 Leader
log.flush.interval.messages=1000   # 每 1000 条消息刷盘一次
log.flush.interval.ms=1000         # 每 1000ms 刷盘一次

创建高可靠 Topic 命令:

bin/kafka-topics.sh --create --bootstrap-server broker1:9092 \
  --topic important-topic \
  --partitions 6 --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config flush.messages=1 \
  --config retention.ms=1209600000

2.5 消费者端防丢失方案

关键配置:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // 关闭自动提交
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

正确模式:先处理,再提交(批量):

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
processRecords(records);   // 先处理
consumer.commitSync();     // 成功后再提交

精细模式:按分区提交偏移量:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    // ... 处理每条消息 ...
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    currentOffsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
consumer.commitSync(currentOffsets);

事务模式(数据库操作 + 偏移量提交原子化):

conn.setAutoCommit(false);
processRecordInTransaction(conn, record);
conn.commit();            // 先提交 DB 事务
consumer.commitSync();    // 再提交 Kafka 偏移量

2.6 三种消息传递语义

语义含义实现方式适用场景
At Most Once最多一次,可能丢失acks=0,不重试日志收集、指标监控
At Least Once至少一次,可能重复acks=all + 重试 + 手动提交大多数业务场景
Exactly Once恰好一次,不丢不重幂等生产者 + 事务 API金融交易、计费系统

2.7 Exactly Once 实现(Kafka 0.11+)

生产者端:

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // 唯一 ID
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);

producer.initTransactions();
producer.beginTransaction();
// ... send messages ...
producer.commitTransaction();  // 失败时 abortTransaction()

消费者端:

consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 只读已提交事务

消费-处理-生产的原子事务(关键模式):

processorProducer.beginTransaction();
// 1. 处理消息并发送到 output-topic
// 2. 将消费者偏移量绑定到同一事务
processorProducer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
processorProducer.commitTransaction();

2.8 偏移量管理机制

  • Kafka 0.9 之前:偏移量存储在 ZooKeeper
  • Kafka 0.9 之后:存储在内部 Topic __consumer_offsets
  • 提交方式:commitSync()(阻塞)、commitAsync()(非阻塞,通过回调确认)
  • 消费者再均衡(Rebalance)时需通过 ConsumerRebalanceListener 正确处理偏移量提交,否则可能重复消费或丢失

2.9 关键监控指标

指标含义
生产者record-error-rate消息错误率
生产者record-retry-rate消息重试率
BrokerUnderReplicatedPartitions副本同步不足的分区数
BrokerOfflinePartitionsCount离线分区数
消费者records-lag消费延迟(积压消息数)
消费者records-lag-max最大消费延迟

推荐监控组合:Kafka JMX + Prometheus + Grafana


三、优缺点与局限性

acks=all** + **min.insync.replicas=2

  • 优点:强可靠性保证,所有 ISR 副本同步才返回成功
  • 限制:写入延迟上升,吞吐量下降;如果 ISR 副本数低于 min.insync.replicas,生产者会直接报错

幂等性(enable.idempotence=true

  • 优点:自动去重,避免网络重试导致消息写入两次
  • 限制:仅保证单个生产者会话、单个分区内的幂等,不能跨分区/跨会话去重

Exactly Once 事务

  • 优点:端到端不丢不重
  • 限制:需要 Kafka 0.11+;有额外性能开销(约 5-10%);TRANSACTIONAL_ID 必须全局唯一,否则新实例会 Fence 掉旧实例

手动提交偏移量

  • 优点:精确控制,消息处理失败可以不提交
  • 踩坑点:必须保证消息处理幂等,否则 Rebalance 后重复拉取会造成重复消费(而非丢失)

unclean.leader.election.enable=false(默认值)

  • 优点:防止数据截断
  • 限制:所有 ISR 副本都挂掉时,分区会变为不可用,需要人工干预,可用性换可靠性

消息重复 vs 消息丢失

  • 消息重复可通过幂等性设计(唯一 ID、数据库唯一约束、业务状态机)解决
  • 消息丢失一旦发生,数据可能不可恢复,无法确定丢失范围
  • 设计原则:宁可接受重复,不接受丢失

四、行动清单

  1. 配置自查:检查线上 Kafka 集群的 acksreplication.factormin.insync.replicasunclean.leader.election.enable 四个核心参数是否符合生产级标准(建议:acks=all,副本数=3,min.insync.replicas=2unclean=false)。
  2. 消费者代码审计:排查代码中是否存在 enable.auto.commit=true 或”先提交偏移量再处理消息”的反模式,全部改为手动提交且先处理后提交。
  3. 动手练习幂等生产者:用本文代码创建一个开启 enable.idempotence=true 的生产者,验证网络重试场景下不会产生重复消息。
  4. 实现 Exactly Once Demo:按”消费-处理-生产”事务模式,写一个从 input-topic 读取、处理后写入 output-topic 的事务性 Processor,用 sendOffsetsToTransaction 绑定偏移量提交。
  5. 搭建监控面板:在本地或测试环境配置 Kafka JMX → Prometheus → Grafana 链路,重点关注 UnderReplicatedPartitionsrecords-lag 两个告警指标。
  6. 熟悉 LEO/HW 原理:手动模拟 Leader 宕机场景,观察新 Leader 选举后日志截断到 HW 的行为,加深对数据丢失边界的理解。
  7. 面试准备:能够清晰讲述三端(生产者/Broker/消费者)各自的丢失场景和对应解法,以及三种消息语义的实现原理与适用场景。

Kafka 丢数据场景与解决方案

一句话摘要

Kafka 只对已提交的消息最大限度的持久化保证,数据丢失可能发生在 Producer、Broker、Consumer 三端,通过参数组合配置可最大限度规避。


核心知识点

一、消息传递语义

三种语义定义:

语义含义触发场景
at most once消息最多被消费一次,可能丢失Consumer 先提交 Offset 再处理,处理中 crash
at least once消息至少被消费一次,可能重复Producer 重试 / Consumer 先处理再提交,提交前 crash
exactly once消息恰好被消费一次需幂等 + 事务机制配合,难以完全实现

Kafka 默认提供 at least once 语义。

幂等与事务配置

// 启用幂等传递(0.11.0.0+):防止 Producer 重试导致消息重复写入日志
enable.idempotence = true

// 启用事务(保证多 Topic 分区消息要么全部写入成功,要么全部失败)
transactional.id = "指定值"

二、Kafka 不丢数据的前提条件

两个必要条件:

  1. 消息必须是已提交状态:N 个 Broker 成功收到消息并写入日志,告知 Producer 提交成功
  2. 至少 1 个存活的 Broker:N 个 Broker 中至少有 1 个存活,消息才不丢失

三、Producer 端丢失场景与解决方案

丢失场景

消息发送流程:

Producer → 获取 Leader Partition 元数据
         → 发送消息到 Leader Partition
         → Leader 写入 PageCache
         → Follower 拉取同步数据
         → 所有 ISR 副本 ACK → Leader 回复 Producer ACK

Producer 使用异步发送,丢失原因:

  • 网络抖动:消息根本未到达 Broker
  • 消息体过大:超出 Broker 承受范围被拒收
  • acks 配置不当导致提前确认

acks 各值的丢失风险:

acks = 0:发后即忘,网络抖动直接丢,Retries 失效
acks = 1:Leader 写入即确认,Leader Crash 后 Follower 未同步的数据丢失
acks = all:ISR 中只剩 Leader 时,退化为 acks=1,仍可能丢失

解决方案

方案一:使用带回调的发送方式

// 弃用发后即焚
producer.send(msg);

// 改用带回调的方式,发送失败可针对性处理
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // 网络抖动 → 重试
        // 消息过大 → 调整后重发
    }
});

方案二:ACK 确认机制配置

# 等待所有 ISR 副本确认
request.required.acks = -1  # 或 all

两种典型 acks=all 场景:

场景1(数据不丢):
  Leader 收到消息 → 所有 ISR 同步完成 → Leader Crash
  → 选举新 Leader → 数据完整不丢失 ✅

场景2(数据重复):
  Leader 收到消息 → 部分 ISR 同步 → Leader Crash
  → Producer 收到失败标识 → 重新发送 → 数据重复 ⚠️
  (需配合幂等处理)

方案三:重试参数配置

# Kafka 2.4+ 默认 Integer.MAX_VALUE
retries = Integer.MAX_VALUE

# 保证消息有序性(每次只允许一个未响应请求)
max.in.flight.requests.per.connection = 1

# 重试时间间隔,避免无效频繁重试,推荐 300ms
retry.backoff.ms = 300

四、Broker 端丢失场景与解决方案

丢失场景

消息写入路径:
  Broker 接收消息 → 写入 PageCache → 操作系统异步刷盘

风险点:
  数据在 PageCache 未刷盘时,Broker 宕机
  + 选举了数据落后很多的 Follower 成为新 Leader
  → 落后的消息数据永久丢失

Kafka 不提供同步刷盘方式,依赖多分区多副本机制保障数据安全。


解决方案

# 禁止落后副本被选举为 Leader(防止数据落后的 Follower 成为 Leader)
unclean.leader.election.enable = false

# 副本数,建议 >= 3
replication.factor = 3

# 消息至少写入多少个 ISR 副本才算已提交,建议 > 1
min.insync.replicas = 2

# 关键约束:必须满足以下关系,否则一个副本 crash 整个分区不可用
replication.factor = min.insync.replicas + 1
# 示例:replication.factor=3, min.insync.replicas=2

五、Consumer 端丢失场景与解决方案

丢失场景

Consumer 消费流程:

Consumer → 获取 Leader Partition 元数据
         → Pull 消息
         → 业务逻辑处理
         → 提交 Offset 到 __consumer_offsets

两种提交顺序的风险:

方式1:先提交 Offset,再处理消息
  → 处理时 crash → 重启后从新 Offset 消费
  → 未处理的消息丢失 ❌(at most once)

方式2:先处理消息,再提交 Offset
  → 提交前 crash → 重启后重新拉取已处理消息
  → 消息重复消费 ⚠️(at least once,需幂等保障)

解决方案

# 关闭自动提交,改为手动提交
enable.auto.commit = false

正确消费流程:

拉取消息 → 业务逻辑处理完成 → 手动提交 Offset

消息重复消费问题由业务层保证幂等性解决(不是 Kafka 负责)。


六、三端丢失场景与解决方案总览

核心丢失原因核心解决手段
Producer异步发送无回调 + acks 配置不当带回调发送 + acks=-1 + retries
BrokerPageCache 未刷盘 + 落后副本选举为 Leaderunclean.leader.election=false + 多副本配置
Consumer先提交 Offset 再处理,处理中 crashenable.auto.commit=false + 手动提交 + 业务幂等

优缺点与局限性

方案优点限制 / 踩坑点
acks=all最强持久性保证ISR 只剩 Leader 时退化为 acks=1,仍可能丢数据
enable.idempotence=true防止 Producer 重试导致重复只解决单分区内幂等,跨分区需用事务
replication.factor=3允许 1 个副本宕机设置与 min.insync.replicas 相等时,任意副本 crash 分区不可用
unclean.leader.election=false防止数据丢失所有 ISR 副本全挂时,分区变不可用,牺牲可用性换一致性
手动提交 Offset精确控制提交时机,不丢消息先处理后提交存在重复消费,业务必须实现幂等
retries=Integer.MAX_VALUE网络抖动时持续重试配合 max.in.flight=1 保证有序,但会降低吞吐量

根本局限: Kafka 不能在任何情况下保证不丢数据,只能对已提交消息做最大限度持久化保证。极端情况(所有副本同时宕机、PageCache 未刷盘)下数据仍然会丢失。


行动清单

  1. 复现 Producer 丢数据场景:用 acks=0 发送消息,模拟网络中断,验证消息确实丢失且无重试
  2. 验证 acks=all 退化场景:将 ISR 缩减为只剩 Leader(kill Follower),观察 acks=all 是否退化为 acks=1
  3. 实现带回调的 Producer:改写现有 producer.send(msg)producer.send(msg, callback),在 callback 中加入日志和重试逻辑
  4. 测试手动提交 Offset:设置 enable.auto.commit=false,分别测试处理前提交和处理后提交两种情况下 crash 的消息行为差异
  5. 参数组合验证:部署一个配置了 replication.factor=3, min.insync.replicas=2, unclean.leader.election=false 的 Topic,依次 kill 1 个、2 个副本,观察分区可用性变化
  6. 幂等消费设计:梳理业务中消费 Kafka 消息的场景,对每个消费逻辑设计幂等方案(唯一键、数据库唯一约束或乐观锁)

Kafka 重复消费与漏消费问题解决方案


一句话摘要

Kafka 的重复消费和漏消费本质是偏移量提交时机问题;通过”先消费后提交偏移量”杜绝漏消费,通过幂等性设计(MySQL 唯一索引 或 Redis 判重)解决重复消费。


核心知识点

1. 推拉模式

Kafka 采用拉模式(Pull),Consumer 主动定时轮询 Broker 拉取消息。

模式优点缺点
推模式(Push)实时性高Consumer 消费速率跟不上时会积压、宕机
拉模式(Pull)Consumer 可自控速率,系统更有柔性存在一定消息延迟

2. 重复消费 & 漏消费的根本原因

两个关键参数:

enable.auto.commit        # 是否自动提交偏移量,默认 true
auto.commit.interval.ms   # 自动提交间隔,默认 5000ms(5秒)

offset 存储位置演进:

  • Kafka < 0.9:offset 存储在 ZooKeeper(高频通信场景下 ZK 有性能瓶颈)
  • Kafka >= 0.9:offset 存储在内置 Topic __consumer_offsets

触发场景分析:

场景提交方式故障时机结果
enable.auto.commit=true自动,每 5s 一次业务处理完,但还没到自动提交时宕机重复消费
enable.auto.commit=false 手动先处理业务,再提交处理完未提交就宕机重复消费
enable.auto.commit=false 手动先提交 offset,再处理业务提交完未处理就宕机漏消费

结论: 纯靠 Kafka 自身无法同时解决两者,需系统层配合。


3. 整体解决策略

分两步走:

  1. 先杜绝漏消费 → 通过参数 + 代码规范
  2. 再解决重复消费 → 通过幂等性设计

4. 杜绝漏消费

原则:无论自动/手动提交,代码中一律遵循「先处理业务逻辑,后提交 offset」的顺序。

  • enable.auto.commit=true(默认):本质是延迟提交,先处理业务再等待自动提交,可接受
  • enable.auto.commit=false(手动):在业务代码处理完成后,再手动调用 commitSync() / commitAsync()

这样做只会引入重复消费的风险,漏消费被彻底杜绝。


5. 解决重复消费:幂等性设计

核心思想: 同一条消息执行一次或多次,最终业务结果相同——依靠全局唯一 ID 实现判重。

方案一:MySQL 唯一索引判重

原理:order_id(全局唯一 ID)建唯一索引,消费时执行 INSERT,若已存在则被唯一索引拦截,返回错误,不重复处理。

-- 示例:order_id 字段建唯一索引
CREATE UNIQUE INDEX idx_order_id ON your_table(order_id);

-- 消费时执行 insert,重复则抛异常,捕获处理即可
INSERT INTO your_table (order_id, ...) VALUES (?, ...);

优点: 开发成本低,仅加一个唯一索引 + 处理冲突异常。

缺点: 唯一索引不能使用 Change Buffer(写缓冲),导致 InnoDB 磁盘随机 IO 增加,数据库写入性能下降。

Change Buffer 原理补充: 对于非唯一索引,写操作若目标 Index Page 不在 Buffer Pool 中,可先缓存到 Change Buffer,由”页面被访问 / Master Thread 定期执行 / 数据库关闭”三个时机触发合并写入,减少随机 IO。唯一索引必须即时校验唯一性,无法延迟,因此不能用此优化。


方案二:Redis 唯一 Key + 过期时间判重

原理: 消费前用全局唯一 ID 到 Redis 中查找/写入 Key,Key 存在则跳过,不存在则处理并写入 Key。

步骤:
1. 从消息中提取全局唯一 ID
2. 执行 SET key value EX <过期时间> NX  (原子操作)
3. 返回成功 → 首次消费,执行业务逻辑
4. 返回失败 → 重复消息,直接丢弃

为什么可以设置过期时间: 全局唯一 ID 通常含毫秒级时间序列,时间相隔较久的 ID 不会重复,可放心淘汰旧 Key 释放内存。

优点: 为数据库减压,将判重前置到 Redis,开发成本低。

缺点 / 踩坑点:

  • Redis 挂掉时缺乏可靠的 Plan B:
    • 容忍重复消费?→ 业务可能不允许
    • 降级到数据库唯一索引判重?→ Redis 层的价值失效
    • 改用 SELECT ... FOR UPDATE 加锁判重?→ 操作更重,性能更差
  • Plan B 方案强依赖具体业务特性,没有通用答案。

优缺点与局限性汇总

方案适用场景限制 / 踩坑点
先处理后提交 offset所有场景,基础规范仍存在重复消费风险,需配合幂等性
MySQL 唯一索引写入量不大、对数据强一致性要求高唯一索引禁用 Change Buffer,高并发写入时 IO 压力大
Redis 判重高并发、写入量大、对延迟敏感Redis 单点故障时 Plan B 复杂,需结合业务设计降级策略

行动清单

  1. 验证参数默认值:检查现有消费者配置中 enable.auto.commitauto.commit.interval.ms 的实际值,评估当前是否存在风险。
  2. 代码规范落地:在所有消费者代码中 Review 业务处理与 offset 提交的顺序,确保”先处理,后提交”。
  3. 验证 offset 存储位置:确认 Kafka 版本 >= 0.9,查看 __consumer_offsets Topic 是否正常,避免对 ZooKeeper 的误判。
  4. 选择幂等方案:根据业务写入 QPS 决策——低频写入选 MySQL 唯一索引;高频写入优先选 Redis 判重,并提前设计 Redis 故障降级方案。
  5. 实践 Redis 原子判重:练习使用 SET key value EX <seconds> NX 命令实现原子性写入,避免先 GET 再 SET 的竞态条件。
  6. 深入 Change Buffer:阅读 InnoDB Change Buffer 源码或官方文档,理解其触发合并的三个时机,从底层理解为何唯一索引不可用。
  7. 压测对比:在测试环境对 MySQL 唯一索引方案做高并发写入压测,量化随机 IO 对 TPS 的影响,为方案选型提供数据支撑。

Kafka 消费者:重复消费 / 顺序消费 / 延迟消费 / Rebalance 全解


一句话摘要

Kafka 消费端的核心难题是重复消费,根本原因是 offset 未能及时提交(由 Rebalance 或消费过慢触发);解决路径是提速 + 幂等兜底,同时合理配置 Rebalance 相关参数以降低触发频率。


二、核心知识点

1. 消费者消费流程

消费流程分四步:

  1. 从 ZooKeeper 获取目标 partition 的 leader 位置和 offset 位置。
  2. 直接从 Broker 的 PageCache 拉取数据(零拷贝,速度快)。
  3. 若 PageCache 数据不全,则从磁盘补充拉取。
  4. 消费完成后提交 offset(可手动或自动)。

2. 三种分区分配(Partition Assignment)策略

策略原理适用场景
Range(默认)按分区数/消费者数均分,余数分摊给前几个消费者通用默认
RoundRobin对 topic 和 partition 的 hashcode 排序后轮询分配多 topic 均衡消费
Sticky无 Rebalance 时同轮询;发生 Rebalance 时尽量保持上一次分配不变减少 Rebalance 开销

计算示例(Range):12 个 partition,9 个消费者 → 12/9=112%9=3,前 3 台各消费 2 个,后 6 台各消费 1 个。

配置代码

// Range(默认)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class);
// RoundRobin
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class);
// Sticky
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class);

3. 零拷贝(Zero Copy)

普通文件传输需 4 次拷贝:磁盘 → 内核缓冲区 → 用户空间 → Socket Buffer → 网卡,其中内核↔用户空间的两次互转带来额外 CPU 上下文切换开销。

零拷贝跳过用户空间,通过 DMA(Direct Memory Access) 直接将数据从内核传递到网卡,减少不必要的拷贝次数。

实现方式:

  • Linux:sendfile() 系统调用
  • Java NIO:FileChannel.transferTo()
  • mmap 文件映射:将磁盘文件映射到内存,用户通过修改内存直接修改磁盘文件,提高 I/O 效率。

4. 重复消费的原因与解决方案

原因一:生产者重复提交(producer 重试机制导致同一消息发送多次)

原因二:Rebalance 引起重复消费
超过 max.poll.interval.ms(默认 5 分钟)未 poll,客户端主动离开消费组触发 Rebalance,offset 提交失败,其他消费者从上次已提交的 offset 处重新消费。

解决方案:

① 提高消费速度(治标)

  • 增加消费者数量(不超过 partition 数)
  • 多线程并行消费
  • 异步消费
  • 优化消费处理逻辑,缩短单条耗时

② 幂等处理(治本)

  • 消费者侧做幂等校验(推荐):消费前检查该消息是否已处理(如查 Redis/DB)
  • 开启 Kafka 生产者幂等:将消息生成 MD5,存入 Redis,消费时先校验(性能消耗较大,不建议开启)
// 开启生产者幂等(谨慎使用)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

5. 顺序消费

Kafka 保证单个 partition 内消息有序,跨 partition 不保证顺序。实现顺序消费的核心是:将需要保序的消息发送到同一个 partition(通过指定相同的 key)。


6. 延迟消费

Kafka 本身无状态,原生不支持延迟消费(RabbitMQ、Pulsar 原生支持更方便)。

实现方案:开发延迟推送服务,定时扫描延迟消息,到时间后再投递给 Kafka。


7. 频繁 Rebalance 的解决方案

触发条件:

  1. consumer 数量变化
  2. 订阅的 topic 数量变化
  3. 订阅的 topic 的 partition 数量变化

Rebalance 期间所有消费者停止消费,直到均衡完成,对业务影响大。

解决方案:

① 参数调整

  • session.timeout.ms:v0.10.2 之前版本适当提高,需大于一批数据的消费时间,不超过 30s(建议设 25s);v0.10.2 及之后保持默认。
  • max.poll.interval.ms:设置为大于实际消费一批数据所需时间的值,避免消费超时触发 Rebalance。

② 提高消费速度:消费逻辑另起线程异步处理,poll 主循环快速返回。

③ 减少 Group 订阅 Topic 数量:一个 Group 订阅的 Topic 最好不超过 5 个,建议 1 个 Group 只订阅 1 个 Topic。


8. 批量消费配置(Spring Kafka)

@Bean("batchContainerFactory")
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerConfigs()));
    factory.setConcurrency(3);                        // 并发消费者数量
    factory.getContainerProperties().setPollTimeout(3000);
    factory.setBatchListener(true);                   // 开启批量消费
    return factory;
}

关键消费者配置参数:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // 关闭自动提交,手动控制
props.put(ConsumerConfig.GROUP_ID_CONFIG, ...);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);      // 每批最大拉取条数

三、优缺点与局限性

Range 策略

  • 适用:分区数能被消费者数整除时分配均匀;否则前几个消费者会多消费 1 个,多 topic 场景下头部消费者负担明显更重。

Sticky 策略

  • 优点:Rebalance 后尽量复用原有分配,减少迁移开销。
  • 缺点:实现复杂,首次分配与 RoundRobin 相同,优势只在 Rebalance 时体现。

生产者幂等(ENABLE_IDEMPOTENCE_CONFIG=true

  • 只能解决单 producer 同 session 内的重复,跨重启无效。
  • 性能开销较大,原文明确建议”尽量不要开启”。

消费者侧幂等(Redis/DB 去重)

  • 最通用的解法,但需要额外存储,并发场景要注意分布式锁或原子操作。

延迟消费

  • Kafka 原生不支持,自研轮询服务存在精度问题(取决于轮询间隔),高精度延迟推荐用 RabbitMQ 或 Pulsar。

批量消费

  • MAX_POLL_RECORDS_CONFIG 设置过大,单批处理时间超过 max.poll.interval.ms 则触发 Rebalance,需根据业务消费耗时反向估算合理批大小。

四、行动清单

  1. 实验 Rebalance 参数:在测试环境故意让消费者超时,观察 max.poll.interval.ms 不同值对 Rebalance 触发频率的影响。
  2. 实现消费幂等:用 Redis SET NX(或 SETNX)存储消息唯一 ID,消费前校验,保证 at-most-once 处理。
  3. 压测批量消费:调整 MAX_POLL_RECORDS_CONFIG(如 100 / 500 / 1000),对比吞吐量和消费延迟,找到业务的最优批大小。
  4. 顺序消费实践:用相同 key 将同一业务实体的消息固定路由到同一 partition,验证有序性。
  5. 对比分区策略:在多 topic 场景下,切换 Range → RoundRobin,观察各消费者负载是否更均衡。
  6. 延伸阅读:Pulsar 的延迟消息实现原理,对比 Kafka 自研轮询方案的精度和复杂度差距。
  7. 踩坑清单核查:排查现有项目中是否存在 ENABLE_AUTO_COMMIT_CONFIG=true + 业务处理耗时长的组合,这是线上最常见的重复消费根因。

Kafka 集群消息积压问题及处理策略


一句话摘要

Kafka 消息积压由三类根因引发(消费任务挂掉、分区数不足、key 分布不均),针对每类根因有对应的恢复策略和预防手段。


核心知识点

1. 正常不积压的前提条件

满足以下全部条件时,Kafka 通常不会产生积压:

  • Producer 采用轮询或随机方式生产,保证各分区数据均匀分布。
  • 根据 topic 数据量合理设计分区数。
  • 消费端(Spark Streaming / Structured Streaming / Flink)持续稳定运行,不存在长时间中断。

上述任一条件被破坏,积压就不可避免。


2. 积压根因一:消费任务挂掉

问题描述:实时消费应用因异常退出,同时缺少监控告警和自动重启机制,导致积压在任务挂掉到重新启动之间持续累积。

解决方案 A(跳过积压,补漏)

  • 重启后直接从最新 offset 消费,优先恢复实时链路。
  • 对”滞后”的历史积压数据,启动离线补漏程序单独处理。

解决方案 B(从断点续消)

  • 任务启动时从上次已提交的 offset 处继续消费。
  • 若积压量大,需同步增加计算资源,尽快追上最新消息。

配套工程实践

  • 将任务接入监控体系,出现异常及时通知负责人。
  • 编写自动重启脚本,确保任务可被自动拉起。
  • 实时框架需具备健壮的异常处理能力,避免脏数据导致任务无法重启。

3. 积压根因二:分区数过少 / 消费能力不足

问题描述:Kafka 分区数是并行度调优的最小单元。分区数过少时,consumer 无法水平扩展,吞吐量上限受限。单分区生产 QPS 通常很高,一旦业务逻辑复杂导致消费耗时增加,积压就会出现。

解决方案

  • 直接增加 Kafka 分区数,提升并行消费能力。
  • 若使用 Spark Streaming + Kafka Direct Approach 模式,可在 Spark 侧对 KafkaRDD 执行 repartition 重分区,增加并行处理度,无需修改 Kafka 配置。

4. 积压根因三:消息 key 不均匀导致分区数据倾斜

问题描述:Kafka producer 指定了消息 key 时,相同 key 的消息会路由到同一分区。若 key 分布不均匀,部分分区数据量远超其他分区,形成热点,导致局部积压。

解决方案

  • 在 Kafka producer 端,对 key 追加随机后缀,使消息均匀打散到各分区。

优缺点与局限性

场景方案适用条件限制 / 踩坑点
任务挂掉-跳过积压重启消费最新消息 + 离线补漏实时链路恢复优先,可接受一定处理延迟需要单独开发离线补漏程序,复杂度增加
任务挂掉-断点续消从上次 offset 继续消费对数据完整性要求高、不允许丢失积压量大时追消费速度慢,需扩容资源才能追上
分区数不足增加 Kafka 分区数数据量持续增大的长期方案分区数只能增加不能减少;修改后需要重新平衡消费者分配
Spark Direct 模式KafkaRDD.repartition不想修改 Kafka 端配置时的临时方案repartition 会引入 shuffle,有额外性能开销
key 不均匀key 加随机后缀producer 侧可改造加随机后缀后,相同业务 key 的消息会分散到不同分区,破坏顺序消费语义,需评估业务是否依赖消息顺序

行动清单

  1. 排查现有消费任务是否有监控:确认 Spark Streaming / Flink 任务已接入告警系统,并配置自动重启策略(如 Supervisor / Kubernetes restartPolicy)。
  2. 评估当前 Kafka 分区数是否合理:根据 producer QPS 和 consumer 单分区处理能力计算最优分区数,公式参考:分区数 = max(producer峰值QPS / 单分区写入上限, consumer并发数)
  3. 检查 producer key 的分布情况:通过 Kafka 自带命令查看各分区消息积压量(kafka-consumer-groups.sh --describe),若存在严重倾斜则排查 key 设计。
  4. 制定积压应急预案:明确业务是选择”实时优先+离线补漏”还是”断点续消+扩容追赶”,提前编写对应脚本。
  5. Spark Direct 模式下测试 repartition 效果:在测试环境验证 KafkaRDD.repartition(n) 的 shuffle 开销是否在可接受范围内,再推广到生产。
  6. 延伸学习:深入研究 Kafka Consumer Group Rebalance 机制,理解分区再分配对积压的影响,以及如何通过 max.poll.recordsfetch.min.bytes 等参数调优消费吞吐量。

⚠️ 注意:该 CSDN 文章为 VIP 付费内容,正文在”具体操作2”处被截断,目录也仅展示了”场景1 + 解决方案1”。以下笔记仅基于页面实际可读内容整理,未能覆盖文章完整内容。


Kafka 消息积压:典型场景及解决方案

一句话摘要

Kafka 消息积压的核心矛盾是生产速度 > 消费速度,本文针对”消费任务挂掉”这一典型场景,给出了”跳过历史直接消费最新 + 离线补漏”的组合解法。


核心知识点

1. 消息积压的定义

Kafka 消息积压(Message Backlog)指 Broker 中存在大量尚未被消费的消息。根本原因是生产者写入速度持续超过消费者消费速度,或消费者出现故障停止消费。


2. 场景一:消费/实时任务挂掉

问题描述:实时消费应用异常退出,且无监控告警、无自动重启脚本,导致任务挂掉期间的消息全部堆积。重启应用后,若历史积压量过大,无法简单重启直接追消完。

解决思路:将”追历史积压”与”消费最新消息”拆成两条独立的处理链路,互不干扰。


操作一:让消费者跳过积压,直接从最新消息开始消费

有以下四种方式,任选其一:

方式 1:配置参数

auto.offset.reset = latest

消费者启动时若无已提交偏移量,则从最新位置开始消费。

方式 2:动态消费组 ID

每次重启消费者时生成一个随机 groupId,使 Kafka 视其为全新消费组,从而自动从 latest 开始消费:

// 示例:Java 中动态设置 group.id
props.put("group.id", "realtime-consumer-" + UUID.randomUUID().toString());

方式 3:手动重置偏移量到最新位置

通过 Kafka Consumer API 编程方式将偏移量设置到分区末尾:

consumer.seekToEnd(consumer.assignment());

方式 4:偏移量默认存储位置

Kafka 较早版本偏移量默认存储在 ZooKeeper,新版本存储在 Kafka 内部 Topic __consumer_offsets,手动重置时需注意版本差异。


操作二:对滞后历史数据进行离线补漏

(原文此部分被付费墙截断,未能获取具体内容)

核心思路是通过离线批处理程序,按时间范围重放积压期间的消息,与实时链路相互独立,防止历史数据影响实时消费链路的延迟。


优缺点与局限性

方案适用场景局限 / 踩坑点
auto.offset.reset=latest新部署或允许丢弃历史数据的场景一旦历史数据有业务价值,直接跳过会造成数据丢失,必须配合离线补漏
动态 groupId需要每次重启都从 latest 开始的场景动态 groupId 会导致 Kafka 永远不会清理旧 group 的偏移量元数据,积累垃圾 group;生产慎用
seekToEnd 手动设置精细化控制偏移量的场景需要保证调用时机正确(assign 之后),否则 assignment 为空会静默失败
离线补漏历史数据量大、业务上允许延迟处理的场景需要额外维护一套离线消费链路,增加系统复杂度;要做好幂等处理,防止与实时链路重复消费同一条消息

行动清单

  1. 搭建监控:优先为 Kafka Consumer Lag 指标配置告警,推荐使用 Kafka Exporter + Prometheus + Grafana,当 consumer_lag > 阈值 时触发告警,防止积压无人知晓。
  2. 配置自动拉起:用 Supervisor、Systemd 或 K8s Deployment 管理消费任务进程,确保任务崩溃后自动重启。
  3. 实践偏移量控制:本地搭建 Kafka 环境,分别验证 auto.offset.reset=earliest/latestseekToBeginningseekToEnd 三种偏移量控制方式的行为差异。
  4. 阅读补充资料:该文章付费内容未能读取,可参考以下同类公开资源补充其他积压场景(消费速度慢、分区数不足、下游处理瓶颈):
    • Kafka 官方文档:Consumer Configuration 章节
    • kafka-consumer-groups.sh --describe 命令:用于查看 Lag 详情
    • 增加 Topic Partition 数 + 同步扩容 Consumer 数量(Consumer 数 = Partition 数时并行度最优)
  5. 设计离线补漏方案:制定补漏方案时,需明确幂等消费策略(如消息 ID 去重),避免实时和离线链路双写冲突。

Kafka 高可用、顺序消费及幂等性


一句话摘要

Kafka 集群在生产环境中面临消息丢失、重复消费、乱序、积压四大核心问题,通过副本机制、ISR 选举、幂等性方案和分区策略可系统性解决。


二、核心知识点

1. 搭建 Kafka 集群(3-Broker 示例)

每个 Broker 通过唯一 broker.id 和独立监听端口区分身份,启动后向 ZooKeeper 注册临时节点。

server.properties 核心配置示例:

# Broker 3 示例
broker.id = 3
listeners=PLAINTEXT://172.16.30.34:49094

验证集群启动成功:

# 进入 ZooKeeper 客户端
./bin/zkCli.sh
# 查看已注册的 Broker 节点
ls /brokers/ids
# 输出:[1, 2, 3] 说明 3 个节点全部在线

向集群发送和消费消息:

# 生产者发送消息(指定多个 Broker 地址)
./kafka-console-producer.sh \
  --broker-list 172.16.30.34:49092,172.16.30.34:49093,172.16.30.34:49094 \
  --topic my-replicated-topic

# 消费者消费消息
./kafka-console-consumer.sh \
  --bootstrap-server 172.16.30.34:49092,172.16.30.34:49093,172.16.30.34:49094 \
  --topic my-replicated-topic

# 指定消费组消费(从头开始)
./kafka-console-consumer.sh \
  --bootstrap-server 172.16.30.34:49092,172.16.30.34:49093,172.16.30.34:49094 \
  --from-beginning \
  --consumer-property group.id=testGroup1 \
  --topic my-replicated-topic

2. 集群关键角色

Controller(控制器)

  • 每个 Broker 启动时在 ZooKeeper 创建临时序号节点,序号最小的 Broker 成为 Controller。
  • 职责:负责 Leader 选举、Broker 增减时的元数据同步、分区变更时的信息广播。

Leader 选举规则(ISR 机制)

  • ISR(In-Sync Replica)集合维护与 Leader 保持同步的副本列表。
  • 选举时取 ISR 集合最左边的元素作为新 Leader。
  • 示例:ISR = [2, 1, 3],Leader(2) 宕机 → ISR 变为 [1, 3] → Broker-1 上的副本成为新 Leader。

HW 与 LEO

指标含义规则
LEO(Log End Offset)某副本最后一条消息的位移各副本独立维护
HW(High Watermark,高水位)Leader-Follower 完成同步的位置消费者只能消费 HW 之前的消息;HW ≤ LEO(木桶效应)

3. Rebalance 机制

触发条件: 消费组中消费者未指定分区 + 消费者数量或分区关系发生变化。

三种分配策略(可配置):

  • range:前段消费者分得 分区总数/消费者数量 + 1 个,后段分得 分区总数/消费者数量 个。
  • 轮询(round-robin):各消费者轮流分配分区。
  • sticky(粘合策略):Rebalance 时在原有分配基础上最小化调整,不重置已有分配关系。

4. 防止消息丢失

生产者侧:

  • 使用同步发送(非 fire-and-forget)。
  • ack = 1(全同步)。
  • 同步副本数 ≥ 2。

消费者侧:

  • 关闭自动提交 offset,改为手动提交。
  • 消息完整消费并处理完毕后,再执行 ack 应答。

5. 防止重复消费

根因: 网络抖动导致生产者未收到 ack,触发重试,消费者收到两条相同消息。

不推荐方案: 生产者关闭重试机制(会引发消息丢失)。

推荐方案:消费者侧幂等性保证

  • 方案 A:数据库唯一索引(消息 ID 作为唯一键,重复插入直接忽略)。
  • 方案 B:Redis 分布式锁(消费前先 SET key NX,已存在则跳过处理)。

6. 顺序消费

约束条件(严格限制):

  • 生产者:同步发送,ack = 1
  • 消费者:Topic 只设置 1 个 Partition,消费组内只有 1 个消费者

顺序消费直接牺牲水平扩展能力,吞吐量极低,仅在强顺序业务场景(如订单状态流转)下使用。


7. 解决消息积压

成因: 消费速度持续低于生产速度,offset 寻址开销随积压量增长,最终引发服务雪崩。

解决方案:

  • 消费者内部启用多线程并发消费,充分利用单机 CPU。
  • 在同一消费组中横向扩展消费者实例,部署到多台机器并行消费(消费者数量 ≤ 分区数)。

8. 延时队列实现

典型场景: 未支付订单 30 分钟后自动取消。

实现方案:

  1. 创建专用 Topic,生产者发送消息时携带创建时间戳
  2. 消费者消费时判断:当前时间 - 创建时间 ≥ 30min
    • 是 → 修改订单状态为「超时取消」。
    • 否 → 记录当前 offset,等待 1 分钟,再重新拉取该 offset 的消息继续判断,循环直到超时或支付完成。

三、优缺点与局限性

场景优势限制 / 踩坑点
高可用副本节点宕机自动切换 Leader,无需人工干预ISR 列表为空时(全副本落后),选举可能选到脏数据副本
幂等性去重消费者侧控制,无需改动生产者逻辑Redis 分布式锁存在锁过期后重复消费的边界问题
顺序消费严格保证全局有序单分区单消费者,吞吐量近乎为零,不可用于高并发场景
Rebalance sticky减少分区迁移开销需显式配置,默认策略为 range,老集群升级需注意兼容性
延时队列轮询方案无需引入额外中间件(如 RabbitMQ 死信队列)1 分钟轮询粒度不够精细;高频轮询对 Kafka 造成额外读压力
消息积压扩容水平扩展简单有效消费者数量超过分区数后,多余消费者闲置,扩容前需同步扩分区

四、行动清单

  1. 环境实践:
    1. 本地用 Docker 启动 3-Broker Kafka 集群,执行文中 zkCli.sh 命令验证节点注册。
    2. 创建一个 replication-factor=3 的 Topic,手动 kill 掉 Leader Broker,观察 ISR 切换过程。
  2. 代码实践:
    1. 用 Spring Kafka 实现消费者手动提交 offset,对比自动提交与手动提交在消费失败时的行为差异。
    2. 实现基于 Redis SET NX 的幂等性消费拦截器,模拟网络重试场景验证去重效果。
    3. 实现延时队列 Demo:下单时发消息带时间戳,消费者轮询判断是否超时取消。
  3. 深入学习:
    1. 深入研究 HW/LEO 同步流程(Leader Epoch 机制),理解 HW 截断如何防止数据不一致。
    2. 对比 Kafka 延时队列方案 vs RabbitMQ 死信队列 vs 时间轮(HashedWheelTimer)的适用场景。
    3. 研究 sticky 分配策略的具体算法,以及在大规模消费组(50+ 消费者)下的性能表现。

注: 该页面正文后半部分为付费内容,以下笔记基于页面已公开的完整核心内容(回答重点 + 概述)整理。


Kafka 批量消息发送与消费及性能优化

一句话摘要

通过配置 Producer 的 linger.ms / batch.size 和 Consumer 的 max.poll.records,Kafka 可将多条消息合并批次收发,显著降低网络往返和 I/O 开销,是提升系统吞吐量的核心手段。


核心知识点

1. 批量消息发送

原理: Producer 不会每次调用 send() 就立即发网络包,而是将发往同一分区的消息暂存到内存缓冲区(RecordAccumulator),满足任一条件时触发真正发送:

  • 缓冲区大小达到 batch.size
  • 等待时间超过 linger.ms

关键参数:

参数作用默认值
batch.size每个批次的最大字节数,达到后立即发送16384(16KB)
linger.ms批次未满时最长等待时间(ms),增大可提升吞吐、轻微增加延迟0
acks确认模式,all 表示所有副本确认,可靠性最高1

示例代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("linger.ms", 5);          // 等待5ms凑批
props.put("batch.size", 16384);     // 批次上限16KB
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
    producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
}
producer.close();

2. 批量消息消费

原理: Consumer 调用 poll() 时,一次性从 Broker 拉取多条消息,max.poll.records 限制单次拉取的最大条数,避免单次处理量过大。

关键参数:

参数作用示例值
max.poll.records每次 poll() 返回的最大消息条数500
enable.auto.commit是否自动提交 offset,批量场景建议关闭改为手动提交false

示例代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");   // 禁用自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.poll.records", 500);         // 每次最多拉500条

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
    consumer.commitSync();   // 处理完整批次后手动同步提交
}

3. 批量操作的性能优化方向

批量处理的收益来自减少以下开销:

  • 网络往返(RTT):N 条消息合并为 1 次网络请求
  • I/O 操作:Broker 顺序写磁盘效率更高
  • 系统调用次数:减少 send/recv syscall 频率

常见调优参数组合:

  • Producer 高吞吐:linger.ms=5~20batch.size=65536(64KB),compression.type=lz4/snappy
  • Consumer 高吞吐:max.poll.records=500~1000fetch.min.bytes 适当增大,fetch.max.wait.ms 配合调整

优缺点与局限性

批量发送(Producer)

  • 适用场景:日志收集、埋点上报、非实时业务数据写入
  • 局限性:linger.ms 增大会引入固定延迟,实时性敏感业务(如支付、报警)不适合
  • 踩坑点:batch.size 设置过小等于没有批量效果;消息体超过 batch.size 时单条直接发送,不受批量约束

批量消费(Consumer)

  • 适用场景:数据库批量写入、离线分析、ETL 处理
  • 局限性:max.poll.records 设置过大,若单批处理时间超过 max.poll.interval.ms(默认5分钟),Consumer 会被踢出消费组,触发 Rebalance
  • 踩坑点1:使用 enable.auto.commit=true 时,批次中部分消息处理失败但 offset 已提交,会造成消息丢失
  • 踩坑点2:commitSync() 放在 for 循环内部(逐条提交)等于失去批量提交的意义,应放在整个批次处理完成后

通用局限性

  • 批量大小与延迟是互相制约的 trade-off,没有通用最优值,需根据业务 SLA 测压调整
  • 消费端批量处理失败时,重试整个批次还是跳过错误消息,需要明确业务策略

行动清单

  1. 动手验证参数效果: 本地搭建单节点 Kafka,用示例代码分别测试 linger.ms=0 vs linger.ms=10,用 JMX 或 kafka-producer-perf-test.sh 对比吞吐量差异
  2. 压测 Consumer 批量大小:max.poll.records 从 50 逐步调到 1000,观察处理延迟与吞吐量变化,找到业务场景的平衡点
  3. 手动 commit 实践: 将示例消费代码改为批次内部处理异常时不提交 offset,验证消息不丢失的语义
  4. 深入参数体系: 继续研究与批量密切相关的参数:buffer.memory(Producer 缓冲区总大小)、fetch.min.bytes / fetch.max.wait.ms(Consumer 拉取策略)、compression.type(批量压缩)
  5. 面试答题框架: 回答此类题目时按”实现手段(参数)→ 代码示例 → 性能收益原理 → 踩坑/局限”四段组织,覆盖 Producer 和 Consumer 两侧