
利用ASP.NET Core开发分布式事务与Saga模式实现:从理论到实战的可靠之旅
你好,我是源码库的博主。今天我们来聊聊在微服务架构中一个既经典又棘手的问题:分布式事务。你是否曾为“订单服务”扣款成功,但“库存服务”锁定库存失败,导致数据不一致而头疼?传统的ACID事务在服务边界面前束手无策。经过多个项目的“踩坑”与“填坑”,我深刻地认识到,Saga模式是解决这类问题的一把利器。它不追求强一致性,而是通过最终一致性和补偿机制来保证系统的可靠与健壮。接下来,我将结合ASP.NET Core,带你一步步实现一个基于协调器(Orchestrator)模式的Saga。
一、理解核心:Saga模式与我们的选择
在开始敲代码前,我们必须搞清楚Saga是什么。简单说,一个Saga就是一个由多个本地事务组成的“长事务”。每个本地事务完成后,会发布一个事件或消息来触发下一个本地事务。如果其中任何一个步骤失败,Saga会启动一系列补偿操作(Compensating Transaction)来撤销之前所有成功步骤的影响,使系统回滚到一致状态。
Saga主要有两种实现方式:
- 编排(Choreography):服务之间通过事件直接通信,没有中心协调点。简单,但逻辑分散,难以理解和调试。
- 协调(Orchestration):引入一个中心化的“协调器”(Orchestrator)来负责指挥整个流程。逻辑集中,更易管理和监控,这也是我们本次实战的选择。
我们的场景模拟一个简化的“电商下单流程”:1. 创建订单(Order Service) -> 2. 扣减库存(Inventory Service) -> 3. 处理支付(Payment Service)。任何一步失败,都需要触发已成功步骤的补偿。
二、搭建战场:项目结构与基础模型
首先,我们创建一个ASP.NET Core的解决方案。我建议使用Clean Architecture的思想来组织,但为了直观,我们先建立三个核心服务项目和一个协调器项目:
dotnet new sln -n DistributedSagaDemo
dotnet new webapi -n OrderService
dotnet new webapi -n InventoryService
dotnet new webapi -n PaymentService
dotnet new webapi -n SagaOrchestrator
dotnet sln add OrderService InventoryService PaymentService SagaOrchestrator
每个服务都需要定义自己的领域模型和API端点。以订单服务为例,我们定义一个简单的订单模型:
// OrderService/Models/Order.cs
namespace OrderService.Models
{
public class Order
{
public Guid OrderId { get; set; }
public Guid UserId { get; set; }
public Guid ProductId { get; set; }
public int Quantity { get; set; }
public decimal Amount { get; set; }
public OrderStatus Status { get; set; } // 如:Pending, Created, Cancelled, Completed
}
public enum OrderStatus { Pending, Created, Cancelled, Completed }
}
踩坑提示:每个服务的数据库必须是独立的,这是微服务的基本规则。不要试图共享数据库,否则就退回到单体架构了。
三、建立通信:集成事件与消息队列
服务间如何通信?我们使用消息队列(如RabbitMQ或Azure Service Bus)来传递集成事件。这里为了简化,我们使用内存中的MassTransit(一个优秀的.NET消息总线库)来模拟。首先,在所有项目中安装NuGet包:
dotnet add package MassTransit
dotnet add package MassTransit.AspNetCore
dotnet add package MassTransit.RabbitMQ # 如果使用真正的RabbitMQ
然后,定义我们的事件契约。这些契约应该放在一个共享的类库中,但为了演示,我们暂时在协调器里定义,然后各服务引用(生产环境建议用NuGet包共享)。
// SagaOrchestrator/Events/OrderCreatedEvent.cs
public interface OrderCreatedEvent
{
Guid OrderId { get; }
Guid UserId { get; }
Guid ProductId { get; }
int Quantity { get; }
decimal Amount { get; }
}
// SagaOrchestrator/Events/InventoryReservedEvent.cs
public interface InventoryReservedEvent
{
Guid OrderId { get; }
Guid ProductId { get; }
int Quantity { get; }
}
// 同理定义 PaymentProcessedEvent, OrderFailedEvent, CompensateInventoryEvent 等
在Program.cs中配置MassTransit:
// 以OrderService为例
builder.Services.AddMassTransit(x =>
{
x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
// 如果是RabbitMQ: x.UsingRabbitMq((context, cfg) => { cfg.Host("localhost"); cfg.ConfigureEndpoints(context); });
});
四、核心实现:Saga协调器状态机
这是最核心的部分!我们将使用MassTransit的Automatonymous库来定义Saga状态机。在协调器项目中,首先定义Saga状态和数据:
// SagaOrchestrator/Saga/OrderSagaState.cs
public class OrderSagaState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; } // 关键!关联所有事件
public string CurrentState { get; set; }
public Guid OrderId { get; set; }
public Guid UserId { get; set; }
// ... 其他业务数据
}
// SagaOrchestrator/Saga/OrderSagaStateMachine.cs
public class OrderSagaStateMachine : MassTransitStateMachine
{
// 1. 定义状态
public State AwaitingInventory { get; private set; }
public State AwaitingPayment { get; private set; }
public State Completed { get; private set; }
public State Compensating { get; private set; }
// 2. 定义事件
public Event OrderCreated { get; private set; }
public Event InventoryReserved { get; private set; }
public Event InventoryReservationFailed { get; private set; }
public Event PaymentProcessed { get; private set; }
public Event PaymentFailed { get; private set; }
public OrderSagaStateMachine()
{
// 3. 定义初始状态和关联事件
InstanceState(x => x.CurrentState);
// 4. 编排流程(这是核心逻辑!)
Initially(
When(OrderCreated)
.Then(context => {
// 可以在这里记录日志或更新Saga数据
context.Instance.OrderId = context.Data.OrderId;
})
.PublishAsync(context => context.Init(new {
OrderId = context.Data.OrderId,
ProductId = context.Data.ProductId,
Quantity = context.Data.Quantity
})) // 发布“扣减库存”命令
.TransitionTo(AwaitingInventory)
);
During(AwaitingInventory,
When(InventoryReserved)
.PublishAsync(context => context.Init(new {
OrderId = context.Data.OrderId,
Amount = /* 从Saga State中获取 */,
UserId = context.Instance.UserId
}))
.TransitionTo(AwaitingPayment),
When(InventoryReservationFailed)
.Then(context => Log.Error($"库存预留失败 for Order {context.Data.OrderId}"))
.PublishAsync(context => context.Init(new {
OrderId = context.Data.OrderId
})) // 发布补偿订单的命令
.TransitionTo(Compensating)
.Finalize()
);
During(AwaitingPayment,
When(PaymentProcessed)
.Then(context => Log.Information($"订单 {context.Data.OrderId} 流程完成!"))
.TransitionTo(Completed)
.Finalize(),
When(PaymentFailed)
.PublishAsync(context => context.Init(new {
OrderId = context.Data.OrderId,
ProductId = context.Instance.ProductId,
Quantity = context.Instance.Quantity
})) // 支付失败,发布“补偿库存”命令
.PublishAsync(context => context.Init(new {
OrderId = context.Data.OrderId
}))
.TransitionTo(Compensating)
.Finalize()
);
// 5. 配置Saga存储(这里用内存,生产环境用MongoDB、Redis等)
SetCompletedWhenFinalized();
}
}
实战经验:状态机的设计要清晰定义所有可能的状态转换和补偿路径。务必为每个失败场景都设计补偿事件,这是保证最终一致性的关键。
五、收尾与测试:启动流程与观察结果
最后,我们需要一个起点。在OrderService中,创建一个控制器,当用户下单时,发布OrderCreatedEvent事件,从而触发整个Saga。
// OrderService/Controllers/OrderController.cs
[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
private readonly IPublishEndpoint _publishEndpoint;
public OrderController(IPublishEndpoint publishEndpoint) => _publishEndpoint = publishEndpoint;
[HttpPost]
public async Task CreateOrder([FromBody] CreateOrderRequest request)
{
var orderId = Guid.NewGuid();
// 1. 本地事务:在订单数据库创建状态为Pending的订单
// _orderRepository.Add(new Order { OrderId = orderId, Status = OrderStatus.Pending, ... });
// 2. 发布集成事件,启动Saga
await _publishEndpoint.Publish(new
{
OrderId = orderId,
UserId = request.UserId,
ProductId = request.ProductId,
Quantity = request.Quantity,
Amount = request.Quantity * 100m // 模拟计算金额
});
return Accepted(new { OrderId = orderId, Message = "订单正在处理中..." });
}
}
现在,启动所有服务,使用Postman调用POST /order。你可以通过以下方式测试:
- 正常流程:所有服务都正常,最终订单状态变为Completed。
- 库存服务失败:模拟InventoryService在收到命令后抛出异常,你会看到协调器收到
InventoryReservationFailed事件,并触发对订单的补偿(将订单状态改为Cancelled)。 - 支付服务失败:库存扣减成功,但支付失败。协调器会先后触发对库存的补偿(恢复库存)和对订单的补偿。
踩坑提示:补偿操作必须是幂等的!因为网络问题,补偿命令可能被重复发送。你的补偿处理逻辑需要判断是否已经补偿过,避免重复执行导致数据错误。
总结与展望
通过以上步骤,我们成功在ASP.NET Core中实现了一个基于协调器模式的Saga。它有效地解决了跨服务的业务数据一致性问题。当然,这是一个简化版。在生产环境中,你还需要考虑:
- 持久化:将Saga状态机实例持久化到可靠的存储(如MongoDB, SQL Server)。
- 监控与告警:对长时间处于中间状态(如AwaitingPayment)的Saga进行监控和人工干预。
- 可观测性:为每个Saga实例关联唯一的CorrelationId,并贯穿日志、跟踪(如OpenTelemetry),方便问题排查。
分布式事务没有银弹,Saga模式以它的灵活性和适用性成为了微服务架构下的主流选择。希望这篇实战教程能帮助你理解其精髓,并在你的项目中成功落地。如果在实践中遇到问题,欢迎在源码库社区交流讨论!

评论(0)