分布式事务的最终一致性模式与消息表方案实现插图

分布式事务的最终一致性模式与消息表方案实现:从理论到实战的平滑落地

在微服务架构下,我们告别了单体应用的单数据库和简单事务。当一个业务操作需要跨多个服务、多个数据库时,传统数据库的ACID事务(特别是强一致性)就变得力不从心,甚至不可能实现。这时,“最终一致性”就成了我们必须拥抱的设计模式。今天,我想结合自己趟过的坑,和大家深入聊聊最终一致性的一种经典实现方案——消息表(本地消息表),并手把手带你实现一个核心流程。

一、为什么是最终一致性?强一致性之殇

记得我第一次设计一个“用户下单扣库存”的流程时,本能地想用分布式事务框架(如Seata的AT模式)来保证强一致性。在测试环境一切顺利,但上了预发布环境,随着调用链变长、网络抖动,问题就来了:一个非核心的“送积分”服务短暂不可用,竟然导致整个下单流程回滚,用户无法完成购买。这显然不合理。

核心矛盾在于:在分布式系统中,追求瞬间的强一致性,往往需要以牺牲可用性(A)和分区容忍性(P)为代价(CAP定理)。对于很多业务场景,比如下单后发短信、支付成功送优惠券,我们其实允许系统在短时间内(几秒甚至几分钟)处于不一致状态,只要最终所有数据都是正确的即可。这就是最终一致性,它通过牺牲短暂的一致性窗口,换来了系统的高可用和整体性能。

二、消息表方案:一种可靠且易于理解的最终一致性实现

最终一致性的实现模式有很多,如TCC、Saga、可靠消息。其中,消息表方案因其简单、可靠、对业务侵入相对较小,成为许多团队的首选。它的核心思想是:将分布式事务的协调工作,本地化到每一个服务的事务中

其架构核心包含两个角色:

  1. 事务发起方(如订单服务):在本地数据库事务中,除了完成本职业务(创建订单),还会向同一数据库的“本地消息表”插入一条待发送的消息记录。利用本地事务的ACID特性,保证业务和消息记录要么都成功,要么都失败。
  2. 消息中间件与消费者(如积分服务):一个独立的“消息转发服务”或中间件客户端组件,会轮询或监听本地消息表,将状态为“待发送”的消息投递到MQ(如RocketMQ/Kafka)。下游服务消费MQ消息,完成自身业务,并可能通过回调或发送新消息来确认。

这个方案巧妙地将一个分布式事务,拆解为多个本地事务,通过MQ的可靠传递来串联,实现了数据的最终一致。

三、实战:基于Spring Boot和RocketMQ实现消息表方案

下面,我们模拟一个经典场景:创建订单后,异步通知库存服务扣减库存

步骤1:设计数据库表

首先,在订单服务的数据库中,我们需要两张表。

订单表 (order_info)

CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `order_no` varchar(32) NOT NULL,
  `user_id` bigint(20) NOT NULL,
  `product_id` bigint(20) NOT NULL,
  `amount` int(11) NOT NULL,
  `status` tinyint(4) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

本地消息表 (local_message)

CREATE TABLE `local_message` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `biz_id` varchar(64) NOT NULL COMMENT '业务ID,如订单ID',
  `biz_type` varchar(32) NOT NULL COMMENT '业务类型,如ORDER_CREATED',
  `msg_content` text COMMENT '消息内容,通常为JSON',
  `msg_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '0-待发送,1-已发送,2-发送失败',
  `retry_count` int(11) DEFAULT '0',
  `next_retry_time` datetime DEFAULT NULL,
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `idx_status_retry` (`msg_status`,`next_retry_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

步骤2:订单服务——在事务中写入本地消息

这是最关键的步骤,必须保证创建订单和写入消息表的原子性。

@Service
@Transactional
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private LocalMessageMapper messageMapper;

    public void createOrder(OrderDTO orderDTO) {
        // 1. 创建订单(核心业务)
        OrderInfo order = new OrderInfo();
        BeanUtils.copyProperties(orderDTO, order);
        order.setOrderNo(generateOrderNo());
        orderMapper.insert(order);

        // 2. 构造并保存本地消息(在同一事务中)
        LocalMessage message = new LocalMessage();
        message.setBizId(order.getOrderNo()); // 业务ID
        message.setBizType("ORDER_CREATED");
        // 消息体包含库存服务需要的信息
        Map content = new HashMap<>();
        content.put("orderNo", order.getOrderNo());
        content.put("productId", order.getProductId());
        content.put("amount", order.getAmount());
        message.setMsgContent(JSON.toJSONString(content));
        message.setMsgStatus(0); // 待发送
        messageMapper.insert(message);
        // 事务在此方法结束时提交,订单和消息记录被同时持久化
    }
}

踩坑提示:务必确保@Transactional生效,并且OrderService.createOrder的调用方没有绕过事务(例如,在类内部通过`this.createOrder()`调用)。

步骤3:消息转发服务——将消息扫出并投递到MQ

我们需要一个后台任务,定时扫描`local_message`表中状态为“待发送”或“发送失败且到达重试时间”的消息。

@Component
@Slf4j
public class MessageRelayTask {

    @Autowired
    private LocalMessageMapper messageMapper;
    @Autowired
    private RocketMQTemplate rocketMQTemplate; // Spring Boot RocketMQ Starter

    @Scheduled(fixedDelay = 5000) // 每5秒执行一次
    public void relayMessages() {
        List pendingMessages = messageMapper.selectPending(100); // 每次取100条
        for (LocalMessage message : pendingMessages) {
            try {
                // 发送到RocketMQ,Topic为“ORDER_EVENT”
                SendResult sendResult = rocketMQTemplate.syncSend("ORDER_EVENT",
                        MessageBuilder.withPayload(message.getMsgContent()).build());
                if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                    // 发送成功,更新消息状态为“已发送”
                    message.setMsgStatus(1);
                    messageMapper.updateById(message);
                    log.info("消息转发成功,消息ID: {}", message.getId());
                }
            } catch (Exception e) {
                log.error("消息转发失败,消息ID: {}", message.getId(), e);
                // 更新重试次数和下次重试时间
                message.setRetryCount(message.getRetryCount() + 1);
                message.setNextRetryTime(calculateNextRetryTime(message.getRetryCount()));
                messageMapper.updateById(message);
            }
        }
    }
}

实战经验:这里存在一个“小概率”的临界点问题:消息成功发送到MQ,但更新本地消息状态为“已发送”时失败。这会导致消息被重复投递。因此,消费方必须实现幂等性,这是最终一致性方案中消费者必须遵守的铁律。

步骤4:库存服务——消费MQ消息并实现幂等

库存服务监听`ORDER_EVENT`主题,消费消息并扣减库存。

@Service
@RocketMQMessageListener(topic = "ORDER_EVENT", consumerGroup = "inventory-consumer-group")
@Slf4j
public class InventoryConsumer implements RocketMQListener {

    @Autowired
    private InventoryService inventoryService;

    @Override
    public void onMessage(String message) {
        log.info("收到订单创建消息: {}", message);
        try {
            // 1. 解析消息
            JSONObject jsonObject = JSON.parseObject(message);
            String orderNo = jsonObject.getString("orderNo");
            Long productId = jsonObject.getLong("productId");
            Integer amount = jsonObject.getInteger("amount");

            // 2. 关键!业务幂等检查:通过订单号判断是否已处理过
            if (inventoryService.isOrderProcessed(orderNo)) {
                log.warn("订单 {} 的库存扣减已处理,跳过幂等消息。", orderNo);
                return;
            }

            // 3. 执行核心业务逻辑:扣减库存
            boolean success = inventoryService.deductInventory(productId, amount, orderNo);
            if (!success) {
                // 库存不足或其他业务失败,可以记录日志、发报警,甚至投递到死信队列人工处理
                log.error("订单 {} 扣减库存失败,商品ID: {}", orderNo, productId);
                // 注意:这里通常不抛出异常让MQ重试,除非是网络等临时故障。业务失败应另做处理。
            }
        } catch (Exception e) {
            log.error("处理订单消息异常,消息体: {}", message, e);
            // 抛出异常,让RocketMQ根据重试策略重新投递(适用于网络抖动、数据库临时不可用等场景)
            throw new RuntimeException("消息处理失败,要求重试", e);
        }
    }
}

库存服务内部需要维护一个“去重表”或利用订单号在业务表中的唯一约束来实现isOrderProcessed逻辑,这是保证幂等性的核心。

四、方案总结与优化思考

消息表方案实现下来,你会发现它结构清晰,技术栈要求不高(数据库+MQ),非常适合作为分布式事务的入门实践。但它也有其优缺点:

优点

  • 理解简单,实现成本较低。
  • 依赖于成熟的MQ,消息可靠性高。
  • 业务耦合度较低,每个服务职责清晰。

缺点与优化方向

  • 消息延迟:依赖定时任务扫描,有秒级延迟。可考虑使用数据库的CDC(变更数据捕获,如Debezium)或事务日志尾部监听来近乎实时地捕获消息。
  • 数据库压力:频繁扫描消息表。需优化索引,并控制每次扫描的批次大小。
  • 架构复杂度:需要维护消息转发服务。一些云厂商的消息服务(如阿里云的RocketMQ)提供了“事务消息”功能,可以替代本地消息表,进一步简化架构,这也是进阶学习的推荐方向。

总之,分布式事务没有银弹。消息表方案以其扎实、可靠的特性,在追求最终一致性的场景中依然占据重要一席。希望这篇结合实战的剖析,能帮助你更好地理解和应用这一模式,在你的系统中平滑落地。记住,好的设计都是在权衡中做出的最适合的选择。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。