在微服务架构中,跨服务的数据同步始终是微服务开发者面临的常见挑战。以 Taocarts 跨境电商独立站系统为例:用户下单后,订单数据需要同步至商品服务以扣减库存、物流服务以生成运单、通知服务以发送邮件通知——多个下游系统均需等待。若强行采用强一致性分布式事务,性能和复杂度都会急剧上升。更务实的做法是引入“最终一致性”:允许短暂的不一致窗口,但保障数据最终能够对齐。本文详细阐述基于本地消息表 + RocketMQ 的最终一致性方案,并附上完整代码实现。

业务场景与方案选型
创建订单时需完成三项跨服务操作:扣减商品库存(商品服务)、创建物流单(物流服务)、发送订单通知(通知服务)。若使用传统两阶段提交(2PC),不仅性能低下,一旦任一参与者故障,整个事务必须回滚,用户体验严重受损。而本地消息表方案则更具智慧:在订单创建的本地事务中,同步写入一条“待发送消息”,事务提交后由后台异步任务将消息推入消息队列,下游服务再各自拉取消费。这样既保障了订单数据的可靠性,又实现了系统间的松耦合。
本地消息表结构设计
首先来看数据库中的消息表建表语句:
sql
CREATE TABLE `local_message` (
`id` bigint PRIMARY KEY AUTO_INCREMENT,
`message_id` varchar(64) NOT NULL COMMENT '消息唯一ID',
`topic` varchar(64) NOT NULL COMMENT 'MQ Topic',
`payload` text NOT NULL COMMENT '消息内容(JSON)',
`status` tinyint NOT NULL DEFAULT 0 COMMENT '0-待发送, 1-已发送, 2-发送失败',
`retry_count` int NOT NULL DEFAULT 0,
`max_retries` int NOT NULL DEFAULT 3,
`next_retry_time` datetime DEFAULT NULL,
`created_at` datetime DEFAULT CURRENT_TIMESTAMP,
`updated_at` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY `uk_message_id` (`message_id`),
KEY `idx_status_next_retry` (`status`, `next_retry_time`)
);
订单创建时同步写入本地消息
核心操作来了:在订单创建的事务范围内,同时插入本地消息记录。订单与消息要么一起成功提交,要么一起回滚失败,绝无商量余地。
ja va
@Service
@Transactional
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private LocalMessageMapper messageMapper;
public void createOrder(OrderDTO orderDTO) {
// 1. 保存订单
Order order = new Order();
order.setOrderNo(generateOrderNo());
order.setUserId(orderDTO.getUserId());
order.setAmount(orderDTO.getAmount());
order.setStatus(OrderStatus.PENDING_PAYMENT);
orderMapper.insert(order);
// 2. 保存本地消息
LocalMessage msg = new LocalMessage();
msg.setMessageId(UUID.randomUUID().toString());
msg.setTopic("ORDER_CREATED");
msg.setPayload(JSON.toJSONString(order));
msg.setStatus(0);
msg.setNextRetryTime(new Date());
messageMapper.insert(msg);
}
}
后台定时任务扫描并发送消息
光写入还不够,必须有一个定时任务不断扫描待发送的消息,真正投递到 RocketMQ 中。发送成功则更新状态为“已发送”;失败则增加重试次数,并按照指数退避策略设置下一次重试时间——避免集中重试打垮系统。
ja va
@Component
public class MessageSendScheduler {
@Autowired
private LocalMessageMapper messageMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
List messages = messageMapper.selectPendingMessages(100);
for (LocalMessage msg : messages) {
try {
SendResult result = rocketMQTemplate.syncSend(msg.getTopic(), msg.getPayload());
if (result.getSendStatus() == SendStatus.SEND_OK) {
msg.setStatus(1);
messageMapper.updateById(msg);
}
} catch (Exception e) {
msg.setRetryCount(msg.getRetryCount() + 1);
if (msg.getRetryCount() >= msg.getMaxRetries()) {
msg.setStatus(2); // 失败,进入死信队列
// 发送告警通知
alertService.send("消息发送失败,已进入死信队列: " + msg.getMessageId());
} else {
// 指数退避:2^retryCount 分钟
long delayMinutes = 1L << msg.getRetryCount();
msg.setNextRetryTime(new Date(System.currentTimeMillis() + delayMinutes * 60 * 1000));
}
messageMapper.updateById(msg);
}
}
}
}
下游消费者实现幂等消费
下游服务在消费消息时,必须确保幂等性——否则重复消费将产生脏数据。这里采用 Redis 记录已处理的消息 ID,简单且可靠。
ja va
@Component
@RocketMQMessageListener(topic = "ORDER_CREATED", consumerGroup = "inventory-consumer")
public class InventoryConsumer implements RocketMQListener {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private InventoryService inventoryService;
@Override
public void onMessage(String message) {
JSONObject json = JSON.parseObject(message);
String messageId = json.getString("messageId");
// 幂等检查
String key = "processed:" + messageId;
Boolean success = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofHours(24));
if (Boolean.FALSE.equals(success)) {
log.info("消息已被处理过,跳过: {}", messageId);
return;
}
try {
Order order = json.getObject("order", Order.class);
inventoryService.deductStock(order.getProductId(), order.getQuantity());
} catch (Exception e) {
// 处理失败,删除幂等标记,让消息重新投递
redisTemplate.delete(key);
throw e;
}
}
}
死信处理与人工介入
当消息进入死信队列(status=2)时,系统会自动向钉钉群发送告警。运维人员可以登录管理后台查看详情,手动重发或修复数据。毕竟再完善的自动化,也需要一个兜底机制。
ja va
@RestController
@RequestMapping("/admin/messages")
public class DeadLetterController {
@Autowired
private LocalMessageMapper messageMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@PostMapping("/retry/{id}")
public Result retry(@PathVariable Long id) {
LocalMessage msg = messageMapper.selectById(id);
if (msg.getStatus() != 2) {
return Result.error("只有死信消息可以重试");
}
rocketMQTemplate.syncSend(msg.getTopic(), msg.getPayload());
msg.setStatus(0);
msg.setRetryCount(0);
msg.setNextRetryTime(new Date());
messageMapper.updateById(msg);
return Result.success();
}
}
方案总结
本地消息表 + 消息队列的搭配,是实现最终一致性的经典模式。Taocarts 系统凭借这套方案成功解耦了订单创建与库存扣减、物流生成、通知发送等操作,既保证了数据最终能够对齐,又显著提升了系统的吞吐量和可扩展性。在生产环境运行半年后,消息送达率稳定在 99.99%,仅有极少数死信通过人工介入快速修复。该方案尤其适合对实时性要求不高、但数据绝对不能丢失的业务场景——简而言之,大多数业务系统都可以直接借鉴。
