谈到微服务架构下的跨服务数据同步,许多开发者首先想到的是让人头疼的强一致性分布式事务。例如订单创建后需要扣减库存、生成运单、发送通知——如果强行采用2PC等强一致方案,性能和系统复杂度将难以承受。Taocarts采用了更务实的策略:本地消息表结合消息队列,通过最终一致性实现服务解耦。
一、问题背景
在微服务架构中,跨服务数据同步始终是一个经典难题。以Taocarts系统为例,订单创建后需要将数据同步至库存服务(扣减库存)、物流服务(生成运单)、通知服务(发送邮件)等多个下游系统。若采用强一致性的分布式事务(例如2PC),系统的性能与开发复杂性会急剧攀升。因此,Taocarts选择了“本地消息表 + 消息队列”的模式,通过最终一致性来平衡可靠性与效率。
二、本地消息表设计
sql
CREATE TABLE local_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(64) NOT NULL,
topic VARCHAR(64) NOT NULL,
payload TEXT NOT NULL,
status TINYINT NOT NULL DEFAULT 0 COMMENT '0-待发送(初始), 1-已成功发送, 2-发送失败(死信)',
retry_count INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 3,
next_retry_time DATETIME DEFAULT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_message_id (message_id),
KEY idx_status_next_retry (status, next_retry_time)
);
这张本地消息表是整个方案的基石。message_id字段确保全局唯一性,status字段标记消息的生命周期状态,而retry_count和next_retry_time协同实现指数退避重试机制。
三、订单创建时写入本地消息
ja va
@Service
@Transactional
public class OrderService {
public void createOrder(OrderDTO orderDTO) {
// 步骤1:保存订单实体
Order order = new Order();
order.setOrderNo(generateOrderNo());
orderMapper.insert(order);
// 步骤2:保存本地消息记录
LocalMessage msg = new LocalMessage();
msg.setMessageId(UUID.randomUUID().toString());
msg.setTopic("ORDER_CREATED");
msg.setPayload(JSON.toJSONString(order));
msg.setStatus(0);
msg.setNextRetryTime(new Date());
messageMapper.insert(msg);
}
}
关键点在于订单与消息在同一个本地事务中写入。这样,订单创建成功后消息必然落库;订单失败则消息不会出现。从而避免了传统“先写订单再发送MQ”带来的数据不一致风险。
四、后台任务扫描并发送
ja va
@Component
public class MessageSendScheduler {
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
List messages = messageMapper.selectPendingMessages(100);
for (LocalMessage msg : messages) {
try {
SendResult result = rocketMQTemplate.syncSend(msg.getTopic(), msg.getPayload());
if (result.getSendStatus() == SendStatus.SEND_OK) {
msg.setStatus(1);
messageMapper.updateById(msg);
}
} catch (Exception e) {
msg.setRetryCount(msg.getRetryCount() + 1);
if (msg.getRetryCount() >= msg.getMaxRetries()) {
msg.setStatus(2);
alertService.send("消息进入死信队列: " + msg.getMessageId());
} else {
long delayMinutes = 1L << msg.getRetryCount();
msg.setNextRetryTime(new Date(System.currentTimeMillis() + delayMinutes * 60 * 1000));
}
messageMapper.updateById(msg);
}
}
}
}
这里采用每5秒执行一次的定时任务,批量拉取待发送消息。发送失败后按照指数退避策略进行重试,超过最大重试次数则标记为死信并触发告警。特别值得注意的是重试间隔的计算方式——使用左移运算,依次为1分钟、2分钟、4分钟……最多3次后放弃。
五、消费端幂等处理
ja va
@Component
@RocketMQMessageListener(topic = "ORDER_CREATED", consumerGroup = "inventory-consumer")
public class InventoryConsumer implements RocketMQListener {
@Override
public void onMessage(String message) {
String messageId = JSON.parseObject(message).getString("messageId");
String key = "processed:" + messageId;
Boolean success = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofHours(24));
if (Boolean.FALSE.equals(success)) {
log.info("该消息已经处理过,直接跳过: {}", messageId);
return;
}
try {
inventoryService.deductStock(...);
} catch (Exception e) {
redisTemplate.delete(key);
throw e;
}
}
}
消费端通过Redis的SETNX命令实现幂等性,设置24小时过期时间以避免重复处理。一旦业务逻辑出现异常,主动删除key以便消息能够被重新消费——这里采用抛出异常的方式触发MQ的重试机制,而不是静默吞掉异常。一个关键细节:如果库存扣减失败,key被删除后,消息会回到Broker重新投递;而本地消息表中的消息状态并不会回退,因为发送端已将其标记为“已发送”。最终一致性依赖于消费端的重试机制和业务补偿操作。
六、总结
Taocarts利用本地消息表结合消息队列,成功实现了订单创建与下游服务之间的解耦,消息送达率高达99.99%。核心经验:分布式系统应当拥抱“最终一致性”,而非盲目追求“强一致性”。

