Kafka 入门学习
一句话摘要
Kafka 是基于发布订阅模式的分布式消息流平台,通过分区、副本、消费者组等机制实现高吞吐、低延迟、高可用的消息传递;本文系统梳理了其核心概念、集群配置参数、Producer/Consumer API 及偏移量管理。
核心知识点
1. 基本术语体系
消息(Message/Record):Kafka 中的基本数据单元,等价于数据库表中的一行记录。
批次(Batch):为提高效率,消息分批写入 Kafka,一个批次代表一组消息。
主题(Topic):消息的分类标签,等价于数据库表。
分区(Partition):Topic 被分为多个 Partition,可分布在不同 Broker 上,实现水平扩展。单个分区内有序,跨分区无序。
偏移量(Consumer Offset):不断递增的整数元数据,记录消费者重平衡时的消费位置,用于恢复消费进度。
Broker:一台 Kafka 服务器。接收生产者消息,为消息设置偏移量,持久化到磁盘。集群中有一个 Broker 担任控制器角色,由活跃成员自动选举产生。
副本(Replica):消息的备份。分为 Leader Replica(对外提供服务)和 Follower Replica(被动同步)两类。
重平衡(Rebalance):消费者组内某成员宕机时,其他消费者自动重新分配分区的过程,是消费者端高可用的核心手段。
2. Kafka 设计特性
- 高吞吐低延迟:每秒处理数十万条消息,最低延迟仅几毫秒。
- 高伸缩性:分区可分布在不同 Broker,通过增加节点扩展吞吐量。
- 持久性与可靠性:消息持久化到磁盘,支持副本备份。
- 容错性:允许集群中的节点失败,集群仍能正常运行。
- 高并发:支持数千个客户端同时读写。
3. Kafka 高性能的底层原理
Kafka 速度快的核心原因有四点:
- 顺序读写:消息追加写入磁盘,避免随机寻址。
- 零拷贝(Zero-Copy):数据在内核态直接从磁盘传输到网络,避免内核与用户空间的数据拷贝切换。
- 消息压缩:批量数据压缩,减少 I/O 传输量。
- 分批发送:从 Producer 到文件系统再到 Consumer,全链路以批次为单位传输数据。
4. 四大核心 API
- Producer API:向一个或多个 Topic 发送消息记录。
- Consumer API:订阅一个或多个 Topic 并处理消息流。
- Streams API:流处理器,消费输入流并生成输出流(Topic → Topic 转换)。
- Connector API:连接 Kafka Topic 与外部系统,如关系型数据库的 CDC 连接器。
5. Broker 端重要配置
| 参数 | 说明 | 默认值/示例 |
|---|---|---|
broker.id | Broker 唯一标识,集群内不可重复 | 默认 0 |
port | 监听端口,1024 以下需 root 权限 | 默认 9092 |
zookeeper.connect | ZK 地址,格式 zk1:2181,zk2:2181/kafka1 | 无默认,必填 |
log.dirs | 消息日志存储路径,多路径逗号分隔,如 /home/kafka1,/home/kafka2 | 无默认,必填 |
num.recovery.threads.per.data.dir | 每个日志目录的恢复线程数,崩溃重启时并行恢复可大幅缩短时间 | 默认 1 |
auto.create.topics.enable | **线上强烈建议设为 **false,否则任何写/读/元数据请求都会自动建 Topic,产生大量垃圾 Topic | 默认 true |
主题级默认配置:
| 参数 | 说明 | 默认值 |
|---|---|---|
num.partitions | 自动创建 Topic 时的默认分区数,只能增不能减 | 1 |
default.replication.factor | 副本数 | 1 |
log.retention.hours / log.retention.ms | 消息保留时长,推荐用 ms 精度的版本 | 168h(1 周) |
log.retention.bytes | 按大小保留,作用于每个分区。8 分区 + 1GB = 最多保留 8GB | -1(无限) |
log.segment.bytes | 单个日志片段上限,到达后关闭当前片段、打开新片段 | 1GB |
message.max.bytes | Broker 能接收的单条消息最大值 | 需与消费端参数匹配 |
JVM 推荐配置:
- JDK 版本:推荐 JDK 1.8。
- 堆大小:直接设置为
6GB以避免常见 Bug。 - GC:Java 8 直接使用默认 G1,关键参数:
MaxGCPauseMillis:每次 GC 停顿目标,默认200ms。InitiatingHeapOccupancyPercent:堆使用率超过该值触发 GC,默认45%。
6. Kafka Producer
创建生产者必填三项:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // 建议≥2个,保证HA
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
三种消息发送方式:
- Fire-and-Forget(直接发送,不关心结果):调用
send()不处理返回值,不知道消息是否成功,适合允许丢失的场景。 - 同步发送:调用
send().get()阻塞等待,服务器出错时抛出异常。生产者内部有两类错误:可重试错误(如连接断开、无主分区)会自动重试;不可重试错误(如消息过大)直接抛出。 - 异步发送(带回调):传入
Callback对象,实现onCompletion()接口,错误时exception非空:
producer.send(record, (metadata, exception) -> {
if (exception != null) { /* 处理错误 */ }
});
分区策略:
- 轮询(Round-Robin):默认策略,均匀分配到各分区,吞吐最优。
- 随机(Random):旧版本策略,新版已改为轮询,不推荐。
- Key 路由(key-ordering):相同 Key 的消息始终进入同一分区,保证分区内顺序性:
// 两行代码实现 key-based 分区
int numPartitions = cluster.partitionCountForTopic(topic);
return key.hashCode() % numPartitions;
- 自定义分区:实现
org.apache.kafka.clients.producer.Partitioner接口,配置Partitioner.class参数。Partitioner包含三个方法:partition()(核心逻辑)、close()、onNewBatch()。
压缩机制:
压缩在 Producer 侧开启,Consumer 侧自动解压,压缩类型随消息头传递:
props.put("compression.type", "gzip"); // 可选:snappy、gzip、lz4
Kafka 以消息集合(Message Set)为单位进行压缩,不对单条消息压缩,批量压缩效率更高。
Producer 重要参数:
| 参数 | 说明 |
|---|---|
acks | 0=不等待确认(UDP模式);1=Leader 确认即可;all=所有副本确认,最安全但延迟最高 |
buffer.memory | 发送缓冲区大小,超出时 send() 阻塞或抛异常 |
retries | 临时错误重试次数,每次重试间隔由 retry.backoff.ms 控制(默认 100ms) |
batch.size | 同一分区消息的批次内存上限,填满即发;不一定等满才发 |
max.in.flight.requests.per.connection | 服务器响应前可发送的最大消息数,设为 1 可保证严格顺序写入 |
compression.type | 压缩算法,默认不压缩 |
request.timeout.ms | 等待服务器响应的超时时间 |
max.block.ms | 缓冲区满或无元数据时 send() 的最大阻塞时间,超时抛 TimeoutException |
max.request.size | 单次请求(或单条消息)的最大大小 |
7. Kafka Consumer
消费者组(Consumer Group)核心机制:
- 组内消费者共享
Group ID,共同消费一个 Topic。 - 每个分区只能被组内一个消费者消费,多余消费者闲置。
- 消费者数量不应超过分区数,超出部分完全空闲。
- 水平扩展消费能力:向消费者组增加消费者(不超过分区数)。
两种消费模式:
- 点对点(消息队列):一个消费者组消费一个 Topic,消息被消费后即消失。
- 发布-订阅:多个消费者组消费同一 Topic,每个组都能收到全量消息。若要应用读取全量消息,需为该应用单独创建消费者组。
重平衡(Rebalance):
- 触发时机:消费者加入/离开组、消费者宕机(心跳超时)、Topic 分区数变化。
- 重平衡期间消费者组完全不可用(类比 JVM 的 Stop-The-World)。
- 消费者通过定期向 Group Coordinator(Kafka Broker)发送心跳维持成员资格。
session.timeout.ms(默认 3s)内未发心跳即被判定死亡,触发重平衡。- 重平衡是 Kafka 消费端的已知 Bug 级缺陷,社区至今无法完全修复其性能问题。
创建消费者示例:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("group.id", "myGroup");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题(支持正则,新建匹配Topic会立即触发重平衡)
consumer.subscribe(Collections.singletonList("customerTopic"));
// 订阅所有test相关主题:consumer.subscribe(Pattern.compile("test.*"));
// 轮询拉取
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息...
}
}
// 关闭时立即触发重平衡,而非等待超时
consumer.close();
注意: 同一消费者组内,一个线程只能运行一个消费者;多消费者需多线程,可用 ExecutorService 管理。
Consumer 重要参数:
| 参数 | 说明 | 默认值 |
|---|---|---|
fetch.min.bytes | 每次 fetch 的最小数据量,未满则等待,降低低频场景的空轮询开销 | 1 byte |
fetch.max.wait.ms | 等待 fetch.min.bytes 的最长时间,与 fetch.min.bytes 共同决定延迟 | 500ms |
max.partition.fetch.bytes | 每个分区每次 poll 返回的最大字节数,必须大于 max.message.size | 1MB |
session.timeout.ms | 心跳超时时间,超时触发重平衡 | 3s |
heartbeat.interval.ms | 心跳发送频率,必须小于 session.timeout.ms,通常为其 1/3 | — |
auto.offset.reset | 偏移量无效时的行为:latest(从最新)或 earliest(从头) | latest |
enable.auto.commit | 是否自动提交偏移量,生产环境建议设为 false 手动控制 | true |
auto.commit.interval.ms | 自动提交间隔 | 5s |
partition.assignment.strategy | 分区分配策略:Range 或 RoundRobin | — |
max.poll.records | 单次 poll() 返回的最大记录数 | — |
8. 偏移量提交
偏移量提交到 Kafka 内部特殊主题 _consumer_offset,用于重平衡后恢复消费位置。
偏移量不一致的两种后果:
- 提交偏移量 < 最后处理偏移量 → 两者之间的消息被重复消费。
- 提交偏移量 > 最后处理偏移量 → 两者之间的消息丢失。
三种提交方式:
- 自动提交:
enable.auto.commit=true,每隔auto.commit.interval.ms(默认 5s)自动提交 poll 到的最大偏移量。简单但可能重复消费。 - **手动同步提交 **
commitSync():处理完所有消息后调用,提交失败会抛出异常并一直重试。
consumer.commitSync(); // 提交当前 poll 返回的最新偏移量
- **手动异步提交 **
commitAsync():不阻塞,不重试(即使失败)。
consumer.commitAsync((offsets, exception) -> { /* 处理失败 */ });
最佳实践:正常消费用 commitAsync(),在消费者关闭前(finally 块)用 commitSync() 确保最后一次提交成功:
try {
while (true) { ...; consumer.commitAsync(); }
} finally {
consumer.commitSync(); // 保证最后一次提交
consumer.close();
}
commitSync() 和 commitAsync() 均支持传入 Map<TopicPartition, OffsetAndMetadata> 提交特定分区的特定偏移量,实现更细粒度控制。
优缺点与局限性
优点与适用场景:
- 大规模日志采集、用户行为追踪、监控指标聚合:高吞吐 + 持久化天然契合。
- 流计算管道(配合 Flink/Spark Streaming):Streams API 提供原生流处理能力。
- 解耦多个系统:发布订阅模式下多个消费者组可独立消费全量数据,互不干扰。
- 削峰填谷:缓冲突发流量,消费者按自身节奏消费。
局限与踩坑点:
- 重平衡性能问题:Rebalance 会导致消费者组完全停止消费(STW),且过程很慢,是目前 Kafka 未彻底解决的缺陷。频繁触发(如 JVM GC 停顿超过
session.timeout.ms)会严重影响消费延迟。 - 消费者数不能超过分区数:超出的消费者永久闲置,是资源浪费。扩容时需提前规划分区数。
- 分区数只能增不能减:创建 Topic 时
num.partitions的选择需慎重,无法减少。 auto.create.topics.enable=true** 的线上陷阱**:任何客户端的读/写/元数据操作都会自动建 Topic,生产环境必须关闭。acks** 配置影响可靠性**:acks=0完全不保证送达;acks=1在 Leader 宕机时仍可能丢消息;只有acks=all才能保证强一致,但延迟最高。- 自动提交偏移量的重复消费风险:5s 的自动提交间隔内若消费者崩溃,重启后会从上次提交的偏移量重新消费,产生重复数据。
max.partition.fetch.bytes** 需大于 **max.message.size:否则超大消息将永远无法被消费,陷入死循环。log.retention.bytes** 按分区计算**:8 个分区 × 1GB = 整个 Topic 实际可保留 8GB,不是 1GB。
行动清单
- 搭建本地单节点 Kafka:配置
broker.id、log.dirs、zookeeper.connect,练习 Topic 创建、num.partitions和default.replication.factor设置。 - 动手实现三种 Producer 发送方式:分别用 Fire-and-Forget、
send().get()同步、Callback 异步各发送 100 条消息,观察延迟差异和异常处理行为。 - 实验分区策略:分别实现轮询、随机、Key 路由三种策略,创建 4 个分区的 Topic,观察消息分布是否符合预期。
- 验证 Consumer Group 行为:创建 4 个分区的 Topic,依次启动 1、2、4、5 个消费者,观察分区分配和第 5 个消费者的闲置状态;再 Kill 一个消费者,观察重平衡触发。
- 实践偏移量管理:先用自动提交,模拟消费者崩溃后重复消费的场景;再改为手动异步+关闭前同步的组合方式,对比两者差异。
- 调优
acks参数:分别将acks设为0、1、all,配合 Kafka 自带的kafka-producer-perf-test.sh工具测量吞吐量与延迟变化,感受可靠性与性能的权衡。 - 深入阅读:《Kafka 权威指南》(O’Reilly)+ 极客时间《Kafka 核心技术与实战》,重点补充 Exactly-Once 语义、ISR 机制、Controller 选举等进阶内容。
Kafka 基础入门
一句话摘要
Kafka 是面向大数据实时处理的分布式消息队列,核心价值是解耦、削峰、异步,架构围绕 Topic-Partition-Replica 三层模型展开。
核心知识点
一、为什么用消息队列
| 价值 | 解决的问题 |
|---|---|
| 解耦 | 生产者与消费者独立修改,互不影响 |
| 冗余 | 消息持久化,处理失败不丢数据 |
| 削峰 | 突发流量缓冲,避免系统崩溃 |
| 异步 | 消息入队即返回,后续异步消费 |
二、核心概念速查
Producer: 消息生产者,向 Broker 发消息的客户端。
Consumer: 消息消费者,从 Broker 读消息的客户端。
Consumer Group: 消费者组,组内每个消费者消费不同 Partition,一个 Partition 只能被组内一个消费者消费,不同组之间互不影响。
Broker: 一台 Kafka 机器 = 一个 Broker,多个 Broker 组成集群,一个 Broker 可容纳多个 Topic。
Topic: 逻辑概念,消息分类容器,生产者和消费者面向同一个 Topic。
Partition: 物理概念,Topic 的分片单元,每个 Partition 是一个有序队列,对应一个 log 文件。一个 Topic 可分布在多个 Broker 上的多个 Partition 中。
Replica: 副本,每个 Partition 有一个 Leader 副本 + 若干 Follower 副本,Follower 不能与 Leader 在同一个 Broker 上。
Leader: 生产者写入和消费者读取的唯一对象。
Follower: 实时同步 Leader 数据,Leader 故障时选举为新 Leader。
Offset: 消费者消费位置的游标,崩溃恢复后从此位置继续消费。
ZooKeeper: 存储和管理集群元数据,新版本正在逐步去除依赖。
三、工作流程
Producer
│
│ 发送消息(追加到 log 文件末尾)
▼
Leader Partition(物理 log 文件)
│ 每条消息记录自身 Offset
│
├──→ Follower 主动同步数据
│
▼
Consumer Group
└─ 每个 Consumer 实时记录当前消费的 Offset
- 消息以 Key + Value + 时间戳 三元组存储
- Producer 只往 Leader Partition 写,Consumer 只从 Leader Partition 读
- 数据顺序追加到 log 文件末尾,写入性能极高
四、存储机制(分片 + 索引)
问题: log 文件无限增大 → 数据定位效率低。
解决: 每个 Partition 切分为多个 Segment,每个 Segment 对应 4 个文件:
| 文件 | 作用 |
|---|---|
.index | 索引文件,存储大量索引信息 |
.log | 数据文件,存储实际消息 |
.snapshot | 快照文件 |
.timeindex | 时间索引文件 |
文件命名规则: 以当前 Segment 第一条消息的 Offset 命名。
目录命名规则: topic名称-分区号
示例:heartbeat topic,3个分区对应:
heartbeat-0/
heartbeat-1/
heartbeat-2/
.index 中的元数据 → 指向 .log 中对应消息的物理偏移量,实现快速定位。
五、副本机制
- 每个 Partition 建议设置 2 个副本(设置 3 个也常见)
- Producer 只写 Leader,Follower 主动拉取同步
- Consumer 只读 Leader
- Leader 故障 → Follower 选举新 Leader
Partition 0: Leader(Broker1) Follower(Broker2) Follower(Broker3)
Partition 1: Leader(Broker2) Follower(Broker1) Follower(Broker3)
Partition 2: Leader(Broker3) Follower(Broker1) Follower(Broker2)
六、Controller
- 集群中一台特殊的 Broker,承担额外的集群管理工作
- 选举方式:公平竞选,最先在 ZooKeeper 创建临时节点
/controller的 Broker 成为 Controller - 通常是第一台启动的 Broker,将自身信息写入
/controller节点
七、Offset 维护
| 版本 | Offset 存储位置 | 原因 |
|---|---|---|
| 0.9 之前 | ZooKeeper | 旧方案 |
| 0.9 之后 | Kafka 内置 Topic __consumer_offsets | 支持高并发读写 |
Consumer 崩溃恢复后,从 __consumer_offsets 读取上次 Offset,继续消费。
优缺点与局限性
| 特性 | 优点 | 限制 / 踩坑点 |
|---|---|---|
| Partition 分区 | 提高并发,支持横向扩展 | 同一 Partition 内有序,跨 Partition 不保证全局顺序 |
| 顺序写 log | 写性能极高,O(1) 追加 | log 文件需配合 Segment 分片管理,否则定位慢 |
| 副本机制 | 节点故障数据不丢 | Follower 同步有延迟,Leader 宕机瞬间可能丢少量数据 |
| Consumer Group | 多消费者并行,提升消费能力 | 一个 Partition 只能被组内一个 Consumer 消费,Consumer 数 > Partition 数时有 Consumer 空闲 |
| ZooKeeper 依赖 | 元数据管理成熟 | ZooKeeper 是额外运维负担,新版本正在去除 |
| Offset 自管理 | 灵活,支持重放消费 | 需要正确提交 Offset,否则重复消费或丢消息 |
行动清单
- 本地搭建单节点 Kafka:用 Docker 启动 Kafka + ZooKeeper,创建 Topic,用命令行生产和消费消息验证基本流程
- 观察 Partition 文件结构:创建 Topic 后进入数据目录,查看
.index、.log、.timeindex文件,理解 Segment 分片命名规则 - 模拟 Consumer Group:启动多个 Consumer 加入同一 Group,观察 Partition 分配和负载均衡行为
- 验证 Offset 机制:Consumer 消费一半后 kill 掉,重启后观察是否从上次 Offset 继续消费
- **查看 **
__consumer_offsets:用kafka-console-consumer订阅该内置 Topic,观察 Offset 提交记录 - 下一篇预习:带着问题阅读 Kafka 三高(高性能、高可用、高并发)的设计,重点关注零拷贝、ISR 机制、分区再均衡
Kafka 三高架构设计
一句话摘要
Kafka 的高可用、高性能、高并发分别依赖 ISR + ACK 机制、零拷贝 + 顺序写 + 内存池、三层 Reactor 网络架构实现,三者协同支撑起工业级消息队列的稳定性。
核心知识点
一、高可用设计
Controller 选举
三类选举:控制器选举、Leader 选举、消费者选举。
Controller 启动流程:
步骤1:第一个启动的 Broker 在 ZooKeeper 创建临时节点 /controller,写入自身信息
步骤2:其他 Broker 尝试创建 /controller,因节点已存在抛出异常,放弃竞争
步骤3:其他 Broker 在 Controller 上注册监听器,监听各自节点状态变化
Controller 的职责:普通 Broker 功能 + 管理 Topic 分区和副本状态 + 执行 Partition 重分配。
副本机制
- 每个 Partition 有多个副本,通过
replication-factor参数指定副本数 - 所有消息只写 Leader 副本,Follower 主动复制 Leader 数据
- Leader 不可用时,从 Follower 中选举新 Leader
- Broker 通过
broker.id唯一标识自身
ISR 机制
ISR(In-Sync Replica): 维护所有同步可用副本的列表,Leader 副本必然在 ISR 中。
Follower 留在 ISR 的条件:
- 定时向 ZooKeeper 发送心跳
- 在规定时间内从 Leader 低延迟地获取过消息
关键参数:
replica.lag.time.max.ms = 10000(默认10秒)
含义:Follower 落后 Leader 不超过10秒,视为同步副本
ISR 存储位置:
ZooKeeper 节点:/brokers/topics/[topic]/partitions/[partition]/state
维护方:Controller(选举新 Leader 时更新)+ Leader(定期检测 Follower 状态)
ACK 机制
acks 参数在 KafkaProducer 端配置,三个可选值:
| acks 值 | 确认时机 | 数据安全性 | 性能 | 适用场景 |
|---|---|---|---|---|
| 0 | 不等待 Broker 响应,写入 Socket Buffer 即完成 | 极低,可能丢失 | 最高 | 日志采集,允许丢数据 |
| 1 | Leader 写入本地日志后确认,不等 Follower | 中,Leader 宕机可能丢 | 中(默认配置) | 平衡场景 |
| all | 等待所有 ISR 副本确认后才返回 | 最高,不丢数据 | 最低 | 金融、订单等高可靠场景 |
acks=all 配套参数:
min.insync.replicas = 2(默认1)
含义:ISR 中副本数 < 该值时,Leader 停止写入,向 Producer 抛出 NotEnoughReplicas 异常
效果:能容忍 min.insync.replicas - 1 个副本同时宕机
二、高性能设计
Reactor 多路复用模型
Kafka SocketServer 基于 Java NIO 实现 Reactor 模式,三种角色:
Acceptor(1个)
│ 监听客户端连接请求(OP_ACCEPT)
▼
Processor(N个)
│ 负责读写数据,每个 Connection 对应一个 Processor
│ 每个 Processor 拥有独立的 Selector
▼
Handler(M个)
│ 处理业务逻辑
各角色之间用队列缓冲请求
Java NIO 三核心组件:Channel(数据传输)、Buffer(数据缓冲)、Selector(单线程处理多 Channel)。
生产消息流程
消息 → 封装为 ProducerRecord
→ 序列化(Serializer)
→ Partitioner 分区器确定目标分区(从 Broker 获取元数据)
→ 写入 RecordAccumulator 缓存队列(批次 RecordBatch)
→ Sender 线程将多个 Batch 封装为 Request 发送
→ 通过 Selector 对应的 KafkaChannel 发往 Broker
关键参数:
batch.size = 16KB(写满立即发送)
linger.ms = 500ms(未写满超时也发送)
顺序写磁盘 + OS Cache
- 写入时先写 os cache(Page Cache),由操作系统决定刷盘时机,大幅提升写效率
- 数据顺序追加到文件末尾,不做随机写
- 机械磁盘顺序写性能 ≈ 内存写入性能(避免磁盘寻址开销)
零拷贝(Zero-Copy)
传统消费数据路径(4次拷贝,多次上下文切换):
磁盘 → os cache → 应用程序缓存 → Socket 缓存 → 网卡 → 消费者
↑ ↑
这两次拷贝是多余的,且伴随内核态/用户态切换
零拷贝路径:
磁盘 → os cache → 网卡 → 消费者
(Socket 缓存只拷贝描述符,不拷贝数据)
两种零拷贝实现:
| 方式 | 原理 | Kafka 对应实现 |
|---|---|---|
mmap | 内存映射文件,减少用户态拷贝 | MappedByteBuffer |
sendfile | 数据直接从内核发到网卡 | FileChannel.transferTo() |
Java NIO 零拷贝示例:
// transferTo() 内部调用操作系统 sendfile() 系统调用
fileChannel.transferTo(position, count, socketChannel);
压缩传输
- 默认不开启压缩,开启后提升吞吐量、降低延迟、节省磁盘
- 压缩发生在 Producer 端,Broker 端保持压缩状态存储,Consumer 端解压
- 支持算法:
lz4、snappy、gzip、ZStandard(Kafka 2.1.0+ 新增,Facebook 开源,压缩比最高) - Producer、Broker、Consumer 必须使用相同压缩算法
服务端内存池设计
目的: 避免频繁内存申请与回收导致 Full GC,稳定生产者性能。
Buffer Pool 总大小:32MB
├── 内存队列:存放固定大小的内存块(每块 16KB)
└── 可用内存:动态分配区
数据结构 Batches:
Key → 消息 Topic 的分区
Value → 对应分区的批次队列
流程:
消息到达 → 申请内存块(16KB)→ 封装 Batch → Sender 发送
→ 发送完毕 → 清空内存块 → 归还内存池 → 循环复用
三、高并发设计
三层网络架构
第一层:Acceptor 线程(1个)
监听 OP_ACCEPT 事件,接收连接
封装为 SocketChannel,轮询分发给 Processor
第二层:Processor 线程(默认3个,num.network.threads=3)
连接队列存放 SocketChannel
注册 OP_READ,解析二进制请求为 Request 对象
将 Request 放入 RequestQueue
监听 ResponseQueue,注册 OP_WRITE 返回响应
第三层:RequestHandler 线程池(默认8个,num.io.threads=8)
从 RequestQueue 取出请求
解析数据写入磁盘
封装 Response 放入对应 Processor 的 ResponseQueue
容量估算与扩容:
默认:每个 RequestHandler 处理 2000 QPS
默认总量:2000 × 8 = 1.6万 QPS
扩容配置:
num.network.threads = 9
num.io.threads = 32(建议与 CPU 核数一致)
扩容后:2000 × 32 = 6.4万 QPS
优缺点与局限性汇总
| 技术 | 适用场景 | 限制 | 踩坑点 |
|---|---|---|---|
| acks=0 | 日志采集、允许丢数据 | 无法保证送达 | Retries 参数失效,Offset 返回 -1 |
| acks=1 | 通用场景(默认) | Leader 宕机可能丢消息 | 类似 MySQL 主从异步,存在数据差异 |
| acks=all | 金融、订单 | 性能最低 | 需配合 min.insync.replicas≥2,否则单副本等于降级 |
| ISR 机制 | 副本同步管理 | Follower 可能短暂落后 | replica.lag.time.max.ms 设置过短导致频繁移出 ISR |
| 零拷贝 | 消费侧大吞吐 | 依赖操作系统支持 | 数据需要修改时零拷贝失效,需结合 copy-on-write |
| 内存池 | 高频生产消息 | 总大小固定 32MB | 消息积压时内存池耗尽,生产者阻塞 |
| 顺序写 | 写入性能优化 | 日志文件持续增大 | 需配合 Segment 分片 + 定期清理策略 |
行动清单
- ACK 参数实验:分别用
acks=0/1/all发送消息,kill Leader 节点,对比三种配置下的数据丢失情况 - ISR 动态观察:用
kafka-topics.sh --describe实时查看 ISR 列表,手动暂停 Follower 观察其被移出 ISR 的过程 - 零拷贝性能对比:用
kafka-producer-perf-test.sh和kafka-consumer-perf-test.sh压测,对比开启/关闭零拷贝时的吞吐量差异 - 压缩算法对比:分别配置
snappy、lz4、zstd,压测吞吐量、CPU 占用和磁盘占用,选出最适合业务的算法 - 网络线程调优:在压测环境中调整
num.network.threads和num.io.threads,观察 QPS 变化,验证扩容公式 - 阅读源码:重点阅读
SocketServer.scala(Acceptor/Processor 实现)和RecordAccumulator.java(内存池实现)
Kafka Producer 底层原理
一句话摘要
Kafka Producer 通过分区器 + 序列化 + 内存池 + Sender 线程 + NIO 网络层的流水线设计实现高吞吐低延迟,核心调优围绕批次大小、内存、压缩、重试四个维度展开。
核心知识点
一、Producer 初始化流程
13 个步骤按序执行:
producer = new KafkaProducer<>(props);
// 1. 分区器(支持自定义)
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
// 2. 重试时间间隔,默认 100ms
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// 3. 序列化器
// ...
// 4. 拦截器
this.interceptors = new ProducerInterceptors<>(interceptorList);
// 5. 元数据,metadata.max.age.ms 默认 5 分钟
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), ...);
// 6. 单条消息最大大小,默认 1M,生产建议改为 10M
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
// 7. 缓冲区大小,默认 32M
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
// 8. 压缩格式
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
// 9. RecordAccumulator 缓冲区,32M
this.accumulator = new RecordAccumulator(
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), // 默认 16KB
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG), // 默认 0ms
...);
// 10. 初始化元数据(此时未真正拉取)
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
// 11. NetworkClient 关键参数:
// connections.max.idle.ms = 9分钟(超时关闭空闲连接)
// max.in.flight.requests.per.connection = 5(保证有序需设为1)
// send.buffer.bytes = 128KB
// receive.buffer.bytes = 32KB
NetworkClient client = new NetworkClient(...);
// 12. Sender 线程(acks: 0/1/-1,retries 重试次数)
this.sender = new Sender(client, this.metadata, this.accumulator, ...);
// 13. 启动 Sender 守护线程
this.ioThread.start();
二、元数据拉取流程
主线程调用 send()
→ 触发元数据拉取标识
→ 主线程 wait() 等待
→ 唤醒 Sender(KafkaThread)线程
→ NetworkClient 从 Broker 集群拉取元数据
→ Broker 返回元数据响应
→ 更新 version 版本号到 Metadata 组件
→ 唤醒主线程继续执行
三、Producer 发送流程
9 个阶段:
1. 初始化(加载配置,启动网络线程)
2. 拦截器预处理消息,封装 ProducerRecord
3. Serializer 序列化 key/value
4. Partitioner 分配目标分区号
5. 从 Broker 获取集群元数据 metadata
6. 消息写入 RecordAccumulator
└─ 按目标分区找到对应 Deque<RecordBatch>
└─ 没有则新建 RecordBatch 加入队列
7. 达到发送阈值 → 唤醒 Sender 线程
└─ RecordBatch → Request
└─ 按【Broker Id → List<Request>】归类
8. 与各 Broker 建立网络连接,发送对应消息列表
9. 触发发送的条件(满足其一即发):
└─ 缓冲区数据达到 batch.size(默认16KB)
└─ 等待时间达到 linger.ms(默认0ms)
四、内存池设计
目的: 复用内存块,避免频繁 GC,防止 Full GC 影响生产者性能。
结构:
Buffer Pool 总大小:32MB
├── 已分配内存(紫色区域):固定 16KB 的内存块队列,可被反复取用
└── 可用内存(粉色区域):未分配的剩余内存,初始 32MB
申请内存逻辑:
// 加锁保证线程安全
this.lock.lock();
// 情况1:申请大小超过整个缓存池 → 抛异常
// 情况2:申请的恰好是 16KB 且已分配队列非空 → 直接取出复用
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// 情况3:可用内存足够 → 从可用内存划出一块
// this.availableMemory + freeListSize >= size
释放内存逻辑:
释放大小 == 16KB → 归还到已分配内存队列(供下次复用)
释放大小 != 16KB → 归还到可用内存区域(等待 JVM GC 回收)
为何只有 16KB 才能进已分配队列:
Batch 发送触发条件是写满(16KB)或超时,若允许不同大小的 Batch(如 1MB),未写满就超时发送会导致内存利用率极低。固定 16KB 保证每个 Batch 大小一致,充分利用内存。
五、网络 IO 架构(三层)
第一层:Acceptor 线程(1个)
监听 SelectionKey.OP_ACCEPT
接收 TCP 连接 → 封装 SocketChannel → 轮询分发给 Processor
第二层:Processor 线程(num.network.threads 个,默认3)
每个 Processor 维护三个队列:
├── newConnections:新连接信息
├── responseQueue:待返回给客户端的 Response(每个 Processor 独享)
└── inflightResponses:临时队列,存放已发出但回调未执行的 Response
职责:解析二进制请求 → 放入共享 RequestQueue
从 responseQueue 取 Response → 注册 OP_WRITE → 返回客户端
第三层:RequestHandler 线程池(num.io.threads 个,默认8)
从共享 RequestQueue 取请求 → 处理业务逻辑 → 写磁盘
→ 封装 Response → 放入对应 Processor 的 responseQueue
⚠️ RequestQueue 是共享的;ResponseQueue 是每个 Processor 独享的。
六、磁盘存储架构
LogManager(日志管理器)
└─ Log 对象(每个 Replica 对应一个)
│ 管理该分区所有 Segment 文件
└─ LogSegment(日志分段)
├── .log(数据文件,顺序追加)
├── .index(offset 索引,绝对偏移量 + 相对偏移量)
└── .timeindex(时间索引)
- Segment 大小到达阈值 →
maybeRoll()滚动新建 Segment - index 条目非每条消息创建,每隔
index.interval.bytes(默认 4096,即 4KB)创建一条 - LogSegment 底层通过
mmap+FileChannel与 OS Cache 交互,基于顺序写落盘
七、Producer 核心参数调优速查
| 参数 | 默认值 | 生产建议 | 说明 |
|---|---|---|---|
acks | 1 | 按场景选 0/1/all | 消息持久性保证 |
max.request.size | 1MB | 10MB | 超出则发送失败 |
retries | 0 | 3 | 解决网络抖动、Leader 重选举 |
retry.backoff.ms | 100ms | 总重试时间 > 异常恢复时间 | 两次重试间隔 |
connections.max.idle.ms | 540000(9分钟) | 默认够用 | 空闲连接超时关闭 |
compression.type | none | lz4(吞吐优先)/ zstd(压缩比优先) | 牺牲 CPU 换 IO |
buffer.memory | 32MB | 适当调大 | Producer 内存池总大小 |
batch.size | 16KB | 32KB | 调大提吞吐,延时略增 |
linger.ms | 0ms | 100ms | 0 表示立即发,调大减少请求次数 |
request.timeout.ms | 30000(30s) | 高负载调到 60s | 等待 Broker 响应超时 |
max.in.flight.requests.per.connection | 5 | 1(防乱序) | 保证分区内消息有序 |
优缺点与局限性
| 机制 | 优点 | 限制 / 踩坑点 |
|---|---|---|
| 内存池(16KB固定块) | 复用内存,大幅降低 GC 频率 | 消息大小远超 16KB 时,内存利用率下降 |
| 批次发送(Batch) | 减少网络请求,提升吞吐 | linger.ms=0 时批次效果等于无,吞吐量低 |
| 重试机制 | 应对网络抖动、Leader 切换 | max.in.flight=5 + 重试 → 可能消息乱序 |
| 压缩 | 显著节省网络和磁盘 IO | 增加 Producer 端 CPU 消耗 |
| acks=all | 最强数据安全保证 | 性能最低,需配合 min.insync.replicas≥2 |
| acks=0 | 最高吞吐 | Offset 返回 -1,消息丢失无感知,Retries 失效 |
行动清单
- 初始化参数实验:修改
max.request.size、batch.size、linger.ms,用kafka-producer-perf-test.sh对比吞吐量变化 - 复现消息乱序:将
max.in.flight.requests.per.connection设为 5 并开启 retries,模拟网络抖动,观察分区内消息顺序是否被打乱 - 内存池源码阅读:阅读
BufferPool.java中的allocate()和deallocate()方法,对照本文 16KB 固定块逻辑理解设计意图 - 压缩算法压测:分别配置
lz4、snappy、zstd,压测 CPU 占用 + 磁盘占用 + 吞吐量,选出最适合业务的算法 - 元数据拉取追踪:在 Producer 初始化后打断点,观察
metadata.update()的触发时机和 version 版本号变化 - 生产参数模板:基于调优表整理一份生产环境 Producer 配置模板,区分”高吞吐”和”高可靠”两种场景
Kafka Broker 内部机制
一句话摘要
Kafka Broker 通过 Controller 协调 + HW/LEO 副本同步 + LeaderEpoch 数据一致性 + 时间轮延迟任务四大机制,实现集群的高可用与高性能,其中 LeaderEpoch 是解决 HW 机制数据丢失的关键补丁。
核心知识点
一、Controller(控制器)机制
定义: 集群中唯一一台承担协调职责的 Broker,任意时刻有且只有一个。依赖 ZooKeeper 选举产生。
Controller 的五大职责
1. Topic 管理
负责 Topic 的创建、删除及分区增加,大部分后台工作由 Controller 完成。
2. 分区重分配(Reassignment)
新 Broker 加入集群时,不会自动承接已有 Topic 的负载,只对后续新增 Topic 生效。要让新 Broker 服务已有 Topic,必须手动触发分区重分配,将部分分区迁移到新 Broker 上。也用于解决多 Broker 之间存储负载不均衡问题。
3. Leader 选举
触发分区 Leader 选举的四种场景:
| 场景 | 触发条件 |
|---|---|
| Offline | 新建分区 或 分区失去现有 Leader |
| Reassign | 用户执行重分配操作 |
| PreferredReplica | 将 Leader 迁回首选副本 |
| ControlledShutdown | 现有 Leader 即将下线 |
选举后,Controller 向所有相关 Broker 发送更新请求(含最新 Leader/Follower 分配信息)。新 Leader 开始处理生产者和消费者请求,Follower 从新 Leader 复制消息。
4. 集群成员管理
依赖 ZooKeeper Watch 机制 + 临时节点组合实现:
- 新 Broker 启动 → 在
/brokers/ids下创建临时 znode - ZooKeeper 通过 Watch 推送通知给 Controller
- Controller 自动感知 Broker 上下线
5. 提供数据服务
Controller 保存最全的集群元数据,其他 Broker 定期接收 Controller 发来的元数据更新请求并刷新本地缓存。
Controller 存储的数据分类
| 分类 | 数据内容 |
|---|---|
| Broker 相关 | 存活 Broker 列表、正在关闭的 Broker 列表、每个 Broker 上的分区和副本 |
| Topic 相关 | Topic 列表、每个 Topic 的所有分区和副本、每个分区的 Leader 和 ISR |
| 运维任务 | 正在进行 Preferred Leader 选举的分区、正在重分配的分区列表 |
Controller 故障转移(Failover)
Broker 0(Controller)宕机
↓
ZooKeeper Watch 机制感知 → 删除 /controller 临时节点
↓
所有存活 Broker 竞选新 Controller
↓
Broker 3 抢先创建 /controller 节点,成为新 Controller
↓
Broker 3 从 ZooKeeper 读取集群元数据并初始化缓存
↓
Failover 完成,恢复正常工作
整个过程自动完成,无需人工干预。
二、HW 和 LEO 机制
三个关键偏移量
| 概念 | 全称 | 含义 |
|---|---|---|
| base offset | 起始位移 | 副本中第一条消息的 offset |
| HW | High Watermark(高水位) | 副本最新一条已提交消息的 offset |
| LEO | Log End Offset(日志末端位移) | 副本下一条待写入消息的 offset |
HW 的两大作用
- 标识可消费范围:HW 之前的消息对消费者可见(committed),HW 之后的消息不可见
- 协助副本数据同步:HW 推进依赖所有 Follower 的 LEO 更新
HW 更新规则
Leader 维护所有 Follower 的 LEO 值
↓
HW = min(所有 Follower 的 LEO)
(取最小值,保证任意 Follower 成为新 Leader 时数据完整)
Follower 自身 HW 的更新:
Follower HW = min(自身 LEO, Leader HW)
Broker 上的数据分布
- Broker 0(Leader 所在):保存 Leader 副本的 HW + LEO,以及所有 Follower 副本的 LEO(但不保存 Follower 的 HW)
- Broker 1(Follower 所在):只保存该 Follower 副本的 HW + LEO
Follower 同步数据时携带自身 LEO 上报给 Leader,Leader 据此更新各 Follower 的 LEO 记录并计算 HW。
三、Leader Epoch 机制
背景: HW 更新存在时间错配——Leader HW 更新后,Follower HW 更新需要额外一轮拉取才能完成。这个窗口期内连续宕机会导致数据丢失。Kafka 0.11 版本引入 Leader Epoch 解决此问题。
Leader Epoch 的组成
Leader Epoch = (Epoch, Start Offset)
Epoch:单调递增版本号,每次 Leader 变更时 +1
Start Offset:该 Epoch 对应的 Leader 写入的第一条消息的 offset
HW 机制导致数据丢失的场景
前提:min.insync.replicas = 1
初始状态:
Leader A:消息 [0,1,2,3],HW=4
Follower B:消息 [0,1,2,3],HW=3(尚未更新到4)
步骤1:Broker B 宕机重启
→ B 执行日志截断,LEO 截断至 HW=3
→ B 磁盘只剩消息 [0,1,2],位移3的消息被删除
步骤2:B 开始从 A 同步数据时,A 宕机
→ B 被选为新 Leader
步骤3:A 重启后作为 Follower
→ A 执行日志截断,HW 对齐 B 的 HW=3
→ 位移3的消息从 A 和 B 中永久消失 ✗
Leader Epoch 规避数据丢失的原理
Follower 重启后不再盲目截断到 HW,而是向 Leader 请求对应 Epoch 的 Start Offset,以此判断截断位置,避免误删已提交消息。每次 Leader 变更时 Epoch 递增,旧 Leader 恢复后自动识别为过期 Leader,不能行使 Leader 权力。
四、时间轮(TimingWheel)机制
背景: Kafka 中存在大量延迟操作(延迟生产、延迟拉取、延迟删除等)。JDK 的 Timer 和 DelayQueue 插入/删除时间复杂度为 O(n log n),无法满足高性能要求。时间轮将插入和删除复杂度降为 O(1)。
数据结构
时间轮(环形数组)
└─ 每个时间格 → TimerTaskList(环形双向链表)
└─ TimerTaskEntry(封装 TimerTask)
关键参数:
tickMs:每格时间跨度(基本时间单位)
wheelSize:总格数
interval:总时间跨度 = tickMs × wheelSize
currentTime:表盘指针(当前处理时间,为 tickMs 的整数倍)
单层时间轮示例
tickMs = 1ms,wheelSize = 20,interval = 20ms
任务1:定时 2ms → 放入时间格 2
任务2:定时 8ms → 放入时间格 10(currentTime=2时,8ms后到期,2+8=10)
任务3:定时 19ms → 复用已到期的时间格 1(环形复用)
currentTime 推进时,处理对应时间格的 TimerTaskList 中的所有任务
层级时间轮(解决超大延迟)
当任务到期时间超过当前时间轮总跨度时,加入上层时间轮:
第一层:tickMs=1ms,wheelSize=20,interval=20ms
第二层:tickMs=20ms,wheelSize=20,interval=400ms
第三层:tickMs=400ms,wheelSize=20,interval=8000ms
例:350ms 的任务 → 超过第一层(20ms)和第二层(400ms)?
→ 350ms < 400ms,放入第二层
→ 随着时间推进,任务从上层降级到下层,最终精确执行
优缺点与局限性
| 机制 | 适用场景 | 限制 | 踩坑点 |
|---|---|---|---|
| Controller 选举 | 集群启动与故障恢复 | 同一时刻只有一个 Controller,存在单点风险 | 依赖 ZooKeeper,ZK 异常会影响 Controller 选举 |
| HW 机制 | 副本同步 + 消费可见性 | HW 更新存在时间错配 | min.insync.replicas=1 时连续宕机可能丢数据 |
| Leader Epoch | 防止数据丢失/不一致 | 0.11 版本之前不支持 | 版本升级时需注意兼容性 |
| 分区重分配 | Broker 扩容后负载均衡 | 新 Broker 不自动接管旧 Topic | 必须手动触发,重分配期间有性能损耗 |
| 单层时间轮 | 小范围延迟任务 | 超过 interval 的任务无法处理 | 需配合层级时间轮使用 |
| 层级时间轮 | 任意延迟范围的任务调度 | 实现复杂度高 | 任务从上层降级到下层有轻微调度开销 |
行动清单
- 模拟 Controller 切换:Docker 启动 3 节点 Kafka 集群,kill Controller 所在节点,观察 ZooKeeper
/controller节点变化和新 Controller 选举日志 - 观察 HW/LEO 变化:用
kafka-log-dirs.sh实时查看分区的 HW 和 LEO 值,暂停 Follower 后观察 HW 停止推进的现象 - 复现数据丢失场景:设置
min.insync.replicas=1,按 HW 数据丢失步骤操作,验证位移 3 的消息是否消失,再开启 LeaderEpoch 对比 - 验证分区重分配:新增 Broker 节点后,用
kafka-reassign-partitions.sh手动触发重分配,观察分区迁移过程 - 时间轮源码阅读:阅读 Kafka
TimingWheel.scala和TimerTaskList.scala,对照本文结构理解环形数组 + 双向链表的实现 - 层级时间轮实践:手写一个简化版两层时间轮(Java/Go),支持插入任意延迟任务并按时触发,验证 O(1) 复杂度
Kafka Consumer 底层原理
一句话摘要
Kafka Consumer 采用 Pull 模式消费,通过 Consumer Group + 分区分配策略 + Rebalance + 位移提交 四大机制保障高并发消费和故障恢复,生产环境推荐 StickyAssignor + 混合提交模式。
核心知识点
一、消费方式:Pull vs Push
Kafka Consumer 采用 Pull 模式。
| 模式 | 缺点 | Kafka 的处理 |
|---|---|---|
| Push | Broker 不清楚 Consumer 消费速度,推送速率不可控,易造成消息堆积甚至系统崩溃 | 不采用 |
| Pull | Broker 无消息时 Consumer 空轮询,浪费资源 | poll() 携带 timeout 参数,无消息时 Long Polling 阻塞等待,直到数据到达再返回 |
二、Consumer 初始化流程
四步完成初始化:
// 1. 构造配置对象
Properties props = new Properties();
props.put("bootstrap.servers", "...");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// 2. 创建 KafkaConsumer 对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 3. 订阅 Topic 列表
consumer.subscribe(Arrays.asList("topic-name"));
// 4. 循环调用 poll() 拉取消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 处理消息...
}
三、Consumer Group 机制
设计原因: 单个 Consumer 无法消费百万级数据,Consumer Group 提供可扩展且具容错性的并行消费能力。
核心特点:
- 每个 Group 有一个或多个 Consumer
- 每个 Group 有唯一的 Group ID
- 同一 Partition 只能被组内一个 Consumer 消费
- 不同 Consumer Group 之间互不影响,可同时消费同一 Topic
四、Partition 分配策略
1. RangeAssignor(默认策略)
按 Topic 维度分配:对每个 Topic,先对 Partition 排序,再对 Consumer 排序,按范围区段分配。
缺陷: 分区数不能被 Consumer 数整除时,排前面的 Consumer 分配更多分区,随订阅 Topic 增加,不均衡越来越严重。
2. RoundRobinAssignor
将所有 Topic 的 Partition 和所有 Consumer 排序后,轮询逐一分配。
- 组内所有 Consumer 订阅相同 Topic → 分配均衡 ✅
- 组内 Consumer 订阅不同 Topic → 分配可能倾斜 ❌
3. StickyAssignor(推荐生产使用)
从 Kafka 0.11 版本引入,通过 partition.assignment.strategy 参数配置。
两个目标(优先级从高到低):
- Partition 分配尽量均衡
- Rebalance 发生时,尽量与上次分配结果保持一致
Rebalance 后对比(C1 下线场景):
初始状态:
C0: [T0P0, T1P0]
C1: [T0P1, T1P1]
C2: [T0P2, T1P2]
C1 下线后:
RoundRobinAssignor(完全重新分配):
C0: [T0P0, T0P2, T1P1]
C2: [T0P1, T1P0, T1P2]
StickyAssignor(最小调整):
C0: [T0P0, T1P0, T0P1] ← 仅接管 C1 的分区
C2: [T0P2, T1P2, T1P1] ← 仅接管 C1 的分区
结论: StickyAssignor 在 Rebalance 后迁移代价更小,生产环境推荐使用。
五、Rebalance 机制
触发条件(三种)
- Consumer Group 成员数量变化(加入、主动离组、故障下线)
- 订阅 Topic 数量变化
- 订阅 Topic 的分区数变化
通知机制
Consumer 端心跳线程定期向 Coordinator 发送心跳。Coordinator 决定开启 Rebalance 后,将 REBALANCE_IN_PROGRESS 封装进心跳响应,Consumer 收到后感知 Rebalance 开始。
五大协议
| 协议 | 作用 |
|---|---|
Heartbeat | Consumer 定期证明存活 |
LeaveGroup | 主动告知 Coordinator 离组 |
JoinGroup | 成员请求加入组 |
SyncGroup | Leader Consumer 将分配方案告知所有成员 |
DescribeGroup | 展示组的全部信息(管理员使用) |
Rebalance 主要用到前 4 种。
Consumer Group 五种状态
| 状态 | 含义 |
|---|---|
| Empty | 组内无成员,可能有未过期的已提交位移,只响应 JoinGroup |
| Dead | 组内无成员,元数据已被 Coordinator 移除,返回 UNKNOWN_MEMBER_ID |
| PreparingRebalance | 准备开始新 Rebalance,等待所有成员重新加入 |
| CompletingRebalance | 所有成员已加入,等待分配方案(旧版本称 AwaitingSync) |
| Stable | Rebalance 完成,Consumer 可正常消费 |
Rebalance 两阶段流程
阶段一:JoinGroup
所有 Consumer → 发送 JoinGroup 请求(携带订阅 Topic 信息)→ Coordinator
Coordinator → 选出 Leader Consumer(通常是第一个发请求的)
Coordinator → 将全量订阅信息发给 Leader
阶段二:SyncGroup
Leader Consumer → 制定分配方案 → 封装进 SyncGroup 请求 → Coordinator
其他 Consumer → 发送空内容的 SyncGroup 请求 → Coordinator
Coordinator → 将分配方案下发给所有组成员
各 Consumer → 知道自己消费哪些 Partition
四种 Rebalance 场景
| 场景 | 触发方 | 关键动作 |
|---|---|---|
| 新成员 C1 加入 | C1 发送 JoinGroup | 触发全组 Rebalance |
| C2 主动离组 | C2 发送 LeaveGroup | 触发全组 Rebalance |
| C2 超时被踢出 | 心跳超时 | Coordinator 踢出,触发 Rebalance |
| C2 提交位移 | C2 提交 Offset | 不触发 Rebalance,仅更新位移 |
六、位移提交机制
概念区分:
- Topic Partition 位移:消息在 Broker 端的存储偏移量
- 消费者位移(Consumer Offset):Consumer Group 在某分区上的消费进度,记录下一条待消费消息的位移
位移提交按分区粒度进行,Consumer 为每个分配到的 Partition 单独提交 Offset。
自动提交
// 开启自动提交(默认 true)
props.put("enable.auto.commit", "true");
// 提交间隔,默认 5000ms
props.put("auto.commit.interval.ms", "5000");
- 每次
poll()调用时,自动提交上一批消息的位移 - 缺陷: Rebalance 发生在自动提交间隔之间时,Offset 未提交 → Rebalance 完成后重复消费
手动提交
props.put("enable.auto.commit", "false");
同步提交:
// 阻塞等待直到提交成功,提交失败会抛异常
consumer.commitSync();
// 注意:必须在处理完 poll() 返回的所有消息后才调用,过早提交导致消息丢失
异步提交:
// 立即返回,不阻塞,不影响 TPS
// 失败不重试(重试时位移值可能已不是最新)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
// 记录日志或异常处理
}
});
混合提交(生产推荐):
try {
while (running) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
process(records);
consumer.commitAsync(); // 正常情况异步提交,不影响 TPS
}
} catch (Exception e) {
// 异常时同步提交,利用其自动重试能力
} finally {
try {
consumer.commitSync(); // 关闭前同步提交,确保最终一致
} finally {
consumer.close();
}
}
七、__consumer_offsets 存储
版本演变:
| 版本 | Offset 存储位置 | 原因 |
|---|---|---|
| 0.8 之前 | ZooKeeper | 旧方案 |
| 0.9 之后 | __consumer_offsets(Kafka 内置 Topic) | ZooKeeper 不适合高频写,Kafka Topic 天然支持 |
消息格式(KV 结构):
Key:<Group ID, Topic 名称, 分区号>
Value:Offset 值 + 元数据(提交时间、过期时间等)
示例输出:
[order-group-1, topic-order, 0]::[OffsetMetadata[36672,NO_METADATA],
CommitTime 1633694193000, ExpirationTime 1633866993000]
创建规则:
触发时机:第一个 Consumer 启动时自动创建
分区数:offsets.topic.num.partitions = 50(默认)
副本数:offsets.topic.replication.factor = 3(默认)
分区路由:abs(GroupId.hashCode()) % NumPartitions
常用查询命令:
# 查看所有消费者组
./bin/kafka-consumer-groups.sh --bootstrap-server <kafka-ip>:9092 --list
# 查看某消费者组消费情况
./bin/kafka-consumer-groups.sh --bootstrap-server <kafka-ip>:9092 \
--group test-group-1 --describe
# 计算 group 对应的 __consumer_offsets 分区
abs(GroupId.hashCode()) % 50
# 查看指定分区的位移数据(Kafka 0.11+)
./bin/kafka-console-consumer.sh --bootstrap-server message-1:9092 \
--topic __consumer_offsets \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--partition xx
八、消费进度监控(Consumer Lag)
Lag 定义: Producer 已生产消息数 - Consumer 已消费消息数
示例:
Producer 生产:1000 万条
Consumer 消费:900 万条
Lag = 100 万条
Lag 的意义: Lag 值增大趋势 → 消费积压,可能拖慢下游处理速度。
四种监控方式:
kafka-consumer-groups.sh命令行工具(--describe)- Kafka Java Consumer API 编程获取
- Kafka 自带 JMX 监控指标
- 云产品自带监控功能
优缺点与局限性汇总
| 机制 | 优点 | 限制 / 踩坑点 |
|---|---|---|
| Pull 模式 | Consumer 自控消费速度,支持延迟处理 | 无消息时需 Long Polling,否则空轮询浪费 CPU |
| RangeAssignor | 实现简单,默认配置即用 | 订阅 Topic 增加时不均衡问题加剧 |
| RoundRobinAssignor | 相同 Topic 订阅时均衡 | 不同 Topic 订阅时可能严重倾斜 |
| StickyAssignor | Rebalance 后迁移最小,均衡性好 | 实现最复杂,0.11 版本才引入 |
| 自动提交 | 零代码,使用简单 | Rebalance 时可能重复消费 |
| 同步提交 | 可靠,失败自动重试 | 阻塞线程,影响 Consumer TPS |
| 异步提交 | 不阻塞,高 TPS | 失败不重试,可能丢失位移更新 |
| 混合提交 | 兼顾 TPS 和可靠性 | 代码逻辑相对复杂 |
__consumer_offsets | 高频写性能好,天然持久化 | 默认 50 分区,过多 Group 时元数据管理复杂 |
行动清单
- Long Polling 验证:启动 Consumer 连接一个空 Topic,设置
poll(Duration.ofMillis(3000)),观察是否阻塞 3 秒后才返回 - 三种分配策略对比:创建含 3 个分区的 Topic,分别用三种策略启动 Consumer Group,关闭一个 Consumer,对比 Rebalance 后分配结果
- 复现 Rebalance 重复消费:开启自动提交(
auto.commit.interval.ms=5000),在提交间隔内强制触发 Rebalance,观察消息重复消费现象 - 混合提交代码实现:实现生产级混合提交模板(commitAsync 正常流程 + commitSync 关闭保底),在 IDE 中验证异常场景
- **查看 **
__consumer_offsets:对运行中的 Consumer Group 执行kafka-consumer-groups.sh --describe,计算 GroupId 对应的分区号,再用kafka-console-consumer.sh直接读取原始位移数据 - Lag 监控搭建:用
kafka-consumer-groups.sh脚本轮询 Lag 值,或接入 Prometheus + Grafana 实现可视化监控,设置 Lag 阈值告警
Kafka 基础概念及架构
一句话摘要
Kafka 是一个基于 Zookeeper 协调的分布式发布-订阅消息系统,核心解决高吞吐量场景下的消息持久化、分区有序传输与多消费者解耦问题。
二、核心知识点
2.1 Kafka 是什么
Kafka 是分布式、分区、多副本、多生产者、多订阅者的日志/消息系统,依赖 Zookeeper 进行集群协调。
设计目标:
- 消息持久化时间复杂度为 O(1),对 TB 级数据保持常数访问性能
- 单机支持每秒 100K 条消息传输
- 支持分区内消息有序,分区间不保证全局顺序
- 同时支持离线和实时数据处理
- 支持在线水平扩展
消息传递模式: 仅支持发布-订阅模式,不支持点对点模式。消息只有拉取(Pull),无推送(Push),可通过轮询模拟推送。
2.2 四大核心 API
| API | 作用 |
|---|---|
| Producer API | 向一个或多个 Topic 发布消息流 |
| Consumer API | 订阅一个或多个 Topic,处理消息流 |
| Streams API | 充当流处理器,消费输入 Topic,生产输出 Topic(实现流转换) |
| Connector API | 构建可复用的生产者/消费者,将 Topic 连接到外部系统(如数据库变更捕获) |
2.3 基本架构组件
消息(Message)与批次(Batch)
- 消息是 Kafka 的最小数据单元,由字节数组组成,类比数据库的一行记录。
- 消息包含键(Key,字节数组),用于决定写入哪个分区(对 Key 做散列取模)。
- 消息可以批量写入,同一批次属于同一 Topic 和分区。批次越大,单位时间吞吐量越高,但单条延迟越长;批次数据会被压缩以提升传输和存储效率。
- Kafka 使用 Apache Avro 作为序列化格式(强类型,优于 JSON/XML)。
主题(Topic)与分区(Partition)
- Topic 是消息的逻辑分类,类比数据库的表或文件夹。
- 一个 Topic 可划分为多个 Partition,实现横向扩展。
- 消息以追加写入方式存储在分区中,以先进先出顺序读取。
- Kafka 只保证单个 Partition 内消息顺序,无法保证整个 Topic 的全局顺序。
- 分区提供数据冗余能力。
生产者(Producer)
生产者决定消息投递到哪个分区,有三种策略:
- 轮询(Round-Robin):默认策略,消息均匀分布到所有分区
- Key 散列:对消息 Key 计算散列值,映射到固定分区(相同 Key 必然进同一分区)
- 自定义分区器:按业务规则灵活路由
消费者(Consumer)与消费组(Consumer Group)
- 消费者通过偏移量(Offset) 标记已读消息位置。Offset 是单调递增的整数,在给定分区内每条消息唯一。
- Offset 存储在 Kafka 自身(早期存在 Zookeeper,现已迁移到 Kafka)。消费者重启不丢失读取状态。
- 消费组保证每个分区只能被组内一个消费者使用,避免重复消费。
- 某消费者失效时,触发再平衡(Rebalance),重新分配分区给组内其他消费者。
Broker 与集群
- 单个 Broker 是一台独立的 Kafka 服务器,可轻松处理数千个分区及每秒百万级消息。
- 集群中自动选举出一个 Broker 作为控制器(Controller)(通过 Zookeeper Master 选举),负责:分区分配给 Broker、监控 Broker 健康。
- 每个分区归属于一个 Broker,该 Broker 称为该分区的首领(Leader)。
- 分区可复制到多个 Broker(副本),副本分区不处理读写请求,仅负责数据同步。
Topic 分区与 Broker 数量的关系:
| 场景 | 分配规则 |
|---|---|
| Partition 数 == Broker 数 (N) | 每个 Broker 存储 1 个 Partition |
| Partition 数 (N) < Broker 数 (N+M) | N 个 Broker 各存 1 个,M 个 Broker 不存该 Topic |
| Partition 数 (N) > Broker 数 | 部分 Broker 存多个 Partition(生产环境应避免,易造成负载不均) |
2.4 副本机制(Replicas)
副本类型
- 首领副本(Leader Replica):每个分区只有一个,所有生产者写入和消费者读取都必须经过它。
- 跟随者副本(Follower Replica):不处理客户端请求,只从 Leader 拉取消息保持同步。Leader 崩溃时,某个 Follower 晋升为新 Leader。
副本集合分类
AR(Assigned Replicas)= ISR + OSR
- ISR(In-Sync Replicas):与 Leader 保持一定同步程度的副本集合(含 Leader 本身)。只有 ISR 中的副本可以在 Leader 切换时被选举为新 Leader。
- OSR(Out-of-Sync Replicas):与 Leader 同步滞后过多的副本集合(不含 Leader)。正常情况下 OSR 为空,即 AR = ISR。
关键水位线概念
- HW(High Watermark,高水位):消费者只能消费到 HW 之前的消息,表示已完成多副本同步的最高 Offset。
- LEO(Log End Offset):当前日志文件中下一条待写入消息的 Offset,表示分区日志的末尾位置。
2.5 偏移量(Offset)
- 生产者 Offset:消息写入分区时,该分区当前最大的 Offset 即为生产者 Offset。
- 消费者 Offset:消费者在各分区上最后消费到的位置。例如:生产者已写入到 Offset=12,Consumer A 消费到 Offset=9,Consumer B 可从不同位置独立消费,互不干扰。
2.6 性能优化手段
Kafka 高吞吐量的底层实现依赖:
- 零拷贝(Zero-Copy):减少内核态与用户态之间的数据复制
- 顺序读写(Sequential I/O):磁盘顺序访问速度接近内存随机访问
- Linux 页缓存(Page Cache):利用操作系统缓存加速读写
三、优缺点与局限性
优势
- 高吞吐:单机每秒处理几十到百万级消息,存储 TB 级数据性能稳定
- 高可用:副本机制 + Partition Leader 自动切换,支持节点故障无感知
- 持久化:消息持久化到磁盘,配合 replication 防止数据丢失
- 易扩展:增加 Broker 无需停机,Producer/Consumer/Broker 均支持水平扩展
局限性与踩坑点
| 问题 | 说明 |
|---|---|
| 无全局消息顺序 | 只保证单 Partition 内有序,跨 Partition 无序。需要全局顺序时,必须将相关消息路由到同一 Partition(用同一个 Key) |
| 仅支持发布-订阅 | 不支持点对点(Queue)模式,若需求是一条消息只消费一次且有多个不同消费者竞争,需通过消费组设计实现 |
| Partition 数 > Broker 数时有风险 | 单 Broker 承载多 Partition,负载不均,生产环境应避免 |
| 消费者 Offset 管理 | 早期 Offset 存 Zookeeper,高并发下 Zookeeper 成为瓶颈;现已改为存 Kafka 内部 Topic(__consumer_offsets) |
| OSR 副本存在时可靠性下降 | 若 ISR 集合缩小(大量副本进入 OSR),Leader 切换时选择范围受限,极端情况下可能数据丢失 |
| HW 机制导致消费延迟 | 消费者只能读到 HW 以前的数据,HW 推进依赖 Follower 同步进度,Follower 滞后会间接影响消费实时性 |
四、行动清单
- 搭建本地环境:使用 Docker Compose 启动单节点 Kafka + Zookeeper,练习创建 Topic、生产消息、消费消息的基本命令(
kafka-topics.sh、kafka-console-producer.sh、kafka-console-consumer.sh)。 - 验证 Partition 顺序性:创建一个有 3 个 Partition 的 Topic,不指定 Key 发送消息,观察消息被分发到不同 Partition,验证跨 Partition 无序的特性;再用固定 Key 发送,验证同 Key 消息的有序性。
- 理解 Consumer Group 机制:启动 2 个消费者属于同一 Consumer Group、3 个属于不同 Group,观察消息分发差异,深入理解”同组内每个 Partition 只被一个消费者消费”的规则。
- 深入学习副本机制:重点研究 ISR/OSR 的动态变化条件(
replica.lag.time.max.ms参数控制滞后阈值),以及min.insync.replicas参数如何影响数据可靠性与可用性的权衡。 - 学习 Offset 管理:研究
auto.commit.enable、enable.auto.commit配置,理解自动提交 Offset 与手动提交的区别,以及消费失败时如何避免消息丢失或重复消费。 - 扩展阅读:进一步研究 Kafka 的
Streams API和Connector API,以及 Kafka 与 Spring Boot 的整合实践(参考原文作者的配套文章《Kafka原理解析及与Spring Boot整合步骤》)。
Kafka 存储架构深度笔记
一句话摘要
Kafka 存储架构的本质是顺序追加写日志 + 稀疏索引,通过「Topic + Partition + Replica + LogSegment + 索引」五层结构解决海量数据的高性能写入与高效检索问题。
核心知识点
一、存储方案选型
存储介质 IO 速度对比
层级(从快到慢):
CPU 寄存器 > CPU Cache > 内存 > SSD > 机械磁盘 > 网络
关键结论(反直觉):
机械磁盘顺序 I/O:53.2M values/s
内存随机 I/O: 36.7M values/s
→ 磁盘顺序写性能 > 内存随机读写性能
两种存储方向的权衡
| 方向 | 实现方式 | 代表系统 | 缺陷 |
|---|---|---|---|
| 提高读速度 | 维护索引 | MySQL(B+ Tree) | 大量写操作需维护索引,降低写效率 |
| 提高写速度 | 顺序追加日志 | Kafka、大数据系统 | 无索引,查询需遍历 |
Kafka 为何不用 B+ Tree
- 每次写都要维护索引,成本高
- 存在”数据页分裂”操作,对高并发系统太重
- 写并发要求百万级 TPS,B+ Tree 无法满足
Kafka 为何不用哈希索引
- 哈希索引需常驻内存
- 每秒百万级消息写入,内存极易 OOM
Kafka 最终方案:稀疏索引
将 Offset 设计为有序字段,消息在 log 文件中有序存放,将消息划分成若干块,只索引每块第一条消息的 Offset:
查找流程(类似二分查找):
1. 根据目标 Offset 大小,在稀疏索引中定位到对应的块
2. 在块内顺序查找目标消息
优点:
- 无需全量内存存储索引
- 查询效率远优于全文遍历
- 写入只需顺序追加,不维护复杂索引
二、存储架构设计
五层结构:Topic → Partition → Replica → LogSegment → 索引文件
Topic(逻辑概念,消息分类)
└─ Partition(物理存储单元,水平扩展)
└─ Replica(副本,保证高可用)
└─ LogSegment(日志分段,防止单文件过大)
├─ .log(数据文件)
├─ .index(偏移量索引)
├─ .timeindex(时间戳索引)
└─ .snapshot(快照索引,可能存在)
日志目录布局示例:
Topic: topic-order,4个分区,对应磁盘目录:
topic-order-0/
topic-order-1/
topic-order-2/
topic-order-3/
每个目录内的文件(以 baseOffset 命名,固定20位数字,不足补0):
00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000012768089.log ← baseOffset=12768089,说明前面有12768089条消息
00000000012768089.index
00000000012768089.timeindex
activeSegment: 只有最后一个 LogSegment 可执行写入操作,称为 activeSegment。达到阈值后滚动创建新 activeSegment。
三、日志格式演变
V0 版本(Kafka 0.10.0 之前)
消息结构:
[Offset(8B)] [MessageSize(4B)] [CRC32(4B)] [magic(1B)] [attributes(1B)]
[key length(4B)] [key] [value length(4B)] [value]
字段说明:
magic = 0
attributes 低3位:0=NONE, 1=GZIP, 2=SNAPPY, 3=LZ4
key length = -1 表示无 key
value length = -1 表示消息为空
最小消息大小:14 字节
计算示例(key="hello", value="world"):
4(CRC) + 1(magic) + 1(attributes) + 4(key length) + 5(key) + 4(value length) + 5(value)
= 24 字节
V1 版本(Kafka 0.10.0 ~ 0.11.0)
相比 V0 新增:timestamp 字段(8B)
作用:
对内:影响日志保存、切分策略
对外:影响消息审计、端到端延迟计算等
最小消息大小:22 字节
同样示例消息大小:24 + 8 = 32 字节
V0/V1 的设计缺陷
- 空间使用率低:key/value 长度字段固定 4 字节,无论实际大小
- 消息总长度未保存:需实时计算,效率低
- 只保存最新消息位移
- 冗余 CRC 校验:批次发送时每条消息仍单独保存 CRC
V2 版本(Kafka 0.11.0.0+)
核心改进(引入 RecordBatch 批次结构):
1. CRC 从消息中移除,统一放到 RecordBatch 中
2. 增加 producer id、producer epoch、序列号(支持幂等性和事务消息)
3. 使用增量形式保存时间戳和位移(节省空间)
4. 使用可变长度类型(解决空间使用率低问题)
5. 批次最小 61 字节(比单条消息大,但批量发送时整体节省磁盘空间)
四、日志清理机制
通过 Broker 端参数配置清理策略:
log.cleanup.policy = delete # 默认,日志删除
log.cleanup.policy = compact # 日志压缩(需同时设置 log.cleaner.enable=true)
log.cleanup.policy = delete,compact # 同时支持两种
检查周期:log.retention.check.interval.ms = 300000(默认5分钟)
日志删除(Log Retention)三种策略
策略一:基于时间
配置参数优先级:
log.retention.ms > log.retention.minutes > log.retention.hours
默认:log.retention.hours = 168(7天)
判断依据:
非文件修改时间,而是日志段中最大时间戳 largestTimeStamp
时间戳 > 0 则用时间戳,否则用文件 lastModifiedTime
删除步骤:
1. 从日志段跳跃表中移除待删除段(确保无线程读取)
2. 对应所有文件(含索引)添加 ".deleted" 后缀
3. 延迟任务("delete-file",默认1分钟执行)删除 ".deleted" 文件
可通过 file.delete.delay.ms 配置
策略二:基于日志大小
配置参数:
log.retention.bytes = -1(默认,无穷大) ← 所有日志文件总大小,非单个
log.segment.bytes = 1GB(单个 LogSegment 大小)
删除步骤:
1. 计算日志总大小 - retentionSize = 需删除的总大小
2. 从第一个日志段开始查找 deletableSegments
3. 执行删除
策略三:基于日志起始偏移量
判断依据:下一个日志段的 baseOffset <= logStartOffset 则可删除
删除示例:
logStartOffset = 40
日志段1:[0, 20) → 下一段 baseOffset=20 < 40 → 加入 deletableSegments
日志段2:[20, 35) → 下一段 baseOffset=35 < 40 → 加入 deletableSegments
日志段3:[35, 50) → 下一段 baseOffset=50 > 40 → 停止,从此段开始不删除
日志压缩(Log Compaction)
规则:对相同 key 的不同 value,只保留最新版本
配置:log.cleanup.policy = compact + log.cleaner.enable = true
类比:类似 Redis RDB 持久化模式
场景:系统崩溃后快速恢复,只需恢复最新状态数据
五、磁盘数据存储机制
三大核心技术组合:
1. 顺序追加写日志
→ 写性能接近内存,避免磁盘寻址
2. PageCache(OS 页缓存)
→ 读:先查 PageCache,命中直接返回,未命中才读磁盘
→ 写:写入 PageCache(脏页),OS 在合适时机刷盘
→ Kafka 大量使用 PageCache,是高吞吐的重要因素之一
3. 零拷贝(Zero-Copy)
→ 数据从 PageCache 直接发到网卡,跳过应用层拷贝
→ 详见三高架构笔记
消息写入磁盘完整路径:
Producer 发送消息
→ 经过网络层(Acceptor → Processor → RequestHandler)
→ 写入 PageCache(os cache)
→ OS 异步刷盘到 LogSegment 的 .log 文件
→ 同时更新 .index 和 .timeindex 索引文件
优缺点与局限性汇总
| 技术 | 适用场景 | 限制 | 踩坑点 |
|---|---|---|---|
| 稀疏索引 | 顺序 Offset 查询 | 非精确索引,块内仍需顺序扫描 | index.interval.bytes 设置过大导致块内扫描过多 |
| V2 日志格式 | 批量发送场景 | 单条消息批次头开销 61 字节,比 V0/V1 大 | 小消息场景单条发送反而更浪费空间 |
| 基于时间清理 | 日常日志保留控制 | 依赖 largestTimeStamp,时钟漂移可能误判 | 不要依赖文件修改时间判断,需关注时间戳索引 |
| 基于大小清理 | 磁盘容量管控 | log.retention.bytes 是全分区总大小,非单 Segment | 误以为是单 Segment 大小而设置过小导致频繁删除 |
| Log Compaction | 状态存储、快速恢复 | 只保留最新值,历史数据不可追溯 | 与 delete 策略混用时需明确哪些 Topic 用哪种策略 |
| PageCache | 读多写多场景 | 依赖 OS 管理,宕机时未刷盘数据丢失 | 生产环境需配合 acks=all + min.insync.replicas 保障数据安全 |
行动清单
- 观察实际日志目录:在 Kafka 安装目录下找到
log.dirs配置的路径,查看topic-partition目录下的.log、.index、.timeindex文件,验证 baseOffset 命名规则 - 稀疏索引验证:用
kafka-dump-log.sh --files xxx.index查看索引内容,观察索引条目的间隔(index.interval.bytes默认 4096 字节) - 日志格式对比:用
kafka-dump-log.sh --files xxx.log --print-data-log查看 V2 格式 RecordBatch 结构,对照三个版本字段差异 - 三种清理策略实验:分别创建配置
delete/compact/delete,compact的 Topic,写入数据后触发清理,观察文件变化 - PageCache 监控:在 Kafka Broker 机器上执行
cat /proc/meminfo | grep -i cache,对比生产者高负载时 PageCache 占用变化 - 稀疏索引源码:阅读
OffsetIndex.scala中的lookup()方法,理解二分查找定位块的具体实现
Kafka 高可靠高性能原理笔记
一句话摘要
Kafka 通过 ACK 策略 + 副本机制 + Leader Epoch 保障消息不丢失,通过 PageCache 异步刷盘 + 零拷贝 + 批量压缩 + 稀疏索引 + 多 Reactor 模型实现高吞吐低延迟。
核心知识点
1. 整体架构
四大组件:
- Producer:消息创建者,通过分区器路由到指定 Broker
- Broker:服务实例,负责消息持久化与中转
- Consumer:主动 Pull 消息,同一消费者组内每条消息只被一个消费者消费
- ZooKeeper:管理 Broker/Consumer 集群元数据;Producer 不在 ZK 存数据,只监听 Broker 和 Topic 信息
核心数据概念:
| 概念 | 说明 |
|---|---|
| Topic | 消息分类单位,收发时只需指定 Topic |
| Partition | Topic 下的分区,分布在不同 Broker,提供并行处理和横向扩容能力 |
| Segment | Partition 下的分段文件,含 .log(数据)、.index(位移索引)、.timeindex(时间戳索引),以该段最小 Offset 命名 |
| Offset | 消息在分区内的唯一标识,单调递增,Kafka 只保证分区内有序,不保证 Topic 全局有序 |
2. 高可靠性:Producer → Broker 消息不丢失
2.1 ACK 策略
Request.required.acks = 0 # 发即认为成功,不等确认,适合日志等低可靠场景
Request.required.acks = 1 # Leader 写入成功即返回,有丢数据风险
Request.required.acks = -1 # ISR 中所有副本写完才返回,强可靠
强可靠组合配置(三件套):
Request.required.acks = -1
min.insync.replicas > 2
unclean.leader.election.enable = false # 只允许 ISR 中的副本成为新 Leader
2.2 同步发送 vs 异步发送
- 异步发送:消息写入 Input Channel 立即返回,Dispatcher 协程负责真正发送;发送失败只能从错误日志得知,无法自建兜底函数处理
- 同步发送:本质也是异步的,通过
WaitGroup阻塞等待 Broker 确认,可明确感知成功/失败;失败时可重试,保障 at-least-once 语义
2.3 幂等性与事务(exactly-once)
| 类型 | 开启方式 | 核心能力 | 局限 |
|---|---|---|---|
| 幂等性 Producer | enable.idempotence=true | Broker 用 PID + Seq number 去重 | 仅限单分区、单会话 |
| 事务型 Producer | enable.idempotence=true + Transcational.id + Consumer 侧 Isolation.level=read_committed | 跨分区原子写入,跨会话事务恢复 | 对性能影响大;相同 Transaction ID 的旧 Producer 实例会失效 |
事务协调组件:TransactionCoordinator,事务状态持久化在内部 Topic __transaction_state。
3. 高可靠性:Broker 数据持久化
3.1 异步刷盘机制
Broker 收到消息 → 写入 PageCache → 立即返回成功 → Linux Flusher 异步刷盘(触发条件:主动调用 sync/fsync、可用内存低于阈值、Dirty Data 时间达到阈值)。
风险:单机未刷盘时 Broker 宕机,数据丢失。解法:副本机制。
3.2 副本机制(Replica)
- AR = ISR + OSR
- ISR:与 Leader 同步延迟不超过
replica.lag.time.max.ms(默认 10s)的副本集合 - Follower 每隔 500ms 向 Leader Fetch 一次数据
- Leader 宕机时,从 ISR 中选举新 Leader(由 ZooKeeper 触发)
3.3 HW 和 LEO 机制
- LEO(Log End Offset):当前 Log 文件下一条待写入消息的 Offset
- HW(High Watermark):ISR 所有副本都已写入的最大 Offset,消费者只能消费 HW 之前的数据
计算规则:
Leader HW = min(所有副本 LEO)
Follower HW = min(Follower 自身 LEO, Leader HW)
一次完整更新流程(1 个 Follower 场景):
- 初始:
HW_L=0, LEO_L=0, HW_F=0, LEO_F=0 - Leader 收到消息 →
LEO_L=1;Follower Fetch → Leader 记录LEO_F=0,HW_L=min=0,返回HW_L=0, LEO_L=1;Follower 存消息 →LEO_F=1, HW_F=min(1,0)=0 - Follower 再次 Fetch → Leader 记录
LEO_F=1,HW_L=min=1;Follower 更新HW_F=min(1,1)=1
隐患:Follower 和 Leader 的 HW 更新存在时间差,这是数据丢失和数据错乱问题的根源。
3.4 KIP-101 问题及 Leader Epoch 解法
数据丢失场景:Follower A 重启后以 HW 为基准截断日志,若随后 Leader B 宕机,A 成为新 Leader,被截断的已提交消息就丢失了。
数据错乱场景:A 和 B 同时宕机,B 的消息因未刷盘丢失,B 却被选为新 Leader 并写入新消息 m3;A 恢复后不知道 Offset=1 的消息已不一致,直接开始同步,导致日志错乱。
Leader Epoch 解法(类似 Raft 任期号):
- 每次选举新 Leader,Epoch(严格单调递增 ID)加 1
- Follower 重启后不再以 HW 为准截断,而是先发
LeaderEpochRequest询问当前 Epoch 的起始 Offset - 数据丢失解决:A 重启后得知
LEO=2,保留 m2,不再截断 - 数据错乱解决:A 重启后得知 B 已是第 1 代,起始 Offset=1,A 截断自己 Offset=1 的消息,重新与 B 同步
4. 高可靠性:消费者侧
| 提交方式 | 参数 | 特点 | 风险 |
|---|---|---|---|
| 自动提交 | enable.auto.commit=true + auto.commit.interval.ms | 后台自动提交,无需手动处理 | 消费中途崩溃可能导致消息丢失 |
| 手动提交 | enable.auto.commit=false | 业务逻辑处理完再提交 | 未提交时重启会重复消费,需消费侧做幂等 |
最佳实践:手动提交 + 幂等,实现 at-least-once + 去重 = 业务层 exactly-once。
5. 高性能:异步发送 + 批量发送
- 异步发送本质:消息入 Channel 即返回,最大化吞吐
- 批量发送两个关键参数:
Batch.size = 16KB(默认) # 增大可提升吞吐,但增加延迟
linger.ms = 0(默认) # 设为 10~100ms 可积攒更多消息一起发
触发发送:Batch.size 达到上限 或 linger.ms 到期,两者满足其一即发送。
6. 高性能:压缩技术
Compression.type = none/gzip/snappy/lz4/zstd # 2.1.0 版本后支持 zstd
Compression.level = -1(默认) # 0-9,-1 表示使用默认级别
性能对比(均越高越好):
- 吞吐量:
LZ4 > Snappy > zstd ≈ GZIP - 压缩比:
zstd > LZ4 > GZIP > Snappy
注意:Broker 与 Producer 压缩算法不同时,Broker 会先解压再重新压缩,有额外 CPU 开销;消息版本不兼容也会触发解压重压。
7. 高性能:PageCache + 顺序追加写
- 消息写入 PageCache 即返回,避免同步刷盘的巨大开销
- 所有消息顺序追加到 Partition 日志文件尾部,充分利用顺序 I/O,规避随机 I/O
8. 高性能:零拷贝
网络数据持久化(Producer → Broker):mmap
传统方式:4 次数据拷贝 + 4 次上下文切换 + 2 次系统调用
mmap 优化:Socket Buffer 的数据在内核空间直接落盘,无需拷贝到应用进程缓冲区,减少上下文切换和拷贝次数。
数据传输(Broker → Consumer):Sendfile
传统方式:4 次数据拷贝 + 4 次上下文切换
Sendfile 优化(NIO 的 transferTo/transferFrom):2 次内核数据拷贝 + 2 次上下文切换 + 1 次系统调用,消除 CPU 数据拷贝。
9. 高性能:稀疏索引
设计思想:不为每条消息建索引,每写入 log.index.interval.bytes(默认 4KB)才新增一个索引项,用空间换时间。
位移索引(.index)索引项结构:
- 相对位移 = 消息 Offset - 文件起始 Offset(例:文件名
00000000000000000100.index,存 Offset 150 时,相对位移 = 50,用 4 字节存储,节省空间) - 文件物理位置:消息在 .log 文件中的字节偏移
查找流程示例(查找 Offset=3550 的消息):
- 二分查找索引文件,找到小于 3550 的最大索引项
[3528, 2310272] - 从 .log 文件物理位置 2310272 顺序扫描,直到找到 Offset=3550 的消息
时间复杂度:O(log N)(二分)+ O(M)(顺序扫描,M 为索引间距内的消息数)。
时间戳索引(.timeindex):Kafka 0.10.0.0 引入,结构与位移索引类似,支持按时间戳快速查找消息。
10. 高性能:多 Reactor 多线程网络模型
Acceptor → 轮询分发 → Processor 线程 → RequestQueue → KafkaRequestHandlerPool(Worker 线程池)
↓
Response 返回给 Processor → 客户端
- SocketServer:实现 Reactor 模式,处理多客户端并发请求
- KafkaRequestHandlerPool:Worker 线程池,处理实际 I/O 逻辑
- 充分利用多核 CPU,提升吞吐和响应速度
11. 负载均衡
生产者侧(DefaultPartitioner):
- key 不为 null → 对 Key 做 Hash → 路由到固定分区(顺序消息实现的关键)
- key 为 null → 轮询所有可用分区
消费者侧(通过 Partition.assignment.strategy 配置):
RangeAssignor:连续分区分配给同一消费者RoundRobinAssignor:轮询分配StickyAssignor(0.11.0.0+):在均衡前提下尽量保留原有分配结果,减少 Rebalance 开销
优缺点与局限性
| 技术点 | 适用场景 | 局限 / 踩坑点 |
|---|---|---|
acks=-1 | 金融、支付等强可靠场景 | 写入延迟最高,需配合 min.insync.replicas>2 才有意义 |
| 异步发送 | 高吞吐、对延迟不敏感场景 | 发送失败只能靠日志感知,无法自建回调兜底;不适合强一致业务 |
| 事务型 Producer | 跨分区原子写、consumer-transform-producer | 性能开销大,慎重使用;旧 Producer 实例在新实例启动后自动失效 |
| 幂等性 Producer | 单分区单会话去重 | 不跨分区、不跨会话(重启即失效),不能替代事务 |
| 批量发送 | 高吞吐场景 | Batch.size 过大会增加延迟,需结合 linger.ms 调优 |
| 压缩技术 | 网络带宽紧张场景 | 消耗 CPU;Broker 与 Producer 压缩算法不一致时有额外解压重压开销 |
| PageCache 异步刷盘 | 高吞吐写场景 | 单机宕机时 PageCache 中未刷盘数据丢失,必须依赖副本机制弥补 |
| 零拷贝 | 大流量数据传输 | 依赖 OS 支持(Linux sendfile / mmap),不同 OS 实现效果有差异 |
| 稀疏索引 | 海量消息检索 | log.index.interval.bytes 过大会增加顺序扫描范围,是重要调优参数 |
| HW 机制 | 副本一致性保障 | HW 更新存在时间差,需配合 Leader Epoch 才能彻底解决数据丢失/错乱 |
| 自动提交 Offset | 日志分析等低可靠场景 | 消费中途崩溃会丢消息,不适合强可靠业务 |
| StickyAssignor | 减少 Rebalance 开销 | 0.11.0.0+ 才支持,老版本无此选项 |
行动清单
- 搭建实验环境:本地起一个 3-Broker 的 Kafka 集群,分别测试
acks=0/1/-1下的吞吐和延迟差异,感受三种策略的权衡。 - 验证 Leader Epoch:模拟 Leader 宕机场景,观察 Follower 的日志截断行为,对比有无 Leader Epoch 时的表现(可通过旧版本 Kafka 对比)。
- 调优批量发送参数:在压测环境中调整
Batch.size(16KB / 64KB / 128KB)和linger.ms(0 / 10 / 100ms)的组合,找到延迟与吞吐的最优平衡点。 - 压缩算法选型:对业务消息分别测试
lz4(高吞吐优先)和zstd(高压缩比优先)在实际数据上的压缩效果,确认是否存在 Broker 端的解压重压开销。 - 实现手动提交 + 幂等消费:在项目中将
enable.auto.commit=false,在消息处理成功后手动提交,并在业务层(数据库唯一键或 Redis SETNX)实现幂等,验证重启后的重复消费场景。 - 阅读 KIP-101:直接阅读官方 Proposal(文章参考文献第 6 条),理解 Leader Epoch 的设计决策过程。
- 深入零拷贝:分别查阅 Linux
mmap和sendfile系统调用文档,理解内核态/用户态切换的具体过程,再结合 Java NIO 的transferTo实现对照理解。 - 稀疏索引调优:调整
log.index.interval.bytes参数,观察索引文件大小变化以及查询延迟变化,确定适合业务消息体积的最优间距。
Kafka 三高架构设计
一句话摘要
Kafka 三高架构的核心是:高可用靠 ISR + ACK,高性能靠顺序写 + 零拷贝 + 内存池,高并发靠三层 Reactor 网络模型,三者协同构建工业级消息队列。
📌 本篇为系列精简复习版,各模块已有独立深度笔记,此处重点梳理核心结论与参数速查。
核心知识点
一、高可用:Leader 选举 + 副本 + ISR + ACK
Controller 选举(三步)
1. 第一个启动的 Broker → ZooKeeper 创建临时节点 /controller → 成为 Controller
2. 其他 Broker 尝试创建 /controller → 失败 → 放弃竞争
3. 其他 Broker 在 Controller 上注册监听器,监听节点状态变化
ISR 核心参数
replica.lag.time.max.ms = 10000(默认10秒)
含义:Follower 落后 Leader 超过10秒 → 移出 ISR
ISR 存储路径:
/brokers/topics/[topic]/partitions/[partition]/state
ISR 维护者:
- Controller:Leader 选举时更新
- Leader:定期线程检测 Follower 是否脱离 ISR
ACK 三种模式速查
| acks | 确认时机 | 数据安全 | 性能 | 适用场景 |
|---|---|---|---|---|
| 0 | 写入 Socket Buffer 即返回 | 最低,可能丢失 | 最高 | 日志采集 |
| 1 | Leader 写入本地日志后返回(默认) | 中 | 中 | 通用场景 |
| all | 所有 ISR 副本确认后返回 | 最高 | 最低 | 金融、订单 |
acks=all 配套参数:
min.insync.replicas = 2(默认1)
能容忍宕机副本数 = min.insync.replicas - 1
ISR 副本数 < min.insync.replicas → 抛出 NotEnoughReplicas,停止写入
二、高性能:Reactor + 顺序写 + 零拷贝 + 压缩 + 内存池
Reactor 多路复用
Java NIO 三核心:Channel(传输)、Buffer(缓冲)、Selector(单线程多路复用)
Kafka Reactor 三角色:
Acceptor(1个)→ 接收连接
Processor(N个)→ 读写数据,每个独立 Selector
Handler(M个)→ 处理业务逻辑
各角色之间用队列缓冲,充分解耦
生产消息流程(7步)
消息 → ProducerRecord 封装
→ Serializer 序列化
→ Partitioner 确定分区(拉取集群元数据)
→ 写入 RecordAccumulator 缓存队列(批次 RecordBatch)
→ Sender 线程封装多批次为 Request
→ Selector 对应 KafkaChannel 发往 Broker
→ 内存池回收内存,避免 Full GC
关键参数:
batch.size = 16KB(写满立即发)
linger.ms = 500ms(未满超时发)
顺序写 + OS Cache
写入路径:消息 → OS PageCache(内存)→ 操作系统异步刷盘
关键结论:
机械磁盘顺序写 ≈ 内存写入速度
顺序追加到文件末尾,不做随机写,避免磁盘寻址开销
零拷贝
传统路径(4次拷贝):
磁盘 → OS Cache → 应用缓存 → Socket 缓存 → 网卡
零拷贝路径(2次拷贝):
磁盘 → OS Cache → 网卡(Socket 缓存只拷贝描述符)
Java 实现:
fileChannel.transferTo() → 底层调用 Linux sendfile() 系统调用
MappedByteBuffer → mmap 内存映射
压缩传输
流程:Producer 端压缩 → Broker 端保持 → Consumer 端解压缩
中间传输全程不解压,节省网络和磁盘开销
支持算法:lz4 / snappy / gzip / ZStandard(Kafka 2.1.0+,压缩比最高)
Producer、Broker、Consumer 必须使用相同压缩算法
内存池设计
Buffer Pool 总大小:32MB
├── 已分配内存队列:固定 16KB 内存块,反复复用
└── 可用内存:未分配区域
数据结构 Batches:
Key → 分区
Value → 该分区的批次队列
申请 → 写数据 → Sender 发送 → 清空归还内存池 → 复用
避免频繁 GC,保证生产者稳定性
三、高并发:三层网络架构
第一层:Acceptor(1个)
绑定 OP_ACCEPT,监听客户端连接
封装为 SocketChannel,轮询分发给 Processor
第二层:Processor(默认3个,num.network.threads=3)
连接队列存放 SocketChannel(轮询存放)
注册 OP_READ → 接收请求 → 解析二进制 → 封装 Request → 放入 RequestQueue
监听 ResponseQueue → 注册 OP_WRITE → 返回响应给客户端
中间层:RequestQueue(共享)+ ResponseQueue(每个 Processor 独享,共3个)
第三层:RequestHandler 线程池(默认8个,num.io.threads=8)
从 RequestQueue 取请求 → 解析 → 写磁盘
封装 Response → 放入对应 Processor 的 ResponseQueue
容量计算:
默认:每个 RequestHandler 处理 2000 QPS
默认总量:2000 × 8 = 1.6万 QPS
扩容配置:
num.network.threads = 9
num.io.threads = 32(建议与 CPU 核数一致)
扩容后:2000 × 32 = 6.4万 QPS
优缺点与局限性速查
| 机制 | 核心价值 | 限制 / 踩坑 |
|---|---|---|
| ISR | 动态维护同步副本,保障数据安全 | replica.lag.time.max.ms 太小导致频繁移出 ISR |
| acks=all | 最强数据保障 | 需配合 min.insync.replicas≥2,否则单副本等于没保障 |
| acks=0 | 最高吞吐 | Retries 失效,Offset 返回 -1,数据丢失无感知 |
| 零拷贝 | 大幅减少 CPU 和上下文切换 | 数据需修改时零拷贝失效 |
| 内存池(16KB固定) | 避免 Full GC | 消息远大于 16KB 时内存利用率下降 |
| 批次发送 | 降低请求数,提升吞吐 | linger.ms=0 时批次失效,等于单条发送 |
| 顺序写 + PageCache | 写性能接近内存 | 宕机时 PageCache 未刷盘数据丢失,需 acks 配合保障 |
| Reactor 三层架构 | 高并发请求处理 | 默认参数适合中等负载,高负载需手动调整两个核心参数 |
行动清单
- 串联三高机制:画一张从 Producer 发消息到 Consumer 消费全链路图,标注每个环节对应哪个三高机制
- ACK 场景模拟:分别用 acks=0/1/all,kill Leader 节点,对比三种配置下数据丢失行为
- 零拷贝性能测试:用
kafka-consumer-perf-test.sh压测,对比开启/关闭零拷贝的消费吞吐量差异 - 网络参数调优实验:将
num.network.threads=9、num.io.threads=32后重新压测,验证 QPS 提升是否符合 2000×线程数的估算 - 压缩算法选型:分别测试 lz4/zstd 的 CPU 占用、磁盘占用、吞吐量,制定业务选型决策标准
- 内存池源码:阅读
BufferPool.java的allocate()和deallocate(),验证 16KB 固定块复用逻辑
Kafka 消息丢失全解析
一句话摘要
Kafka 消息丢失发生在生产者、Broker、消费者三个环节,通过对应的配置加固 + 幂等/事务机制,可实现从 At-Most-Once 到 Exactly-Once 的语义升级。
二、核心知识点
2.1 Kafka 基础概念速查
| 组件 | 职责 |
|---|---|
| Broker | Kafka 服务器,存储消息 |
| Producer | 发送消息到 Kafka |
| Consumer | 从 Kafka 拉取并处理消息 |
| Topic | 消息的逻辑分类 |
| Partition | Topic 的物理分片,分布在不同 Broker |
| ISR | In-Sync Replicas,与 Leader 保持同步的副本集合 |
| HW | High Watermark,ISR 全部复制到的最高偏移量,消费者只能看到 HW 之前的消息 |
| LEO | Log End Offset,副本本地日志末端位置;Leader LEO > HW 表示有未同步完的新消息 |
2.2 消息丢失的三大环节与根因
生产者端
| 场景 | 原因 |
|---|---|
acks=0 (Fire-and-Forget) | 不等待 Broker 确认,网络抖动直接丢失 |
| 异步发送未处理回调 | 发送失败无感知,程序继续执行 |
| 重试次数不足 | 网络瞬断导致消息发送失败后不再重试 |
Broker 端
| 场景 | 原因 |
|---|---|
replication.factor=1 | 只有单副本,Broker 宕机数据彻底丢失 |
min.insync.replicas=1 | Leader 确认即返回,Follower 未同步时 Leader 宕机则丢失 |
unclean.leader.election.enable=true | 允许 LEO 落后的 Follower 选为新 Leader,造成数据截断 |
消费者端
| 场景 | 原因 |
|---|---|
enable.auto.commit=true | 偏移量已提交但消息处理失败,重启后跳过该消息 |
| 先提交偏移量再处理消息 | 处理过程异常,偏移量已记录为已消费 |
2.3 生产者端防丢失方案
关键配置:
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有 ISR 副本确认才算成功
props.put(ProducerConfig.RETRIES_CONFIG, 10); // 重试次数
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 重试间隔 300ms
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等,避免重复
异步发送 + 回调处理(推荐):
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 发送失败,写本地存储等待重试
saveToLocalStorage(record);
}
});
同步发送(兜底方案):
RecordMetadata metadata = producer.send(record).get(); // 阻塞等待确认
2.4 Broker 端防丢失方案
关键配置(server.properties):
default.replication.factor=3 # 默认副本数
min.insync.replicas=2 # 至少 2 副本同步才返回成功
unclean.leader.election.enable=false # 禁止落后副本成为 Leader
log.flush.interval.messages=1000 # 每 1000 条消息刷盘一次
log.flush.interval.ms=1000 # 每 1000ms 刷盘一次
创建高可靠 Topic 命令:
bin/kafka-topics.sh --create --bootstrap-server broker1:9092 \
--topic important-topic \
--partitions 6 --replication-factor 3 \
--config min.insync.replicas=2 \
--config flush.messages=1 \
--config retention.ms=1209600000
2.5 消费者端防丢失方案
关键配置:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 关闭自动提交
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
正确模式:先处理,再提交(批量):
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
processRecords(records); // 先处理
consumer.commitSync(); // 成功后再提交
精细模式:按分区提交偏移量:
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
// ... 处理每条消息 ...
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
currentOffsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
consumer.commitSync(currentOffsets);
事务模式(数据库操作 + 偏移量提交原子化):
conn.setAutoCommit(false);
processRecordInTransaction(conn, record);
conn.commit(); // 先提交 DB 事务
consumer.commitSync(); // 再提交 Kafka 偏移量
2.6 三种消息传递语义
| 语义 | 含义 | 实现方式 | 适用场景 |
|---|---|---|---|
| At Most Once | 最多一次,可能丢失 | acks=0,不重试 | 日志收集、指标监控 |
| At Least Once | 至少一次,可能重复 | acks=all + 重试 + 手动提交 | 大多数业务场景 |
| Exactly Once | 恰好一次,不丢不重 | 幂等生产者 + 事务 API | 金融交易、计费系统 |
2.7 Exactly Once 实现(Kafka 0.11+)
生产者端:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // 唯一 ID
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
producer.initTransactions();
producer.beginTransaction();
// ... send messages ...
producer.commitTransaction(); // 失败时 abortTransaction()
消费者端:
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 只读已提交事务
消费-处理-生产的原子事务(关键模式):
processorProducer.beginTransaction();
// 1. 处理消息并发送到 output-topic
// 2. 将消费者偏移量绑定到同一事务
processorProducer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
processorProducer.commitTransaction();
2.8 偏移量管理机制
- Kafka 0.9 之前:偏移量存储在 ZooKeeper
- Kafka 0.9 之后:存储在内部 Topic
__consumer_offsets - 提交方式:
commitSync()(阻塞)、commitAsync()(非阻塞,通过回调确认) - 消费者再均衡(Rebalance)时需通过
ConsumerRebalanceListener正确处理偏移量提交,否则可能重复消费或丢失
2.9 关键监控指标
| 层 | 指标 | 含义 |
|---|---|---|
| 生产者 | record-error-rate | 消息错误率 |
| 生产者 | record-retry-rate | 消息重试率 |
| Broker | UnderReplicatedPartitions | 副本同步不足的分区数 |
| Broker | OfflinePartitionsCount | 离线分区数 |
| 消费者 | records-lag | 消费延迟(积压消息数) |
| 消费者 | records-lag-max | 最大消费延迟 |
推荐监控组合:Kafka JMX + Prometheus + Grafana
三、优缺点与局限性
acks=all** + **min.insync.replicas=2
- 优点:强可靠性保证,所有 ISR 副本同步才返回成功
- 限制:写入延迟上升,吞吐量下降;如果 ISR 副本数低于
min.insync.replicas,生产者会直接报错
幂等性(enable.idempotence=true)
- 优点:自动去重,避免网络重试导致消息写入两次
- 限制:仅保证单个生产者会话、单个分区内的幂等,不能跨分区/跨会话去重
Exactly Once 事务
- 优点:端到端不丢不重
- 限制:需要 Kafka 0.11+;有额外性能开销(约 5-10%);
TRANSACTIONAL_ID必须全局唯一,否则新实例会 Fence 掉旧实例
手动提交偏移量
- 优点:精确控制,消息处理失败可以不提交
- 踩坑点:必须保证消息处理幂等,否则 Rebalance 后重复拉取会造成重复消费(而非丢失)
unclean.leader.election.enable=false(默认值)
- 优点:防止数据截断
- 限制:所有 ISR 副本都挂掉时,分区会变为不可用,需要人工干预,可用性换可靠性
消息重复 vs 消息丢失
- 消息重复可通过幂等性设计(唯一 ID、数据库唯一约束、业务状态机)解决
- 消息丢失一旦发生,数据可能不可恢复,无法确定丢失范围
- 设计原则:宁可接受重复,不接受丢失
四、行动清单
- 配置自查:检查线上 Kafka 集群的
acks、replication.factor、min.insync.replicas、unclean.leader.election.enable四个核心参数是否符合生产级标准(建议:acks=all,副本数=3,min.insync.replicas=2,unclean=false)。 - 消费者代码审计:排查代码中是否存在
enable.auto.commit=true或”先提交偏移量再处理消息”的反模式,全部改为手动提交且先处理后提交。 - 动手练习幂等生产者:用本文代码创建一个开启
enable.idempotence=true的生产者,验证网络重试场景下不会产生重复消息。 - 实现 Exactly Once Demo:按”消费-处理-生产”事务模式,写一个从
input-topic读取、处理后写入output-topic的事务性 Processor,用sendOffsetsToTransaction绑定偏移量提交。 - 搭建监控面板:在本地或测试环境配置 Kafka JMX → Prometheus → Grafana 链路,重点关注
UnderReplicatedPartitions和records-lag两个告警指标。 - 熟悉 LEO/HW 原理:手动模拟 Leader 宕机场景,观察新 Leader 选举后日志截断到 HW 的行为,加深对数据丢失边界的理解。
- 面试准备:能够清晰讲述三端(生产者/Broker/消费者)各自的丢失场景和对应解法,以及三种消息语义的实现原理与适用场景。
Kafka 丢数据场景与解决方案
一句话摘要
Kafka 只对已提交的消息做最大限度的持久化保证,数据丢失可能发生在 Producer、Broker、Consumer 三端,通过参数组合配置可最大限度规避。
核心知识点
一、消息传递语义
三种语义定义:
| 语义 | 含义 | 触发场景 |
|---|---|---|
| at most once | 消息最多被消费一次,可能丢失 | Consumer 先提交 Offset 再处理,处理中 crash |
| at least once | 消息至少被消费一次,可能重复 | Producer 重试 / Consumer 先处理再提交,提交前 crash |
| exactly once | 消息恰好被消费一次 | 需幂等 + 事务机制配合,难以完全实现 |
Kafka 默认提供 at least once 语义。
幂等与事务配置
// 启用幂等传递(0.11.0.0+):防止 Producer 重试导致消息重复写入日志
enable.idempotence = true
// 启用事务(保证多 Topic 分区消息要么全部写入成功,要么全部失败)
transactional.id = "指定值"
二、Kafka 不丢数据的前提条件
两个必要条件:
- 消息必须是已提交状态:N 个 Broker 成功收到消息并写入日志,告知 Producer 提交成功
- 至少 1 个存活的 Broker:N 个 Broker 中至少有 1 个存活,消息才不丢失
三、Producer 端丢失场景与解决方案
丢失场景
消息发送流程:
Producer → 获取 Leader Partition 元数据
→ 发送消息到 Leader Partition
→ Leader 写入 PageCache
→ Follower 拉取同步数据
→ 所有 ISR 副本 ACK → Leader 回复 Producer ACK
Producer 使用异步发送,丢失原因:
- 网络抖动:消息根本未到达 Broker
- 消息体过大:超出 Broker 承受范围被拒收
- acks 配置不当导致提前确认
acks 各值的丢失风险:
acks = 0:发后即忘,网络抖动直接丢,Retries 失效
acks = 1:Leader 写入即确认,Leader Crash 后 Follower 未同步的数据丢失
acks = all:ISR 中只剩 Leader 时,退化为 acks=1,仍可能丢失
解决方案
方案一:使用带回调的发送方式
// 弃用发后即焚
producer.send(msg);
// 改用带回调的方式,发送失败可针对性处理
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 网络抖动 → 重试
// 消息过大 → 调整后重发
}
});
方案二:ACK 确认机制配置
# 等待所有 ISR 副本确认
request.required.acks = -1 # 或 all
两种典型 acks=all 场景:
场景1(数据不丢):
Leader 收到消息 → 所有 ISR 同步完成 → Leader Crash
→ 选举新 Leader → 数据完整不丢失 ✅
场景2(数据重复):
Leader 收到消息 → 部分 ISR 同步 → Leader Crash
→ Producer 收到失败标识 → 重新发送 → 数据重复 ⚠️
(需配合幂等处理)
方案三:重试参数配置
# Kafka 2.4+ 默认 Integer.MAX_VALUE
retries = Integer.MAX_VALUE
# 保证消息有序性(每次只允许一个未响应请求)
max.in.flight.requests.per.connection = 1
# 重试时间间隔,避免无效频繁重试,推荐 300ms
retry.backoff.ms = 300
四、Broker 端丢失场景与解决方案
丢失场景
消息写入路径:
Broker 接收消息 → 写入 PageCache → 操作系统异步刷盘
风险点:
数据在 PageCache 未刷盘时,Broker 宕机
+ 选举了数据落后很多的 Follower 成为新 Leader
→ 落后的消息数据永久丢失
Kafka 不提供同步刷盘方式,依赖多分区多副本机制保障数据安全。
解决方案
# 禁止落后副本被选举为 Leader(防止数据落后的 Follower 成为 Leader)
unclean.leader.election.enable = false
# 副本数,建议 >= 3
replication.factor = 3
# 消息至少写入多少个 ISR 副本才算已提交,建议 > 1
min.insync.replicas = 2
# 关键约束:必须满足以下关系,否则一个副本 crash 整个分区不可用
replication.factor = min.insync.replicas + 1
# 示例:replication.factor=3, min.insync.replicas=2
五、Consumer 端丢失场景与解决方案
丢失场景
Consumer 消费流程:
Consumer → 获取 Leader Partition 元数据
→ Pull 消息
→ 业务逻辑处理
→ 提交 Offset 到 __consumer_offsets
两种提交顺序的风险:
方式1:先提交 Offset,再处理消息
→ 处理时 crash → 重启后从新 Offset 消费
→ 未处理的消息丢失 ❌(at most once)
方式2:先处理消息,再提交 Offset
→ 提交前 crash → 重启后重新拉取已处理消息
→ 消息重复消费 ⚠️(at least once,需幂等保障)
解决方案
# 关闭自动提交,改为手动提交
enable.auto.commit = false
正确消费流程:
拉取消息 → 业务逻辑处理完成 → 手动提交 Offset
消息重复消费问题由业务层保证幂等性解决(不是 Kafka 负责)。
六、三端丢失场景与解决方案总览
| 端 | 核心丢失原因 | 核心解决手段 |
|---|---|---|
| Producer | 异步发送无回调 + acks 配置不当 | 带回调发送 + acks=-1 + retries |
| Broker | PageCache 未刷盘 + 落后副本选举为 Leader | unclean.leader.election=false + 多副本配置 |
| Consumer | 先提交 Offset 再处理,处理中 crash | enable.auto.commit=false + 手动提交 + 业务幂等 |
优缺点与局限性
| 方案 | 优点 | 限制 / 踩坑点 |
|---|---|---|
| acks=all | 最强持久性保证 | ISR 只剩 Leader 时退化为 acks=1,仍可能丢数据 |
| enable.idempotence=true | 防止 Producer 重试导致重复 | 只解决单分区内幂等,跨分区需用事务 |
| replication.factor=3 | 允许 1 个副本宕机 | 设置与 min.insync.replicas 相等时,任意副本 crash 分区不可用 |
| unclean.leader.election=false | 防止数据丢失 | 所有 ISR 副本全挂时,分区变不可用,牺牲可用性换一致性 |
| 手动提交 Offset | 精确控制提交时机,不丢消息 | 先处理后提交存在重复消费,业务必须实现幂等 |
| retries=Integer.MAX_VALUE | 网络抖动时持续重试 | 配合 max.in.flight=1 保证有序,但会降低吞吐量 |
根本局限: Kafka 不能在任何情况下保证不丢数据,只能对已提交消息做最大限度持久化保证。极端情况(所有副本同时宕机、PageCache 未刷盘)下数据仍然会丢失。
行动清单
- 复现 Producer 丢数据场景:用 acks=0 发送消息,模拟网络中断,验证消息确实丢失且无重试
- 验证 acks=all 退化场景:将 ISR 缩减为只剩 Leader(kill Follower),观察 acks=all 是否退化为 acks=1
- 实现带回调的 Producer:改写现有
producer.send(msg)为producer.send(msg, callback),在 callback 中加入日志和重试逻辑 - 测试手动提交 Offset:设置
enable.auto.commit=false,分别测试处理前提交和处理后提交两种情况下 crash 的消息行为差异 - 参数组合验证:部署一个配置了
replication.factor=3, min.insync.replicas=2, unclean.leader.election=false的 Topic,依次 kill 1 个、2 个副本,观察分区可用性变化 - 幂等消费设计:梳理业务中消费 Kafka 消息的场景,对每个消费逻辑设计幂等方案(唯一键、数据库唯一约束或乐观锁)
Kafka 重复消费与漏消费问题解决方案
一句话摘要
Kafka 的重复消费和漏消费本质是偏移量提交时机问题;通过”先消费后提交偏移量”杜绝漏消费,通过幂等性设计(MySQL 唯一索引 或 Redis 判重)解决重复消费。
核心知识点
1. 推拉模式
Kafka 采用拉模式(Pull),Consumer 主动定时轮询 Broker 拉取消息。
| 模式 | 优点 | 缺点 |
|---|---|---|
| 推模式(Push) | 实时性高 | Consumer 消费速率跟不上时会积压、宕机 |
| 拉模式(Pull) | Consumer 可自控速率,系统更有柔性 | 存在一定消息延迟 |
2. 重复消费 & 漏消费的根本原因
两个关键参数:
enable.auto.commit # 是否自动提交偏移量,默认 true
auto.commit.interval.ms # 自动提交间隔,默认 5000ms(5秒)
offset 存储位置演进:
- Kafka
< 0.9:offset 存储在 ZooKeeper(高频通信场景下 ZK 有性能瓶颈) - Kafka
>= 0.9:offset 存储在内置 Topic__consumer_offsets
触发场景分析:
| 场景 | 提交方式 | 故障时机 | 结果 |
|---|---|---|---|
enable.auto.commit=true | 自动,每 5s 一次 | 业务处理完,但还没到自动提交时宕机 | 重复消费 |
enable.auto.commit=false 手动 | 先处理业务,再提交 | 处理完未提交就宕机 | 重复消费 |
enable.auto.commit=false 手动 | 先提交 offset,再处理业务 | 提交完未处理就宕机 | 漏消费 |
结论: 纯靠 Kafka 自身无法同时解决两者,需系统层配合。
3. 整体解决策略
分两步走:
- 先杜绝漏消费 → 通过参数 + 代码规范
- 再解决重复消费 → 通过幂等性设计
4. 杜绝漏消费
原则:无论自动/手动提交,代码中一律遵循「先处理业务逻辑,后提交 offset」的顺序。
enable.auto.commit=true(默认):本质是延迟提交,先处理业务再等待自动提交,可接受enable.auto.commit=false(手动):在业务代码处理完成后,再手动调用commitSync()/commitAsync()
这样做只会引入重复消费的风险,漏消费被彻底杜绝。
5. 解决重复消费:幂等性设计
核心思想: 同一条消息执行一次或多次,最终业务结果相同——依靠全局唯一 ID 实现判重。
方案一:MySQL 唯一索引判重
原理: 对 order_id(全局唯一 ID)建唯一索引,消费时执行 INSERT,若已存在则被唯一索引拦截,返回错误,不重复处理。
-- 示例:order_id 字段建唯一索引
CREATE UNIQUE INDEX idx_order_id ON your_table(order_id);
-- 消费时执行 insert,重复则抛异常,捕获处理即可
INSERT INTO your_table (order_id, ...) VALUES (?, ...);
优点: 开发成本低,仅加一个唯一索引 + 处理冲突异常。
缺点: 唯一索引不能使用 Change Buffer(写缓冲),导致 InnoDB 磁盘随机 IO 增加,数据库写入性能下降。
Change Buffer 原理补充: 对于非唯一索引,写操作若目标 Index Page 不在 Buffer Pool 中,可先缓存到 Change Buffer,由”页面被访问 / Master Thread 定期执行 / 数据库关闭”三个时机触发合并写入,减少随机 IO。唯一索引必须即时校验唯一性,无法延迟,因此不能用此优化。
方案二:Redis 唯一 Key + 过期时间判重
原理: 消费前用全局唯一 ID 到 Redis 中查找/写入 Key,Key 存在则跳过,不存在则处理并写入 Key。
步骤:
1. 从消息中提取全局唯一 ID
2. 执行 SET key value EX <过期时间> NX (原子操作)
3. 返回成功 → 首次消费,执行业务逻辑
4. 返回失败 → 重复消息,直接丢弃
为什么可以设置过期时间: 全局唯一 ID 通常含毫秒级时间序列,时间相隔较久的 ID 不会重复,可放心淘汰旧 Key 释放内存。
优点: 为数据库减压,将判重前置到 Redis,开发成本低。
缺点 / 踩坑点:
- Redis 挂掉时缺乏可靠的 Plan B:
- 容忍重复消费?→ 业务可能不允许
- 降级到数据库唯一索引判重?→ Redis 层的价值失效
- 改用
SELECT ... FOR UPDATE加锁判重?→ 操作更重,性能更差
- Plan B 方案强依赖具体业务特性,没有通用答案。
优缺点与局限性汇总
| 方案 | 适用场景 | 限制 / 踩坑点 |
|---|---|---|
| 先处理后提交 offset | 所有场景,基础规范 | 仍存在重复消费风险,需配合幂等性 |
| MySQL 唯一索引 | 写入量不大、对数据强一致性要求高 | 唯一索引禁用 Change Buffer,高并发写入时 IO 压力大 |
| Redis 判重 | 高并发、写入量大、对延迟敏感 | Redis 单点故障时 Plan B 复杂,需结合业务设计降级策略 |
行动清单
- 验证参数默认值:检查现有消费者配置中
enable.auto.commit和auto.commit.interval.ms的实际值,评估当前是否存在风险。 - 代码规范落地:在所有消费者代码中 Review 业务处理与 offset 提交的顺序,确保”先处理,后提交”。
- 验证 offset 存储位置:确认 Kafka 版本 >= 0.9,查看
__consumer_offsetsTopic 是否正常,避免对 ZooKeeper 的误判。 - 选择幂等方案:根据业务写入 QPS 决策——低频写入选 MySQL 唯一索引;高频写入优先选 Redis 判重,并提前设计 Redis 故障降级方案。
- 实践 Redis 原子判重:练习使用
SET key value EX <seconds> NX命令实现原子性写入,避免先 GET 再 SET 的竞态条件。 - 深入 Change Buffer:阅读 InnoDB Change Buffer 源码或官方文档,理解其触发合并的三个时机,从底层理解为何唯一索引不可用。
- 压测对比:在测试环境对 MySQL 唯一索引方案做高并发写入压测,量化随机 IO 对 TPS 的影响,为方案选型提供数据支撑。
Kafka 消费者:重复消费 / 顺序消费 / 延迟消费 / Rebalance 全解
一句话摘要
Kafka 消费端的核心难题是重复消费,根本原因是 offset 未能及时提交(由 Rebalance 或消费过慢触发);解决路径是提速 + 幂等兜底,同时合理配置 Rebalance 相关参数以降低触发频率。
二、核心知识点
1. 消费者消费流程
消费流程分四步:
- 从 ZooKeeper 获取目标 partition 的 leader 位置和 offset 位置。
- 直接从 Broker 的 PageCache 拉取数据(零拷贝,速度快)。
- 若 PageCache 数据不全,则从磁盘补充拉取。
- 消费完成后提交 offset(可手动或自动)。
2. 三种分区分配(Partition Assignment)策略
| 策略 | 原理 | 适用场景 |
|---|---|---|
| Range(默认) | 按分区数/消费者数均分,余数分摊给前几个消费者 | 通用默认 |
| RoundRobin | 对 topic 和 partition 的 hashcode 排序后轮询分配 | 多 topic 均衡消费 |
| Sticky | 无 Rebalance 时同轮询;发生 Rebalance 时尽量保持上一次分配不变 | 减少 Rebalance 开销 |
计算示例(Range):12 个 partition,9 个消费者 → 12/9=1,12%9=3,前 3 台各消费 2 个,后 6 台各消费 1 个。
配置代码:
// Range(默认)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class);
// RoundRobin
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class);
// Sticky
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class);
3. 零拷贝(Zero Copy)
普通文件传输需 4 次拷贝:磁盘 → 内核缓冲区 → 用户空间 → Socket Buffer → 网卡,其中内核↔用户空间的两次互转带来额外 CPU 上下文切换开销。
零拷贝跳过用户空间,通过 DMA(Direct Memory Access) 直接将数据从内核传递到网卡,减少不必要的拷贝次数。
实现方式:
- Linux:
sendfile()系统调用 - Java NIO:
FileChannel.transferTo() - mmap 文件映射:将磁盘文件映射到内存,用户通过修改内存直接修改磁盘文件,提高 I/O 效率。
4. 重复消费的原因与解决方案
原因一:生产者重复提交(producer 重试机制导致同一消息发送多次)
原因二:Rebalance 引起重复消费
超过 max.poll.interval.ms(默认 5 分钟)未 poll,客户端主动离开消费组触发 Rebalance,offset 提交失败,其他消费者从上次已提交的 offset 处重新消费。
解决方案:
① 提高消费速度(治标)
- 增加消费者数量(不超过 partition 数)
- 多线程并行消费
- 异步消费
- 优化消费处理逻辑,缩短单条耗时
② 幂等处理(治本)
- 消费者侧做幂等校验(推荐):消费前检查该消息是否已处理(如查 Redis/DB)
- 开启 Kafka 生产者幂等:将消息生成 MD5,存入 Redis,消费时先校验(性能消耗较大,不建议开启)
// 开启生产者幂等(谨慎使用)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
5. 顺序消费
Kafka 保证单个 partition 内消息有序,跨 partition 不保证顺序。实现顺序消费的核心是:将需要保序的消息发送到同一个 partition(通过指定相同的 key)。
6. 延迟消费
Kafka 本身无状态,原生不支持延迟消费(RabbitMQ、Pulsar 原生支持更方便)。
实现方案:开发延迟推送服务,定时扫描延迟消息,到时间后再投递给 Kafka。
7. 频繁 Rebalance 的解决方案
触发条件:
- consumer 数量变化
- 订阅的 topic 数量变化
- 订阅的 topic 的 partition 数量变化
Rebalance 期间所有消费者停止消费,直到均衡完成,对业务影响大。
解决方案:
① 参数调整
session.timeout.ms:v0.10.2 之前版本适当提高,需大于一批数据的消费时间,不超过 30s(建议设 25s);v0.10.2 及之后保持默认。max.poll.interval.ms:设置为大于实际消费一批数据所需时间的值,避免消费超时触发 Rebalance。
② 提高消费速度:消费逻辑另起线程异步处理,poll 主循环快速返回。
③ 减少 Group 订阅 Topic 数量:一个 Group 订阅的 Topic 最好不超过 5 个,建议 1 个 Group 只订阅 1 个 Topic。
8. 批量消费配置(Spring Kafka)
@Bean("batchContainerFactory")
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerConfigs()));
factory.setConcurrency(3); // 并发消费者数量
factory.getContainerProperties().setPollTimeout(3000);
factory.setBatchListener(true); // 开启批量消费
return factory;
}
关键消费者配置参数:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 关闭自动提交,手动控制
props.put(ConsumerConfig.GROUP_ID_CONFIG, ...);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 每批最大拉取条数
三、优缺点与局限性
Range 策略
- 适用:分区数能被消费者数整除时分配均匀;否则前几个消费者会多消费 1 个,多 topic 场景下头部消费者负担明显更重。
Sticky 策略
- 优点:Rebalance 后尽量复用原有分配,减少迁移开销。
- 缺点:实现复杂,首次分配与 RoundRobin 相同,优势只在 Rebalance 时体现。
生产者幂等(ENABLE_IDEMPOTENCE_CONFIG=true)
- 只能解决单 producer 同 session 内的重复,跨重启无效。
- 性能开销较大,原文明确建议”尽量不要开启”。
消费者侧幂等(Redis/DB 去重)
- 最通用的解法,但需要额外存储,并发场景要注意分布式锁或原子操作。
延迟消费
- Kafka 原生不支持,自研轮询服务存在精度问题(取决于轮询间隔),高精度延迟推荐用 RabbitMQ 或 Pulsar。
批量消费
MAX_POLL_RECORDS_CONFIG设置过大,单批处理时间超过max.poll.interval.ms则触发 Rebalance,需根据业务消费耗时反向估算合理批大小。
四、行动清单
- 实验 Rebalance 参数:在测试环境故意让消费者超时,观察
max.poll.interval.ms不同值对 Rebalance 触发频率的影响。 - 实现消费幂等:用 Redis
SET NX(或SETNX)存储消息唯一 ID,消费前校验,保证 at-most-once 处理。 - 压测批量消费:调整
MAX_POLL_RECORDS_CONFIG(如 100 / 500 / 1000),对比吞吐量和消费延迟,找到业务的最优批大小。 - 顺序消费实践:用相同
key将同一业务实体的消息固定路由到同一 partition,验证有序性。 - 对比分区策略:在多 topic 场景下,切换 Range → RoundRobin,观察各消费者负载是否更均衡。
- 延伸阅读:Pulsar 的延迟消息实现原理,对比 Kafka 自研轮询方案的精度和复杂度差距。
- 踩坑清单核查:排查现有项目中是否存在
ENABLE_AUTO_COMMIT_CONFIG=true+ 业务处理耗时长的组合,这是线上最常见的重复消费根因。
Kafka 集群消息积压问题及处理策略
一句话摘要
Kafka 消息积压由三类根因引发(消费任务挂掉、分区数不足、key 分布不均),针对每类根因有对应的恢复策略和预防手段。
核心知识点
1. 正常不积压的前提条件
满足以下全部条件时,Kafka 通常不会产生积压:
- Producer 采用轮询或随机方式生产,保证各分区数据均匀分布。
- 根据 topic 数据量合理设计分区数。
- 消费端(Spark Streaming / Structured Streaming / Flink)持续稳定运行,不存在长时间中断。
上述任一条件被破坏,积压就不可避免。
2. 积压根因一:消费任务挂掉
问题描述:实时消费应用因异常退出,同时缺少监控告警和自动重启机制,导致积压在任务挂掉到重新启动之间持续累积。
解决方案 A(跳过积压,补漏):
- 重启后直接从最新 offset 消费,优先恢复实时链路。
- 对”滞后”的历史积压数据,启动离线补漏程序单独处理。
解决方案 B(从断点续消):
- 任务启动时从上次已提交的 offset 处继续消费。
- 若积压量大,需同步增加计算资源,尽快追上最新消息。
配套工程实践:
- 将任务接入监控体系,出现异常及时通知负责人。
- 编写自动重启脚本,确保任务可被自动拉起。
- 实时框架需具备健壮的异常处理能力,避免脏数据导致任务无法重启。
3. 积压根因二:分区数过少 / 消费能力不足
问题描述:Kafka 分区数是并行度调优的最小单元。分区数过少时,consumer 无法水平扩展,吞吐量上限受限。单分区生产 QPS 通常很高,一旦业务逻辑复杂导致消费耗时增加,积压就会出现。
解决方案:
- 直接增加 Kafka 分区数,提升并行消费能力。
- 若使用 Spark Streaming + Kafka Direct Approach 模式,可在 Spark 侧对
KafkaRDD执行repartition重分区,增加并行处理度,无需修改 Kafka 配置。
4. 积压根因三:消息 key 不均匀导致分区数据倾斜
问题描述:Kafka producer 指定了消息 key 时,相同 key 的消息会路由到同一分区。若 key 分布不均匀,部分分区数据量远超其他分区,形成热点,导致局部积压。
解决方案:
- 在 Kafka producer 端,对 key 追加随机后缀,使消息均匀打散到各分区。
优缺点与局限性
| 场景 | 方案 | 适用条件 | 限制 / 踩坑点 |
|---|---|---|---|
| 任务挂掉-跳过积压 | 重启消费最新消息 + 离线补漏 | 实时链路恢复优先,可接受一定处理延迟 | 需要单独开发离线补漏程序,复杂度增加 |
| 任务挂掉-断点续消 | 从上次 offset 继续消费 | 对数据完整性要求高、不允许丢失 | 积压量大时追消费速度慢,需扩容资源才能追上 |
| 分区数不足 | 增加 Kafka 分区数 | 数据量持续增大的长期方案 | 分区数只能增加不能减少;修改后需要重新平衡消费者分配 |
| Spark Direct 模式 | KafkaRDD.repartition | 不想修改 Kafka 端配置时的临时方案 | repartition 会引入 shuffle,有额外性能开销 |
| key 不均匀 | key 加随机后缀 | producer 侧可改造 | 加随机后缀后,相同业务 key 的消息会分散到不同分区,破坏顺序消费语义,需评估业务是否依赖消息顺序 |
行动清单
- 排查现有消费任务是否有监控:确认 Spark Streaming / Flink 任务已接入告警系统,并配置自动重启策略(如 Supervisor / Kubernetes restartPolicy)。
- 评估当前 Kafka 分区数是否合理:根据 producer QPS 和 consumer 单分区处理能力计算最优分区数,公式参考:
分区数 = max(producer峰值QPS / 单分区写入上限, consumer并发数)。 - 检查 producer key 的分布情况:通过 Kafka 自带命令查看各分区消息积压量(
kafka-consumer-groups.sh --describe),若存在严重倾斜则排查 key 设计。 - 制定积压应急预案:明确业务是选择”实时优先+离线补漏”还是”断点续消+扩容追赶”,提前编写对应脚本。
- Spark Direct 模式下测试 repartition 效果:在测试环境验证
KafkaRDD.repartition(n)的 shuffle 开销是否在可接受范围内,再推广到生产。 - 延伸学习:深入研究 Kafka Consumer Group Rebalance 机制,理解分区再分配对积压的影响,以及如何通过
max.poll.records、fetch.min.bytes等参数调优消费吞吐量。
⚠️ 注意:该 CSDN 文章为 VIP 付费内容,正文在”具体操作2”处被截断,目录也仅展示了”场景1 + 解决方案1”。以下笔记仅基于页面实际可读内容整理,未能覆盖文章完整内容。
Kafka 消息积压:典型场景及解决方案
一句话摘要
Kafka 消息积压的核心矛盾是生产速度 > 消费速度,本文针对”消费任务挂掉”这一典型场景,给出了”跳过历史直接消费最新 + 离线补漏”的组合解法。
核心知识点
1. 消息积压的定义
Kafka 消息积压(Message Backlog)指 Broker 中存在大量尚未被消费的消息。根本原因是生产者写入速度持续超过消费者消费速度,或消费者出现故障停止消费。
2. 场景一:消费/实时任务挂掉
问题描述:实时消费应用异常退出,且无监控告警、无自动重启脚本,导致任务挂掉期间的消息全部堆积。重启应用后,若历史积压量过大,无法简单重启直接追消完。
解决思路:将”追历史积压”与”消费最新消息”拆成两条独立的处理链路,互不干扰。
操作一:让消费者跳过积压,直接从最新消息开始消费
有以下四种方式,任选其一:
方式 1:配置参数
auto.offset.reset = latest
消费者启动时若无已提交偏移量,则从最新位置开始消费。
方式 2:动态消费组 ID
每次重启消费者时生成一个随机 groupId,使 Kafka 视其为全新消费组,从而自动从 latest 开始消费:
// 示例:Java 中动态设置 group.id
props.put("group.id", "realtime-consumer-" + UUID.randomUUID().toString());
方式 3:手动重置偏移量到最新位置
通过 Kafka Consumer API 编程方式将偏移量设置到分区末尾:
consumer.seekToEnd(consumer.assignment());
方式 4:偏移量默认存储位置
Kafka 较早版本偏移量默认存储在 ZooKeeper,新版本存储在 Kafka 内部 Topic __consumer_offsets,手动重置时需注意版本差异。
操作二:对滞后历史数据进行离线补漏
(原文此部分被付费墙截断,未能获取具体内容)
核心思路是通过离线批处理程序,按时间范围重放积压期间的消息,与实时链路相互独立,防止历史数据影响实时消费链路的延迟。
优缺点与局限性
| 方案 | 适用场景 | 局限 / 踩坑点 |
|---|---|---|
auto.offset.reset=latest | 新部署或允许丢弃历史数据的场景 | 一旦历史数据有业务价值,直接跳过会造成数据丢失,必须配合离线补漏 |
| 动态 groupId | 需要每次重启都从 latest 开始的场景 | 动态 groupId 会导致 Kafka 永远不会清理旧 group 的偏移量元数据,积累垃圾 group;生产慎用 |
seekToEnd 手动设置 | 精细化控制偏移量的场景 | 需要保证调用时机正确(assign 之后),否则 assignment 为空会静默失败 |
| 离线补漏 | 历史数据量大、业务上允许延迟处理的场景 | 需要额外维护一套离线消费链路,增加系统复杂度;要做好幂等处理,防止与实时链路重复消费同一条消息 |
行动清单
- 搭建监控:优先为 Kafka Consumer Lag 指标配置告警,推荐使用 Kafka Exporter + Prometheus + Grafana,当
consumer_lag > 阈值时触发告警,防止积压无人知晓。 - 配置自动拉起:用 Supervisor、Systemd 或 K8s Deployment 管理消费任务进程,确保任务崩溃后自动重启。
- 实践偏移量控制:本地搭建 Kafka 环境,分别验证
auto.offset.reset=earliest/latest、seekToBeginning、seekToEnd三种偏移量控制方式的行为差异。 - 阅读补充资料:该文章付费内容未能读取,可参考以下同类公开资源补充其他积压场景(消费速度慢、分区数不足、下游处理瓶颈):
- Kafka 官方文档:Consumer Configuration 章节
kafka-consumer-groups.sh --describe命令:用于查看 Lag 详情- 增加 Topic Partition 数 + 同步扩容 Consumer 数量(Consumer 数 = Partition 数时并行度最优)
- 设计离线补漏方案:制定补漏方案时,需明确幂等消费策略(如消息 ID 去重),避免实时和离线链路双写冲突。
Kafka 高可用、顺序消费及幂等性
一句话摘要
Kafka 集群在生产环境中面临消息丢失、重复消费、乱序、积压四大核心问题,通过副本机制、ISR 选举、幂等性方案和分区策略可系统性解决。
二、核心知识点
1. 搭建 Kafka 集群(3-Broker 示例)
每个 Broker 通过唯一 broker.id 和独立监听端口区分身份,启动后向 ZooKeeper 注册临时节点。
server.properties 核心配置示例:
# Broker 3 示例
broker.id = 3
listeners=PLAINTEXT://172.16.30.34:49094
验证集群启动成功:
# 进入 ZooKeeper 客户端
./bin/zkCli.sh
# 查看已注册的 Broker 节点
ls /brokers/ids
# 输出:[1, 2, 3] 说明 3 个节点全部在线
向集群发送和消费消息:
# 生产者发送消息(指定多个 Broker 地址)
./kafka-console-producer.sh \
--broker-list 172.16.30.34:49092,172.16.30.34:49093,172.16.30.34:49094 \
--topic my-replicated-topic
# 消费者消费消息
./kafka-console-consumer.sh \
--bootstrap-server 172.16.30.34:49092,172.16.30.34:49093,172.16.30.34:49094 \
--topic my-replicated-topic
# 指定消费组消费(从头开始)
./kafka-console-consumer.sh \
--bootstrap-server 172.16.30.34:49092,172.16.30.34:49093,172.16.30.34:49094 \
--from-beginning \
--consumer-property group.id=testGroup1 \
--topic my-replicated-topic
2. 集群关键角色
Controller(控制器)
- 每个 Broker 启动时在 ZooKeeper 创建临时序号节点,序号最小的 Broker 成为 Controller。
- 职责:负责 Leader 选举、Broker 增减时的元数据同步、分区变更时的信息广播。
Leader 选举规则(ISR 机制)
- ISR(In-Sync Replica)集合维护与 Leader 保持同步的副本列表。
- 选举时取 ISR 集合最左边的元素作为新 Leader。
- 示例:ISR =
[2, 1, 3],Leader(2) 宕机 → ISR 变为[1, 3]→ Broker-1 上的副本成为新 Leader。
HW 与 LEO
| 指标 | 含义 | 规则 |
|---|---|---|
| LEO(Log End Offset) | 某副本最后一条消息的位移 | 各副本独立维护 |
| HW(High Watermark,高水位) | Leader-Follower 完成同步的位置 | 消费者只能消费 HW 之前的消息;HW ≤ LEO(木桶效应) |
3. Rebalance 机制
触发条件: 消费组中消费者未指定分区 + 消费者数量或分区关系发生变化。
三种分配策略(可配置):
- range:前段消费者分得
分区总数/消费者数量 + 1个,后段分得分区总数/消费者数量个。 - 轮询(round-robin):各消费者轮流分配分区。
- sticky(粘合策略):Rebalance 时在原有分配基础上最小化调整,不重置已有分配关系。
4. 防止消息丢失
生产者侧:
- 使用同步发送(非 fire-and-forget)。
ack = 1(全同步)。- 同步副本数 ≥ 2。
消费者侧:
- 关闭自动提交 offset,改为手动提交。
- 消息完整消费并处理完毕后,再执行 ack 应答。
5. 防止重复消费
根因: 网络抖动导致生产者未收到 ack,触发重试,消费者收到两条相同消息。
不推荐方案: 生产者关闭重试机制(会引发消息丢失)。
推荐方案:消费者侧幂等性保证
- 方案 A:数据库唯一索引(消息 ID 作为唯一键,重复插入直接忽略)。
- 方案 B:Redis 分布式锁(消费前先
SET key NX,已存在则跳过处理)。
6. 顺序消费
约束条件(严格限制):
- 生产者:同步发送,
ack = 1。 - 消费者:Topic 只设置 1 个 Partition,消费组内只有 1 个消费者。
顺序消费直接牺牲水平扩展能力,吞吐量极低,仅在强顺序业务场景(如订单状态流转)下使用。
7. 解决消息积压
成因: 消费速度持续低于生产速度,offset 寻址开销随积压量增长,最终引发服务雪崩。
解决方案:
- 消费者内部启用多线程并发消费,充分利用单机 CPU。
- 在同一消费组中横向扩展消费者实例,部署到多台机器并行消费(消费者数量 ≤ 分区数)。
8. 延时队列实现
典型场景: 未支付订单 30 分钟后自动取消。
实现方案:
- 创建专用 Topic,生产者发送消息时携带创建时间戳。
- 消费者消费时判断:
当前时间 - 创建时间 ≥ 30min?- 是 → 修改订单状态为「超时取消」。
- 否 → 记录当前 offset,等待 1 分钟,再重新拉取该 offset 的消息继续判断,循环直到超时或支付完成。
三、优缺点与局限性
| 场景 | 优势 | 限制 / 踩坑点 |
|---|---|---|
| 高可用副本 | 节点宕机自动切换 Leader,无需人工干预 | ISR 列表为空时(全副本落后),选举可能选到脏数据副本 |
| 幂等性去重 | 消费者侧控制,无需改动生产者逻辑 | Redis 分布式锁存在锁过期后重复消费的边界问题 |
| 顺序消费 | 严格保证全局有序 | 单分区单消费者,吞吐量近乎为零,不可用于高并发场景 |
| Rebalance sticky | 减少分区迁移开销 | 需显式配置,默认策略为 range,老集群升级需注意兼容性 |
| 延时队列轮询方案 | 无需引入额外中间件(如 RabbitMQ 死信队列) | 1 分钟轮询粒度不够精细;高频轮询对 Kafka 造成额外读压力 |
| 消息积压扩容 | 水平扩展简单有效 | 消费者数量超过分区数后,多余消费者闲置,扩容前需同步扩分区 |
四、行动清单
- 环境实践:
- 本地用 Docker 启动 3-Broker Kafka 集群,执行文中
zkCli.sh命令验证节点注册。 - 创建一个
replication-factor=3的 Topic,手动 kill 掉 Leader Broker,观察 ISR 切换过程。
- 本地用 Docker 启动 3-Broker Kafka 集群,执行文中
- 代码实践:
- 用 Spring Kafka 实现消费者手动提交 offset,对比自动提交与手动提交在消费失败时的行为差异。
- 实现基于 Redis
SET NX的幂等性消费拦截器,模拟网络重试场景验证去重效果。 - 实现延时队列 Demo:下单时发消息带时间戳,消费者轮询判断是否超时取消。
- 深入学习:
- 深入研究 HW/LEO 同步流程(Leader Epoch 机制),理解 HW 截断如何防止数据不一致。
- 对比 Kafka 延时队列方案 vs RabbitMQ 死信队列 vs 时间轮(HashedWheelTimer)的适用场景。
- 研究
sticky分配策略的具体算法,以及在大规模消费组(50+ 消费者)下的性能表现。
注: 该页面正文后半部分为付费内容,以下笔记基于页面已公开的完整核心内容(回答重点 + 概述)整理。
Kafka 批量消息发送与消费及性能优化
一句话摘要
通过配置 Producer 的 linger.ms / batch.size 和 Consumer 的 max.poll.records,Kafka 可将多条消息合并批次收发,显著降低网络往返和 I/O 开销,是提升系统吞吐量的核心手段。
核心知识点
1. 批量消息发送
原理: Producer 不会每次调用 send() 就立即发网络包,而是将发往同一分区的消息暂存到内存缓冲区(RecordAccumulator),满足任一条件时触发真正发送:
- 缓冲区大小达到
batch.size - 等待时间超过
linger.ms
关键参数:
| 参数 | 作用 | 默认值 |
|---|---|---|
batch.size | 每个批次的最大字节数,达到后立即发送 | 16384(16KB) |
linger.ms | 批次未满时最长等待时间(ms),增大可提升吞吐、轻微增加延迟 | 0 |
acks | 确认模式,all 表示所有副本确认,可靠性最高 | 1 |
示例代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("linger.ms", 5); // 等待5ms凑批
props.put("batch.size", 16384); // 批次上限16KB
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
}
producer.close();
2. 批量消息消费
原理: Consumer 调用 poll() 时,一次性从 Broker 拉取多条消息,max.poll.records 限制单次拉取的最大条数,避免单次处理量过大。
关键参数:
| 参数 | 作用 | 示例值 |
|---|---|---|
max.poll.records | 每次 poll() 返回的最大消息条数 | 500 |
enable.auto.commit | 是否自动提交 offset,批量场景建议关闭改为手动提交 | false |
示例代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false"); // 禁用自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.poll.records", 500); // 每次最多拉500条
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
consumer.commitSync(); // 处理完整批次后手动同步提交
}
3. 批量操作的性能优化方向
批量处理的收益来自减少以下开销:
- 网络往返(RTT):N 条消息合并为 1 次网络请求
- I/O 操作:Broker 顺序写磁盘效率更高
- 系统调用次数:减少 send/recv syscall 频率
常见调优参数组合:
- Producer 高吞吐:
linger.ms=5~20,batch.size=65536(64KB),compression.type=lz4/snappy - Consumer 高吞吐:
max.poll.records=500~1000,fetch.min.bytes适当增大,fetch.max.wait.ms配合调整
优缺点与局限性
批量发送(Producer)
- 适用场景:日志收集、埋点上报、非实时业务数据写入
- 局限性:
linger.ms增大会引入固定延迟,实时性敏感业务(如支付、报警)不适合 - 踩坑点:
batch.size设置过小等于没有批量效果;消息体超过batch.size时单条直接发送,不受批量约束
批量消费(Consumer)
- 适用场景:数据库批量写入、离线分析、ETL 处理
- 局限性:
max.poll.records设置过大,若单批处理时间超过max.poll.interval.ms(默认5分钟),Consumer 会被踢出消费组,触发 Rebalance - 踩坑点1:使用
enable.auto.commit=true时,批次中部分消息处理失败但 offset 已提交,会造成消息丢失 - 踩坑点2:
commitSync()放在for循环内部(逐条提交)等于失去批量提交的意义,应放在整个批次处理完成后
通用局限性
- 批量大小与延迟是互相制约的 trade-off,没有通用最优值,需根据业务 SLA 测压调整
- 消费端批量处理失败时,重试整个批次还是跳过错误消息,需要明确业务策略
行动清单
- 动手验证参数效果: 本地搭建单节点 Kafka,用示例代码分别测试
linger.ms=0vslinger.ms=10,用 JMX 或kafka-producer-perf-test.sh对比吞吐量差异 - 压测 Consumer 批量大小: 将
max.poll.records从 50 逐步调到 1000,观察处理延迟与吞吐量变化,找到业务场景的平衡点 - 手动 commit 实践: 将示例消费代码改为批次内部处理异常时不提交 offset,验证消息不丢失的语义
- 深入参数体系: 继续研究与批量密切相关的参数:
buffer.memory(Producer 缓冲区总大小)、fetch.min.bytes/fetch.max.wait.ms(Consumer 拉取策略)、compression.type(批量压缩) - 面试答题框架: 回答此类题目时按”实现手段(参数)→ 代码示例 → 性能收益原理 → 踩坑/局限”四段组织,覆盖 Producer 和 Consumer 两侧