当前位置: 首页 > news >正文

如何防止 Kafka 消息在提交过程中丢失?Spring Boot 实战指南

视频看了几百小时还迷糊?关注我,几分钟让你秒懂!


一、问题背景:消息为什么会“丢”?

很多开发者以为 Kafka “天生可靠”,但消息丢失往往发生在“消费端提交偏移量”的环节
即使 Kafka 本身持久化了消息,如果你的消费者提前提交了 offset,而业务逻辑还没执行完,一旦应用崩溃——这条消息就永远消失了

🎯 典型场景:

  • 用户下单成功,Kafka 发送“订单创建”事件
  • 消费者收到消息,准备扣库存
  • offset 被提前提交
  • 扣库存前服务宕机 →订单已确认,但库存没扣!

这不是 Kafka 的锅,而是提交策略不当导致的!


二、根本原则:先处理业务,再提交 offset

正确顺序

1. 拉取消息 2. 执行业务逻辑(如写 DB、调接口) 3. 业务成功 → 提交 offset

错误顺序(自动提交默认行为)

1. 拉取消息 2. 后台线程定时提交 offset(不管业务是否完成) 3. 业务执行中崩溃 → 消息丢失

三、解决方案:关闭自动提交 + 手动 ACK + 幂等消费

下面我们用Spring Boot + Kafka实现一个不丢消息的消费者。


✅ 步骤 1:关闭自动提交(关键!)

# application.yml spring: kafka: consumer: enable-auto-commit: false # 必须关闭! group-id: order-service key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-desserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "com.example.dto"

⚠️enable-auto-commit: false是防止消息丢失的第一道防线!


✅ 步骤 2:配置手动 ACK 模式

// KafkaConfig.java @Configuration @EnableKafka public class KafkaConfig { @Bean public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory( ConsumerFactory<String, OrderEvent> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); // 设置为手动立即确认 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } }

✅ 步骤 3:消费者代码:业务成功才 ACK

// OrderConsumer.java @Component public class OrderConsumer { @Autowired private InventoryService inventoryService; @KafkaListener(topics = "order-created", groupId = "order-service") public void consume(OrderEvent event, Acknowledgment ack) { try { // 1. 业务处理:扣减库存 inventoryService.decreaseStock(event.getProductId(), event.getQuantity()); // 2. 业务成功 → 手动提交 offset ack.acknowledge(); log.info("订单 {} 处理成功,offset 已提交", event.getOrderId()); } catch (Exception e) { // 3. 业务失败 → 不提交 offset! log.error("处理订单失败,不提交 offset,等待重试", e); // 可选:记录到死信队列,避免无限重试 } } }

✅ 这样:只有库存扣减成功,offset 才会提交。如果失败,下次重启还会重新消费同一条消息。


四、进阶保障:幂等性设计(防重复消费)

由于我们采用“失败不提交 offset”,消息可能会被重复消费。因此,消费者必须幂等

🔧 幂等实现方式:

方式 1:数据库唯一索引(推荐)
CREATE TABLE processed_messages ( message_id VARCHAR(64) PRIMARY KEY, -- Kafka 消息的 key 或生成的 UUID processed_at TIMESTAMP );

消费时:

if (!messageLogService.exists(event.getMessageId())) { // 执行业务 inventoryService.decreaseStock(...); // 记录已处理 messageLogService.save(event.getMessageId()); ack.acknowledge(); }
方式 2:业务状态机
  • 订单状态:CREATEDSTOCK_DEDUCTED
  • 如果已经是STOCK_DEDUCTED,直接跳过

五、反例对比:自动提交 vs 手动提交

场景自动提交(enable-auto-commit=true)手动提交(enable-auto-commit=false)
消费消息后处理耗时 10 秒5 秒时自动提交 offset不提交,直到ack.acknowledge()
处理到第 8 秒时服务崩溃消息丢失(offset 已提交)消息保留(offset 未提交,重启后重试)
适合场景日志、监控等可容忍丢失的数据订单、支付、通知等关键业务

六、其他注意事项

1.不要在 ACK 后做关键操作

// ❌ 危险!ACK 后再操作,崩溃会导致数据不一致 ack.acknowledge(); inventoryService.decreaseStock(...); // 崩溃 → offset 提交了,但库存没扣!

✅ 正确顺序:业务 → ACK


2.避免长时间阻塞消费者线程

  • 如果业务耗时很长(如调外部 API),考虑异步处理 + 本地事务表
  • 或使用@RetryableTopic+ 死信队列(DLQ)机制

3.测试你的容错能力

  • kill -9模拟非优雅关闭
  • 观察消息是否重试、是否重复、是否丢失

七、总结

要防止 Kafka 消息在提交过程中丢失,请牢记:

  1. 关闭自动提交enable-auto-commit: false
  2. 业务成功后再手动 ACK
  3. 消费者必须幂等(防重复)
  4. ACK 前不要做任何可能失败的关键操作

这样,即使服务崩溃、网络中断、机器宕机,你的消息也不会丢失!

视频看了几百小时还迷糊?关注我,几分钟让你秒懂!

http://www.cnnetsun.cn/news/4710.html

相关文章:

  • Wan2.2-T2V-A14B在快递物流场景模拟中的流程可视化应用
  • 如何让群晖 DS918+ 实现人脸识别?Synology Photos 完整补丁教程
  • 微生物群落数据分析不再难:microeco快速上手指南
  • SpringBoot+Vue 医院病历管理系统管理平台源码【适合毕设/课设/学习】Java+MySQL
  • STL文件缩略图生成神器:让3D模型管理效率翻倍
  • Armbian系统在RK3568开发板上的深度适配实战指南
  • 流式响应Token统计革命:从“黑盒“到“透明化“的技术突破
  • 慧荣U盘量产工具v20.02.04.21完整使用教程:从入门到精通
  • 虚拟展厅制作公司怎么选?5家行业标杆整理
  • 12月远程控制推荐:免费不限时长,可连接海外的向日葵远程控制
  • 10款AI降重工具精选:轻松通过AIGC检测的专业方案
  • (最新2025实测红黑榜!)10款免费降ai率工具
  • 别卷运维了!护网蓝队日薪 2700,3 个月从日志分析到应急响应,转行即高薪!
  • 科研人必看!备战2026国自然科学基金申报前的准备工作
  • Wan2.2-T2V-A14B如何应对极端天气条件下的场景生成?
  • 【顶尖团队都在用】C++编译防火墙的4层隔离架构揭秘
  • 揭秘ASP.NET Core 9 WebSocket压缩机制:如何提升通信效率300%?
  • AxGlyph终极免费版:简单快速的矢量图绘制神器
  • SpringBoot3整合Sa-Token权限认证实战
  • 虚幻基础:UI
  • 硬核盘点 2025 低代码平台:TOP20 技术架构 + 实战案例(上)
  • 【企业级应用开发突围】:低代码平台中PHP组件化落地的8个关键步骤
  • 5天掌握VESC Tool:从电机控制新手到高手的完整指南
  • 颠覆传统!用nodeppt Mermaid插件打造动态图表演示新体验
  • 从零构建安全支付系统:PHP非对称加密完整实现路径
  • NEMU系统模拟器使用全攻略:从环境搭建到性能调优的10个关键技巧
  • 两大Linux发行版ZorinOS与AnduinOS,哪个更好用?
  • Centos 7 虚拟机磁盘扩容
  • 生物信息分析师不愿透露的R语言技巧:甲基化数据质量控制与标准化处理(仅此一篇)
  • AIP7533/25/30/36/40/44/50_200mA/30V低压差线性稳压器 功能框架