.NET 项目要不要上 Kafka?看完这篇再决定
在典型的 .NET 微服务架构中,同步 HTTP 调用链会引入直接耦合:下游服务超时即阻塞上游,失败重试可能导致雪崩。更隐蔽的是,消息一旦被消费,默认行为下 RabbitMQ 等队列会将其删除(即使开启持久化,也缺乏 Kafka 的 Offset 回溯能力)——这意味着你无法在 Bug 修复后重放历史事件,也无法在消费者故障时无损恢复。
Kafka 通过持久化的提交日志(Commit Log)模型彻底解决了这两个问题:消息写入即落盘,消费者只移动游标(Offset)而不删除数据;同时利用分区(Partition)和批量传输,在合理硬件(如 SSD、万兆网)下可实现单机数十万级 TPS(生产者写入)。本文从 .NET 开发者的视角,讲清它的核心机制、接入成本和选型边界,不做过度简化,也不堆砌无关细节。
一、Kafka 是什么
一句话:Kafka 是一个分布式、高吞吐、能持久化存储消息的消息队列。
它和 RabbitMQ 的主要区别:
- RabbitMQ:默认消费即删除(即使开启持久化队列,也缺乏 Kafka 的 Offset 回溯能力)。适合可靠路由、任务分发。
- Kafka:像银行流水账单——每笔交易都永久记在账本里,消费者只移动指针,随时可以重放。适合海量日志、历史回溯、流处理。
Kafka 的核心原理可以忽略很多细节,你只需要记住三个词:
| 概念 | 通俗解释 |
|---|---|
| Topic(主题) | 类似数据库里的“表”,消息按主题分类 |
| Partition(分区) | Topic 被切成若干份,可以并行读写,所以快 |
| Consumer Group(消费组) | 多个消费者分担读取任务,互不重复 |
官网:https://kafka.apache.orghttps://kafka.apache.org/
GitHub:https://github.com/apache/kafkahttps://github.com/apache/kafka
开源协议:Apache 2.0,可免费商用。收费版是 Confluent 公司提供的企业增强版,普通团队用开源版足够。
二、怎么引入
安装 NuGet 包
dotnet add package Confluent.Kafka
当前稳定版请以 NuGet 官网为准(如 2.6.x 或更高),支持 .NET 6/7/8。
Program.cs 最小配置(必须项 + 正确资源释放)
// Program.cs using Confluent.Kafka; var builder = WebApplication.CreateBuilder(args); // 注册生产者(Singleton 生命周期) builder.Services.AddSingleton<IProducer<Null, string>>(provider => { var config = new ProducerConfig { BootstrapServers = "localhost:9092", Acks = Acks.All, // 等待所有副本确认,防丢消息 EnableIdempotence = true // 开启幂等,防重复 }; return new ProducerBuilder<Null, string>(config).Build(); }); var app = builder.Build(); // 应用退出前 Flush 并释放 Producer,防止缓冲区消息丢失 var lifetime = app.Services.GetRequiredService<IHostApplicationLifetime>(); lifetime.ApplicationStopping.Register(() => { var producer = app.Services.GetRequiredService<IProducer<Null, string>>(); producer.Flush(TimeSpan.FromSeconds(10)); producer.Dispose(); }); app.Run();验证接入是否成功
using Confluent.Kafka; var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; using var producer = new ProducerBuilder<Null, string>(config).Build(); var result = await producer.ProduceAsync("test-topic", new Message<Null, string> { Value = "Hello Kafka" }); Console.WriteLine($"Delivered to {result.TopicPartitionOffset}"); 若输出类似 Delivered to test-topic[0]@1 则成功。三、如何部署 Kafka(简单版)
开发环境推荐 Docker(快速、隔离、与生产一致)
正确配置(KRaft 模式,让宿主机可访问):
docker run -it --rm --name kafka \ -p 9092:9092 \ -e KAFKA_NODE_ID=1 \ -e KAFKA_PROCESS_ROLES=broker,controller \ -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \ -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ apache/kafka:4.0.0或者使用更省心的 bitnami/kafka(推荐新手):
端口说明:9092:Kafka 通信端口(客户端连接用)
# 单容器模式,localhost 指容器内部自身,勿改为宿主机 IP docker run -it --rm --name kafka \ -p 9092:9092 \ -e KAFKA_CFG_NODE_ID=0 \ -e KAFKA_CFG_PROCESS_ROLES=controller,broker \ -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093 \ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ bitnami/kafka:latestWindows 本地安装(无 Docker 环境)
- 安装 Java(必须,Kafka 依赖 JVM)
下载地址:Latest Releases | AdoptiumEclipse Temurin offers high-performance, cross-platform, open-source Java runtime binaries that are enterprise-ready and Java SE TCK-tested for general use in the Java ecosystem.https://adoptium.net/temurin/releases
安装后命令行执行java -version验证。 - 下载 Kafka下载地址:https://kafka.apache.org/community/downloads/
https://kafka.apache.org/downloads
- 启动 Kafka(KRaft 模式,无需 ZooKeeper)进入 Kafka 解压目录,执行:
# 生成集群 ID .\bin\windows\kafka-storage.bat random-uuid # 格式化存储目录(使用上面生成的 UUID) .\bin\windows\kafka-storage.bat format -t <UUID> -c .\config\kraft\server.properties # 启动 Kafka .\bin\windows\kafka-server-start.bat .\config\kraft\server.properties - 验证:新建命令行窗口,进入 Kafka 目录,执行:
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
不报错即成功。
生产环境提醒:Windows 生产环境不推荐,请使用 Linux 或 Docker/K8s。
四、快速上手(3 个示例,10 分钟跑通)
前提:本地已启动 Kafka(参考上面的 Docker 命令)
示例1:最小可用生产者消费者
生产者(发送消息)
using Confluent.Kafka; var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; using var producer = new ProducerBuilder<Null, string>(config).Build(); for (int i = 0; i < 10; i++) { var result = await producer.ProduceAsync("demo-topic", new Message<Null, string> { Value = $"Message {i}" }); Console.WriteLine($"Sent: {result.Value}"); }消费者(接收消息)
示例2:ASP.NET Core 中集成生产者(带 DI)
// Program.cs 注册(见第二章) builder.Services.AddSingleton<IProducer<Null, string>>(provider => { ... }); // 业务服务中使用 public class OrderService { private readonly IProducer<Null, string> _producer; public OrderService(IProducer<Null, string> producer) { _producer = producer; } public async Task CreateOrderAsync(OrderDto order) { var json = JsonSerializer.Serialize(order); await _producer.ProduceAsync("orders", new Message<Null, string> { Value = json }); } }示例3:后台消费者服务(IHostedService + 重试+死信处理)
public class KafkaConsumerService : BackgroundService { private readonly ILogger<KafkaConsumerService> _logger; private IConsumer<Null, string> _consumer; private readonly int _maxRetries = 3; public KafkaConsumerService(ILogger<KafkaConsumerService> logger) { _logger = logger; } public override Task StartAsync(CancellationToken cancellationToken) { var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "order-group", AutoOffsetReset = AutoOffsetReset.Latest, EnableAutoCommit = false // 手动提交,精确控制 }; _consumer = new ConsumerBuilder<Null, string>(config).Build(); _consumer.Subscribe("orders"); return base.StartAsync(cancellationToken); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { try { // Consume 是同步阻塞调用;BackgroundService 的 ExecuteAsync // 运行在独立线程,此处阻塞是预期行为,不会影响主线程 var result = _consumer.Consume(stoppingToken); bool success = await ProcessWithRetry(result.Message.Value, stoppingToken); if (success) { _consumer.Commit(result); _logger.LogInformation("Processed and committed: {Value}", result.Message.Value); } else { // 超过重试次数:记录死信,手动提交跳过该消息(或发送到死信 topic) _logger.LogError("Failed after retries, skipping: {Value}", result.Message.Value); _consumer.Commit(result); } } catch (OperationCanceledException) { // 应用正常关闭,停止消费循环,不记录错误 break; } catch (ConsumeException ex) { _logger.LogError(ex, "Consume error, will retry after delay"); await Task.Delay(1000, stoppingToken); } } } private async Task<bool> ProcessWithRetry(string message, CancellationToken token) { for (int i = 0; i < _maxRetries; i++) { try { // 模拟业务处理 await Task.Delay(10, token); return true; } catch (Exception ex) { _logger.LogWarning(ex, "Retry {Retry} for message", i + 1); await Task.Delay(100 * (i + 1), token); } } return false; } public override void Dispose() { _consumer?.Close(); _consumer?.Dispose(); base.Dispose(); } // 在 Program.cs 中注册 builder.Services.AddHostedService<KafkaConsumerService>(); }五、什么时候该用 Kafka?
✅ 适合用的场景
- 日志 / 埋点 / 审计:每天几 TB 数据,需要存 7 天以上,随时查。
- 微服务事件驱动:订单创建后需要通知库存、支付、物流等,解耦且不怕下游故障。
- 需要重放历史消息:比如修复 Bug 后,想重新处理昨天的一批消息。
- 数据同步(CDC):数据库变化同步到缓存或数仓。
❌ 不适合用的场景
- 低延迟要求(< 10ms):Kafka 正常生产环境的端到端延迟在 2–15ms 之间,调优后可更低,但仍高于 Redis 或纯内存队列。
- 复杂消息路由:比如根据不同头部分发到不同队列,RabbitMQ 更擅长。
- 小项目 / 小团队:日均消息不到 10 万,用 SQL Server 或 Redis 当队列更简单。
六、实战里最常见的四个坑(补充死信陷阱)
坑1:分区设太多导致 rebalance 卡顿
- 现象:消费者频繁断开重连,消费变慢甚至停止。
- 解法:普通场景 10–30 个分区就够了,不要一上来就设 200。
坑2:生产者没配置acks=all会丢消息
- 现象:Kafka 节点重启后,少量消息消失。
- 解法:生产者加一行配置
Acks = Acks.All。
坑3:消费者处理太慢,被踢出组
- 现象:日志出现
LeaveGroup,然后消费又重头开始。 - 解法:增加
max.poll.interval.ms或把长任务异步化。
坑4:关闭自动提交后未处理失败消息,导致无限重试(★ 高频生产事故)
- 现象:某条消息处理一直抛异常,消费者不断重试同一条消息,后续消息全部阻塞。
- 原因:
EnableAutoCommit = false且未 Commit,异常后不提交 offset,下次拉取仍是同一条。 - 解法:在示例3中实现了重试计数器 + 超过次数后手动 Commit 跳过(或发送到死信 Topic)。生产环境必须设计死信队列(DLQ)机制,不可无限重试。
七、选型结论(直接抄作业)
一句话决策:如果你的系统每天产生几百万条以上消息,或者需要随时重放历史消息,并且团队有人懂 Linux→ 上 Kafka。
否则,继续用 RabbitMQ 或数据库队列更省心。
| 日均消息量 | 需要历史回溯? | 运维能力 | 推荐 |
|---|---|---|---|
| < 10 万 | 否 | 一般 | RabbitMQ / Redis |
| < 10 万 | 是 | 一般 | 数据库存 JSON |
| 10 万 – 500 万 | 是 | 有 Linux 基础 | 自建 3 节点 Kafka |
| > 500 万 | 是 | 有专职运维 | 自建 Kafka 或云托管 |
八、总结
Kafka 没那么神秘——它就是一个能存能重放、特别能扛并发的消息队列。.NET 接入只需要装一个 NuGet 包、写几行代码。
如果你正被高并发压垮、或者为丢消息和无法重放而头疼,花半天用 Docker 跑个 Kafka 体验一下,大概率会回不去。
下一步行动:使用上面正确的 Docker 命令启动 Kafka,然后跑通生产者消费者示例,特别注意消费者示例中的死信处理和优雅停机逻辑。
评论区征集:你在生产环境遇到过“消息无限重试”导致积压吗?留言聊聊你的解决方案,我会回复。
如果觉得这篇修正版有用,点赞、在看、转发给更多 .NET 朋友。少踩坑,早下班。
