流批一体架构实战:从Spark、Pulsar到状态管理的实时数据处理系统设计
1. 项目概述:从“derun”看现代数据驱动型应用的架构演进
最近在梳理团队的技术资产时,一个内部代号为“derun”的项目引起了我的注意。这并非一个开源框架或某个知名产品的名称,而是一个典型的、由业务需求驱动的内部项目代号。它代表了我们团队在过去两年里,为应对复杂业务场景下实时数据处理与分析需求而构建的一套核心数据服务引擎。今天,我想抛开具体的公司业务细节,从一个更普适的架构师视角,来拆解“derun”这类数据驱动型应用背后的设计思路、核心技术选型以及那些在官方文档里不会写的“踩坑”实录。
简单来说,“derun”要解决的核心问题是:如何高效、稳定、可扩展地处理来自多个异构数据源的实时流数据,经过一系列清洗、转换、聚合与计算后,将结果低延迟地提供给下游的决策系统、用户画像或实时报表。这听起来像是经典的大数据流水线,但“derun”的特殊之处在于,它需要在资源相对受限的云原生环境中,平衡吞吐量、延迟、准确性与开发运维成本。如果你正在或即将面临类似的挑战——无论是构建一个实时风控系统、一个用户行为分析平台,还是一个物联网数据中枢——那么“derun”演进过程中的经验与教训,或许能给你带来一些直接的参考。
2. 核心架构设计与技术选型背后的逻辑
2.1 为什么是流批一体的架构?
在项目初期,我们面临一个经典抉择:采用独立的流处理(如Flink)和批处理(如Spark)两套系统,还是拥抱流批一体的设计。我们最终选择了后者,核心驱动力来自业务对数据一致性的严苛要求。
想象一下这样的场景:一个实时仪表盘展示今日累计销售额。如果实时流处理计算一次,凌晨的批处理任务又全量校准一次,很容易出现“今日数据在两个时间段看结果不一致”的尴尬局面,这会给业务决策带来严重干扰。“derun”的目标是提供“唯一可信的数据视图”。流批一体架构允许我们使用同一套API和计算逻辑处理实时数据和历史数据,确保无论数据何时被处理、以何种速度到达,只要经过相同的计算逻辑,结果就是一致的。
技术选型上,我们评估了Apache Flink和Apache Spark Structured Streaming。Flink在真正的流处理(尤其是事件时间处理和状态管理)上优势明显,但其早期的批处理性能和对生态的整合一度让我们犹豫。而Spark Structured Streaming本质上是微批处理,在吞吐量优先且允许秒级延迟的场景下表现更稳定,其与Spark SQL、DataFrame API的无缝集成也降低了开发门槛。考虑到团队已有的Spark技术栈和初期业务对“亚秒级”实时性并非硬需求,我们选择了Spark Structured Streaming作为计算引擎。这个选择并非永恒,它基于当时的团队能力、业务容忍度和运维成本综合决定。
注意:流批一体不是银弹。它引入了额外的复杂性,比如需要精心设计状态后端(State Backend)的存储与容错,对水源(Source)和目的端(Sink)的兼容性要求也更高。如果你的业务场景中,实时与离线报表天然分离且允许最终一致性,那么维护两套更简单的系统可能更经济。
2.2 消息队列选型:Kafka还是Pulsar?
数据入口是整个系统的咽喉。“derun”的数据源包括前端埋点日志、数据库变更日志(CDC)和第三方API推送。我们需要一个高吞吐、可持久化、支持多消费者的消息队列。经典的Kafka和新兴的Apache Pulsar是主要候选。
Kafka的社区成熟度、生态工具丰富性毋庸置疑,是大多数团队的首选。但我们在预研阶段遇到了两个具体问题:一是Kafka在云原生环境下的弹性伸缩相对繁琐,虽然KRaft模式在改进,但当时尚不成熟;二是对于需要复杂订阅模式(如按标签订阅、重播特定时间点消息)的场景,Kafka需要额外的抽象层。Pulsar的分层架构(计算与存储分离)使其在Kubernetes上的扩缩容非常灵活,其内置的多租户、地理复制和灵活的订阅模型(独占、故障转移、共享、Key_Shared)更贴近我们未来规划的多团队数据共享场景。
然而,选择Pulsar意味着要承受相对较小的社区和更陡峭的学习曲线。我们最终选择了Pulsar,一个重要原因是团队有足够的运维能力去把控,并且看中了其架构上的长期优势。对于大多数团队,如果不需要Pulsar的特定功能,Kafka依然是风险更低、更稳妥的选择。
2.3 计算状态管理与存储方案
流处理中的有状态计算(如窗口聚合、会话分析)是核心也是难点。“derun”中有大量“计算过去一小时每个用户的访问频次”这类需求。Spark Structured Streaming使用检查点(Checkpoint)目录和预写日志(WAL)来保障状态容错。但状态数据本身的管理和查询是个问题。
我们放弃了将状态简单写入检查点目录的做法,而是将状态显式地存储到一个外部的、可查询的键值存储中。我们对比了Redis和RocksDB(通过Spark的mapGroupsWithStateAPI可集成)。Redis性能极高,但全内存存储成本高,且在海量状态数据下持久化是个挑战。RocksDB作为嵌入式KV存储,状态跟随应用进程,读写速度快,且状态数据最终会随检查点持久化到对象存储(如S3),成本低廉。
我们选择了RocksDB。但这带来了新的运维复杂性:需要监控RocksDB的内存表(MemTable)和SST文件大小,防止状态膨胀导致OOM。我们为此开发了定制的监控指标,当单个任务的状态大小超过阈值时自动告警并触发状态清理或任务重启。
3. 核心模块拆解与实现细节
3.1 数据摄入层:统一抽象与容错设计
摄入层是系统稳定性的第一道防线。我们设计了一个统一的DataIngestor接口,背后对接Kafka、Pulsar、数据库CDC(Debezium)、文件流等不同水源。接口的核心方法是subscribe(processFunction),内部封装了连接管理、反序列化、基础监控和死信队列(DLQ)处理。
以Pulsar水源为例,关键的配置和代码片段如下:
// 关键配置参数 val pulsarConfig = Map( "serviceUrl" -> "pulsar://localhost:6650", "topic" -> "persistent://tenant/namespace/topic-name", "subscriptionName" -> "derun-spark-consumer", "subscriptionType" -> "Shared", // 根据场景选择 "failOnDataLoss" -> "false", // 谨慎设置为true,在topic扩容时可能导致失败 "maxFailures" -> "3" // 最大失败重试次数 ) // 结构化流读取 val inputStream = sparkSession .readStream .format("pulsar") .options(pulsarConfig) .load()容错设计的核心在于死信队列(DLQ)。任何因数据格式错误、反序列化失败、或处理逻辑异常导致单条记录处理失败的情况,我们都不会让整个作业失败,而是将原始错误消息、异常堆栈和上下文信息JSON序列化后,写入一个独立的Pulsar死信主题。这样既保证了主数据流的高可用,又为事后排查和数据修复提供了可能。我们有一个独立的补偿作业定期消费DLQ,进行人工或自动修复。
3.2 处理引擎层:Spark Structured Streaming 最佳实践
在Spark作业开发中,我们严格遵守了几个原则以确保性能和稳定性:
1. 微批处理间隔的权衡:trigger(ProcessingTime="10 seconds")。这个间隔并非越小越好。更短的间隔(如1秒)意味着更低的延迟,但也会给调度系统带来更大压力,每个批次的开销(序列化、提交偏移量)占比会变高。经过压测,在保证业务延迟要求(1分钟内)的前提下,10秒间隔能在吞吐和延迟间取得较好平衡。对于真正需要毫秒级延迟的场景,我们会考虑使用Flink。
2. 状态操作的优化:对于groupByKey+mapGroupsWithState这类有状态操作,键值(Key)的选择至关重要。键的基数(Cardinality)不能过大,否则状态数爆炸;也不能过小,否则并行度不够。我们通常使用“用户ID+日期”的组合键,并通过预聚合(在mapGroupsWithState前先做一次微批内的聚合)来减少状态更新频率。
3. 水印与延迟数据的处理:对于事件时间窗口,必须合理设置水印(Watermark)。withWatermark("eventTime", "2 minutes")表示系统允许数据延迟2分钟到达。超过水印的数据会被丢弃(默认)或通过outputMode("append")中的withWatermark参数控制。我们通过监控“延迟记录数”指标来评估水印设置的合理性,如果发现大量数据被丢弃,需要调大水印值或排查数据源延迟原因。
3.3 输出与服务层:多路复用与一致性保证
处理后的数据需要写入多个目的地:实时OLAP数据库(如ClickHouse)、缓存(如Redis)供在线API查询,以及对象存储(如S3)做长期备份。我们采用了“一写多读”的架构,即流处理作业只写入一个核心事实表(在ClickHouse中),然后通过物化视图、触发器或Changefeeds机制,将数据同步到其他系统。
这样做的好处是保证了数据出口的一致性,所有下游系统都消费同一份经过处理的数据。缺点是增加了对核心事实表的依赖。我们为ClickHouse表设计了合理的分区键(通常是日期)和排序键(通常是用户ID和事件时间),并利用其ReplacingMergeTree引擎处理同一键值的更新。
对于实时API服务,我们封装了一个轻量的查询层。它并非直接查询流处理引擎,而是查询已经持久化到ClickHouse或Redis中的数据。查询层内置了查询路由、降级策略和缓存机制。例如,查询“当前在线人数”这种极热数据,会直接读Redis;查询“过去24小时用户行为路径”这种复杂聚合,则路由到ClickHouse。
4. 运维监控与故障排查实战录
4.1 监控指标体系构建
一个健壮的系统离不开可观测性。我们为“derun”建立了四级监控指标:
- 基础设施层:Pulsar集群的吞吐量、积压(Backlog)、 broker负载;Spark Driver/Executor的CPU、内存、GC情况;ClickHouse的查询QPS、慢查询、磁盘使用率。
- 作业层:每个Spark Streaming作业的批处理时间、调度延迟、输入速率、处理速率、状态存储大小。关键是通过
StreamingQueryListener接口自定义监听器,将指标推送到Prometheus。 - 数据质量层:记录数波动率(同比/环比)、空值率、关键字段枚举值分布、端到端延迟(从数据产生到可查询)。这部分通过一个独立的审计作业周期性计算。
- 业务价值层:最终产出数据的消费方调用成功率、查询延迟、数据准确性(通过抽样与离线数仓对比)。
我们使用Grafana绘制仪表盘,将关联指标放在一起。例如,一个面板同时展示“Pulsar消息积压”和“Spark作业处理延迟”,当两者同时飙升时,可以快速定位是消费端计算能力不足。
4.2 典型故障场景与排查手册
以下是我们遇到并解决过的几个典型问题,整理成了排查清单:
| 故障现象 | 可能原因 | 排查步骤 | 解决方案 |
|---|---|---|---|
| Spark作业处理延迟持续增高,但CPU/内存使用率正常 | 1. 数据倾斜 2. 状态操作过载 3. 输出Sink阻塞 | 1. 查看Spark UI各Stage任务耗时,检查是否有少数Task执行极慢。 2. 检查 mapGroupsWithState相关指标,观察状态读写延迟。3. 检查ClickHouse或Redis的写入延迟监控。 | 1. 数据倾斜:在groupBy前加盐(Salt)或使用两阶段聚合。2. 状态过载:优化状态键设计,增加分区数。 3. Sink阻塞:增加Sink并行度,检查目标库负载。 |
| Pulsar消息积压快速上涨,Spark作业输入速率下降 | 1. Spark作业消费能力不足 2. Pulsar Broker故障或网络分区 3. 反序列化异常导致消费卡住 | 1. 检查Spark Executor数量、CPU使用率。 2. 检查Pulsar Broker健康状态和网络连接。 3. 查看作业日志中是否有持续的序列化错误,检查死信队列堆积情况。 | 1. 动态调整Spark Executor资源。 2. 重启受影响Broker或检查网络。 3. 修复数据格式,或更新反序列化逻辑以兼容脏数据。 |
| 查询层API返回的数据与预期不符,存在延迟或缺失 | 1. 流处理作业本身延迟或中断 2. ClickHouse物化视图未更新 3. 缓存(Redis)数据未刷新或过期 | 1. 检查流作业最新批次提交时间和水印。 2. 检查ClickHouse物化视图的 is_timely状态。3. 检查Redis键的TTL和更新时间戳。 | 1. 重启流作业或排查上游延迟。 2. 手动触发物化视图刷新或优化其定义。 3. 调整缓存策略,或设置更短的缓存过期时间。 |
4.3 资源优化与成本控制心得
在云上运行这样一套系统,成本是必须考虑的因素。我们通过以下方式优化:
1. 计算资源弹性伸缩:我们利用Kubernetes的HPA(水平Pod自动伸缩)为Spark Executor Pod配置了基于CPU利用率的自动伸缩策略。但流处理作业有状态,缩容可能导致状态重新分布,引发性能波动。因此,我们设定了较长的稳定窗口和较小的伸缩步长,避免频繁震荡。对于无状态的ETL环节,则可以更激进地伸缩。
2. 存储分层与生命周期管理:所有检查点数据和状态备份都写入S3标准存储层。我们为S3配置了生命周期策略,7天后的数据自动转入低频访问层,30天后转入归档层。ClickHouse中的原始明细数据保留30天,聚合后的数据保留一年。通过精细化的TTL管理,存储成本下降了60%以上。
3. 作业调度优化:并非所有处理都需要实时。我们将业务分为“热路径”(延迟<1分钟)和“温路径”(延迟<15分钟)。对于温路径作业,我们使用更大的批处理间隔(如5分钟),并在夜间业务低峰期调度更耗资源的回溯计算或数据校准任务。
5. 演进反思与未来展望
回顾“derun”的构建过程,有几个决策点值得重新思考。首先,在项目中期曾为了追求架构“纯净度”,试图用同一套代码同时服务实时和离线场景,导致代码复杂度急剧上升,测试困难。后来我们接受了“代码相似但独立”的哲学,实时和离线作业各自维护独立的代码分支和配置,通过共享核心算法库来保证逻辑一致,运维和开发的清晰度大大提升。
其次,关于技术债。早期为了快速上线,我们绕过了Pulsar和Spark的一些已知小bug,使用了临时性的workaround。这些“补丁”在系统稳定运行后很容易被遗忘,直到版本升级时集中爆发。现在我们建立了“技术债看板”,任何临时解决方案都必须附带Jira工单和解决时限,定期回顾清理。
最后,给打算构建类似系统的团队一个最朴实的建议:在早期,不要过度设计。先从最核心、最简单的流水线跑通开始,用Cron调度批处理也许就能满足初期的“准实时”需求。当业务真正感受到延迟的痛点和实时数据的价值时,再引入流处理引擎。同时,监控和告警一定要与核心业务逻辑同步建设,甚至先行。看不见的系统,比有问题的系统更可怕。
“derun”项目本身还在演进,下一步我们正在探索将部分计算逻辑从Spark SQL下推到Pulsar的Pulsar Functions中进行边缘预处理,以进一步降低端到端延迟。同时,也在评估数据湖格式(如Iceberg)作为流批统一存储层的可能性,以解决数据孤岛和历史数据回溯的便利性问题。架构之路没有终点,唯有持续演进,以应对不断变化的业务挑战。
