分布式事务与一致性保障:从 2PC 到 Saga 的工程实践
分布式事务与一致性保障:从 2PC 到 Saga 的工程实践
一、微服务拆分后的事务困境:跨服务数据一致性的挑战
单体应用中,一个业务操作通常在一个数据库事务中完成。但微服务拆分后,一个业务操作可能涉及多个服务的数据库写入。例如,电商下单操作需要同时:扣减库存服务中的库存、创建订单服务中的订单、增加积分服务中的积分。这三个操作分布在三个独立的数据库中,无法使用本地事务保证原子性。
如果库存扣减成功但订单创建失败,库存数据就会出现不一致。传统的两阶段提交(2PC)理论上可以解决这个问题,但其在微服务环境中的实际表现极差:同步阻塞导致吞吐量骤降,协调者单点故障风险高,超时机制在不可靠网络中频繁触发回滚。Saga 模式是微服务场景下更实用的方案——它放弃强一致性,通过补偿操作实现最终一致性。
二、分布式事务模式的演进与 Saga 编排机制
分布式事务模式从强一致性到最终一致性经历了三个阶段的演进:2PC(强一致,高阻塞)→ TCC(Try-Confirm-Cancel,业务层一致性)→ Saga(长事务,补偿回滚)。Saga 模式的核心思想是:将长事务拆分为多个本地事务,每个本地事务提交后通过事件触发下一个事务;如果某个步骤失败,则逆序执行已完成步骤的补偿操作。
sequenceDiagram participant O as 订单服务 participant I as 库存服务 participant P as 积分服务 participant C as Saga 协调器 C->>O: 开始 Saga 事务 O->>O: 创建订单(本地事务) O-->>C: 订单创建成功 C->>I: 扣减库存 I->>I: 扣减库存(本地事务) I-->>C: 库存扣减成功 C->>P: 增加积分 P->>P: 增加积分(本地事务) P--xP: 积分增加失败 P-->>C: 失败 Note over C: 触发补偿流程 C->>I: 补偿:恢复库存 I->>I: 恢复库存(本地事务) I-->>C: 库存已恢复 C->>O: 补偿:取消订单 O->>O: 取消订单(本地事务) O-->>C: 订单已取消 C-->>C: Saga 事务回滚完成上图展示了 Saga 事务的正常执行和补偿回滚流程。关键设计点在于:每个正向操作都必须有对应的补偿操作,补偿操作必须是幂等的(可能被执行多次),且补偿操作本身不能失败(否则需要人工介入)。
三、生产级实现:Saga 协调器
// SagaOrchestrator.java — Saga 事务协调器 import java.util.*; import java.util.concurrent.*; // Saga 步骤定义 record SagaStep( String name, Callable<Object> action, // 正向操作 Consumer<Object> compensation, // 补偿操作 int maxRetries, long timeoutMs ) {} // Saga 执行结果 record SagaResult( boolean success, String failedStep, String errorMessage, List<String> completedCompensations ) {} // Saga 协调器:编排多步骤事务的执行与补偿 // 设计意图:集中管理 Saga 事务的生命周期, // 支持超时控制、重试策略和补偿回滚 class SagaOrchestrator { private final List<SagaStep> steps = new ArrayList<>(); private final ExecutorService executor = Executors.newCachedThreadPool(); SagaOrchestrator addStep(String name, Callable<Object> action, Consumer<Object> compensation) { steps.add(new SagaStep(name, action, compensation, 3, 5000)); return this; } SagaResult execute() { // 记录已完成的步骤及其结果,用于补偿回滚 Deque<Map.Entry<SagaStep, Object>> completedSteps = new ArrayDeque<>(); Object lastResult = null; // 正向执行所有步骤 for (SagaStep step : steps) { try { // 带超时和重试的执行 Object result = executeWithRetry(step, lastResult); completedSteps.push(Map.entry(step, result)); lastResult = result; } catch (Exception e) { // 步骤执行失败,触发补偿回滚 List<String> compensations = compensate(completedSteps); return new SagaResult(false, step.name(), e.getMessage(), compensations); } } return new SagaResult(true, null, null, Collections.emptyList()); } // 带重试的步骤执行 // 设计意图:网络抖动等瞬时故障不应立即触发补偿, // 重试机制可以提高事务成功率 private Object executeWithRetry(SagaStep step, Object previousResult) throws Exception { Exception lastException = null; for (int attempt = 0; attempt <= step.maxRetries(); attempt++) { try { Future<Object> future = executor.submit(() -> step.action().call()); return future.get(step.timeoutMs(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { lastException = new Exception("步骤超时: " + step.name()); } catch (Exception e) { lastException = e; } if (attempt < step.maxRetries()) { Thread.sleep(1000 * (attempt + 1)); // 指数退避 } } throw lastException; } // 补偿回滚:逆序执行已完成步骤的补偿操作 // 设计意图:补偿操作必须幂等,因为可能被执行多次。 // 补偿操作本身失败时记录日志,不阻断其他补偿 private List<String> compensate(Deque<Map.Entry<SagaStep, Object>> completedSteps) { List<String> compensations = new ArrayList<>(); while (!completedSteps.isEmpty()) { Map.Entry<SagaStep, Object> entry = completedSteps.pop(); SagaStep step = entry.getKey(); Object result = entry.getValue(); try { step.compensation().accept(result); compensations.add("补偿成功: " + step.name()); } catch (Exception e) { // 补偿失败:记录日志,需要人工介入 compensations.add("补偿失败: " + step.name() + " - " + e.getMessage()); } } return compensations; } } // 使用示例:电商下单 Saga class OrderSagaExample { void createOrder(OrderRequest request) { SagaOrchestrator saga = new SagaOrchestrator(); // 步骤 1:创建订单 saga.addStep( "create-order", () -> orderService.create(request), // 正向:创建订单 (result) -> orderService.cancel((Order) result) // 补偿:取消订单 ); // 步骤 2:扣减库存 saga.addStep( "deduct-inventory", () -> inventoryService.deduct(request.getSkuId(), request.getQuantity()), (result) -> inventoryService.restore(request.getSkuId(), request.getQuantity()) ); // 步骤 3:增加积分 saga.addStep( "add-points", () -> pointsService.add(request.getUserId(), request.getPointsAmount()), (result) -> pointsService.deduct(request.getUserId(), request.getPointsAmount()) ); SagaResult result = saga.execute(); if (!result.success()) { // 记录失败日志,触发告警 alertService.notify("Saga 事务失败: " + result.failedStep()); } } }四、边界分析与架构权衡
Saga 模式在微服务落地中存在几个关键 Trade-off:
最终一致性的时间窗口。Saga 放弃了强一致性,在正向执行和补偿回滚之间存在一个"不一致窗口"。例如,库存已扣减但订单尚未创建的几秒内,其他用户可能看到库存减少但无法下单。对于金融场景(如转账),这个不一致窗口是不可接受的,必须使用 2PC 或 TCC 保证强一致性。
补偿操作的设计复杂度。不是所有操作都有天然的补偿操作。例如,"发送邮件"的补偿操作是什么?无法"取消发送"。解决方案是将不可补偿的操作放在 Saga 的最后一步执行,确保前面的步骤都成功后才触发不可逆操作。
Saga 链路过长的问题。一个 Saga 事务包含 10+ 个步骤时,任何一个步骤失败都会触发大量补偿操作,增加系统复杂度和故障风险。建议将 Saga 控制在 5 个步骤以内,超过 5 个步骤的业务应考虑重新划分服务边界。
适用边界:Saga 最适合业务流程长、对实时一致性要求不高的场景(如订单处理、工作流审批)。对于短事务、强一致性要求的场景(如账户余额变更),2PC 或 TCC 更合适。
五、总结
Saga 模式将分布式事务从"强一致阻塞"推进到"最终一致非阻塞"。核心要点:将长事务拆分为多个本地事务,每个步骤有对应的补偿操作,失败时逆序执行补偿回滚。落地建议:第一,将不可补偿的操作放在 Saga 最后一步;第二,所有补偿操作必须幂等,防止重复执行导致数据错误;第三,Saga 步骤控制在 5 个以内,过长的链路应重新审视服务边界。关键原则:分布式事务没有银弹——选择 2PC 还是 Saga,取决于业务对一致性和可用性的取舍。
