利用ASP.NET Core开发分布式事务与Saga模式实现插图

利用ASP.NET Core开发分布式事务与Saga模式实现:从理论到实战的可靠之旅

你好,我是源码库的博主。今天我们来聊聊在微服务架构中一个既经典又棘手的问题:分布式事务。你是否曾为“订单服务”扣款成功,但“库存服务”锁定库存失败,导致数据不一致而头疼?传统的ACID事务在服务边界面前束手无策。经过多个项目的“踩坑”与“填坑”,我深刻地认识到,Saga模式是解决这类问题的一把利器。它不追求强一致性,而是通过最终一致性和补偿机制来保证系统的可靠与健壮。接下来,我将结合ASP.NET Core,带你一步步实现一个基于协调器(Orchestrator)模式的Saga。

一、理解核心:Saga模式与我们的选择

在开始敲代码前,我们必须搞清楚Saga是什么。简单说,一个Saga就是一个由多个本地事务组成的“长事务”。每个本地事务完成后,会发布一个事件或消息来触发下一个本地事务。如果其中任何一个步骤失败,Saga会启动一系列补偿操作(Compensating Transaction)来撤销之前所有成功步骤的影响,使系统回滚到一致状态。

Saga主要有两种实现方式:

  1. 编排(Choreography):服务之间通过事件直接通信,没有中心协调点。简单,但逻辑分散,难以理解和调试。
  2. 协调(Orchestration):引入一个中心化的“协调器”(Orchestrator)来负责指挥整个流程。逻辑集中,更易管理和监控,这也是我们本次实战的选择。

我们的场景模拟一个简化的“电商下单流程”:1. 创建订单(Order Service) -> 2. 扣减库存(Inventory Service) -> 3. 处理支付(Payment Service)。任何一步失败,都需要触发已成功步骤的补偿。

二、搭建战场:项目结构与基础模型

首先,我们创建一个ASP.NET Core的解决方案。我建议使用Clean Architecture的思想来组织,但为了直观,我们先建立三个核心服务项目和一个协调器项目:

dotnet new sln -n DistributedSagaDemo
dotnet new webapi -n OrderService
dotnet new webapi -n InventoryService
dotnet new webapi -n PaymentService
dotnet new webapi -n SagaOrchestrator
dotnet sln add OrderService InventoryService PaymentService SagaOrchestrator

每个服务都需要定义自己的领域模型和API端点。以订单服务为例,我们定义一个简单的订单模型:

// OrderService/Models/Order.cs
namespace OrderService.Models
{
    public class Order
    {
        public Guid OrderId { get; set; }
        public Guid UserId { get; set; }
        public Guid ProductId { get; set; }
        public int Quantity { get; set; }
        public decimal Amount { get; set; }
        public OrderStatus Status { get; set; } // 如:Pending, Created, Cancelled, Completed
    }
    public enum OrderStatus { Pending, Created, Cancelled, Completed }
}

踩坑提示:每个服务的数据库必须是独立的,这是微服务的基本规则。不要试图共享数据库,否则就退回到单体架构了。

三、建立通信:集成事件与消息队列

服务间如何通信?我们使用消息队列(如RabbitMQ或Azure Service Bus)来传递集成事件。这里为了简化,我们使用内存中的MassTransit(一个优秀的.NET消息总线库)来模拟。首先,在所有项目中安装NuGet包:

dotnet add package MassTransit
dotnet add package MassTransit.AspNetCore
dotnet add package MassTransit.RabbitMQ # 如果使用真正的RabbitMQ

然后,定义我们的事件契约。这些契约应该放在一个共享的类库中,但为了演示,我们暂时在协调器里定义,然后各服务引用(生产环境建议用NuGet包共享)。

// SagaOrchestrator/Events/OrderCreatedEvent.cs
public interface OrderCreatedEvent
{
    Guid OrderId { get; }
    Guid UserId { get; }
    Guid ProductId { get; }
    int Quantity { get; }
    decimal Amount { get; }
}

// SagaOrchestrator/Events/InventoryReservedEvent.cs
public interface InventoryReservedEvent
{
    Guid OrderId { get; }
    Guid ProductId { get; }
    int Quantity { get; }
}

// 同理定义 PaymentProcessedEvent, OrderFailedEvent, CompensateInventoryEvent 等

Program.cs中配置MassTransit:

// 以OrderService为例
builder.Services.AddMassTransit(x =>
{
    x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
    // 如果是RabbitMQ: x.UsingRabbitMq((context, cfg) => { cfg.Host("localhost"); cfg.ConfigureEndpoints(context); });
});

四、核心实现:Saga协调器状态机

这是最核心的部分!我们将使用MassTransit的Automatonymous库来定义Saga状态机。在协调器项目中,首先定义Saga状态和数据:

// SagaOrchestrator/Saga/OrderSagaState.cs
public class OrderSagaState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; } // 关键!关联所有事件
    public string CurrentState { get; set; }
    public Guid OrderId { get; set; }
    public Guid UserId { get; set; }
    // ... 其他业务数据
}

// SagaOrchestrator/Saga/OrderSagaStateMachine.cs
public class OrderSagaStateMachine : MassTransitStateMachine
{
    // 1. 定义状态
    public State AwaitingInventory { get; private set; }
    public State AwaitingPayment { get; private set; }
    public State Completed { get; private set; }
    public State Compensating { get; private set; }

    // 2. 定义事件
    public Event OrderCreated { get; private set; }
    public Event InventoryReserved { get; private set; }
    public Event InventoryReservationFailed { get; private set; }
    public Event PaymentProcessed { get; private set; }
    public Event PaymentFailed { get; private set; }

    public OrderSagaStateMachine()
    {
        // 3. 定义初始状态和关联事件
        InstanceState(x => x.CurrentState);

        // 4. 编排流程(这是核心逻辑!)
        Initially(
            When(OrderCreated)
                .Then(context => {
                    // 可以在这里记录日志或更新Saga数据
                    context.Instance.OrderId = context.Data.OrderId;
                })
                .PublishAsync(context => context.Init(new {
                    OrderId = context.Data.OrderId,
                    ProductId = context.Data.ProductId,
                    Quantity = context.Data.Quantity
                })) // 发布“扣减库存”命令
                .TransitionTo(AwaitingInventory)
        );

        During(AwaitingInventory,
            When(InventoryReserved)
                .PublishAsync(context => context.Init(new {
                    OrderId = context.Data.OrderId,
                    Amount = /* 从Saga State中获取 */,
                    UserId = context.Instance.UserId
                }))
                .TransitionTo(AwaitingPayment),

            When(InventoryReservationFailed)
                .Then(context => Log.Error($"库存预留失败 for Order {context.Data.OrderId}"))
                .PublishAsync(context => context.Init(new {
                    OrderId = context.Data.OrderId
                })) // 发布补偿订单的命令
                .TransitionTo(Compensating)
                .Finalize()
        );

        During(AwaitingPayment,
            When(PaymentProcessed)
                .Then(context => Log.Information($"订单 {context.Data.OrderId} 流程完成!"))
                .TransitionTo(Completed)
                .Finalize(),

            When(PaymentFailed)
                .PublishAsync(context => context.Init(new {
                    OrderId = context.Data.OrderId,
                    ProductId = context.Instance.ProductId,
                    Quantity = context.Instance.Quantity
                })) // 支付失败,发布“补偿库存”命令
                .PublishAsync(context => context.Init(new {
                    OrderId = context.Data.OrderId
                }))
                .TransitionTo(Compensating)
                .Finalize()
        );

        // 5. 配置Saga存储(这里用内存,生产环境用MongoDB、Redis等)
        SetCompletedWhenFinalized();
    }
}

实战经验:状态机的设计要清晰定义所有可能的状态转换和补偿路径。务必为每个失败场景都设计补偿事件,这是保证最终一致性的关键。

五、收尾与测试:启动流程与观察结果

最后,我们需要一个起点。在OrderService中,创建一个控制器,当用户下单时,发布OrderCreatedEvent事件,从而触发整个Saga。

// OrderService/Controllers/OrderController.cs
[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
    private readonly IPublishEndpoint _publishEndpoint;
    public OrderController(IPublishEndpoint publishEndpoint) => _publishEndpoint = publishEndpoint;

    [HttpPost]
    public async Task CreateOrder([FromBody] CreateOrderRequest request)
    {
        var orderId = Guid.NewGuid();
        // 1. 本地事务:在订单数据库创建状态为Pending的订单
        // _orderRepository.Add(new Order { OrderId = orderId, Status = OrderStatus.Pending, ... });

        // 2. 发布集成事件,启动Saga
        await _publishEndpoint.Publish(new
        {
            OrderId = orderId,
            UserId = request.UserId,
            ProductId = request.ProductId,
            Quantity = request.Quantity,
            Amount = request.Quantity * 100m // 模拟计算金额
        });

        return Accepted(new { OrderId = orderId, Message = "订单正在处理中..." });
    }
}

现在,启动所有服务,使用Postman调用POST /order。你可以通过以下方式测试:

  1. 正常流程:所有服务都正常,最终订单状态变为Completed。
  2. 库存服务失败:模拟InventoryService在收到命令后抛出异常,你会看到协调器收到InventoryReservationFailed事件,并触发对订单的补偿(将订单状态改为Cancelled)。
  3. 支付服务失败:库存扣减成功,但支付失败。协调器会先后触发对库存的补偿(恢复库存)和对订单的补偿。

踩坑提示:补偿操作必须是幂等的!因为网络问题,补偿命令可能被重复发送。你的补偿处理逻辑需要判断是否已经补偿过,避免重复执行导致数据错误。

总结与展望

通过以上步骤,我们成功在ASP.NET Core中实现了一个基于协调器模式的Saga。它有效地解决了跨服务的业务数据一致性问题。当然,这是一个简化版。在生产环境中,你还需要考虑:

  • 持久化:将Saga状态机实例持久化到可靠的存储(如MongoDB, SQL Server)。
  • 监控与告警:对长时间处于中间状态(如AwaitingPayment)的Saga进行监控和人工干预。
  • 可观测性:为每个Saga实例关联唯一的CorrelationId,并贯穿日志、跟踪(如OpenTelemetry),方便问题排查。

分布式事务没有银弹,Saga模式以它的灵活性和适用性成为了微服务架构下的主流选择。希望这篇实战教程能帮助你理解其精髓,并在你的项目中成功落地。如果在实践中遇到问题,欢迎在源码库社区交流讨论!

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。