首页 游戏 软件 资讯 排行榜 专题
首页
科技数码
Spring Boot 轻量级分布式事务:基于消息最终一致性的创新实践

Spring Boot 轻量级分布式事务:基于消息最终一致性的创新实践

热心网友
13
转载
2025-12-15

分布式事务没有银弹,轻量级方案在保证可用性的前提下,通过最终一致性实现业务需求的平衡,是互联网高并发场景的最佳选择。​

免费影视、动漫、音乐、游戏、小说资源长期稳定更新! 👉 点此立即查看 👈


前言

在微服务架构中,分布式事务是最大的挑战之一。本文将揭示如何在不依赖重量级事务管理器的情况下,通过Spring Boot实现高可用、低延迟的轻量级分布式事务解决方案,处理效率提升300%!

一、分布式事务困境:ACID vs BASE

1.1 传统方案的局限性

1.2 轻量级方案核心思想

核心原则:

最终一致性:允许短暂不一致事件驱动:通过消息解耦服务幂等设计:支持重复消费补偿机制:失败自动重试

二、Spring Boot实现方案:事务消息+本地事件表

2.1 架构设计

2.2 核心依赖

org.springframework.boot spring-boot-starter-web org.apache.rocketmq rocketmq-spring-boot-starter 2.2.3 com.baomidou mybatis-plus-boot-starter 3.5.3.1 cn.hutool hutool-all 5.8.16

三、核心实现源码

3.1 事件表设计

@Data@TableName("distributed_event")public class DistributedEvent { @TableId(type = IdType.ASSIGN_ID) private Long id; private String eventType; // 事件类型:ORDER_CREATED, PAYMENT_SUCCESS private String payload; // JSON格式事件数据 private String status; // 状态:NEW, PROCESSING, SUCCESS, FAILED private Integer retryCount; // 重试次数 private LocalDateTime createTime; private LocalDateTime updateTime;}// 事件状态枚举public enum EventStatus { NEW, PROCESSING, SUCCESS, FAILED}

3.2 本地事务管理器

@Service@Transactionalpublic class TransactionCoordinator { private final DistributedEventMapper eventMapper; private final RocketMQTemplate rocketMQTemplate; public void executeInTransaction(Runnable businessLogic, String eventType, Object payload) { // 1. 执行业务逻辑 businessLogic.run(); // 2. 保存事件到数据库 DistributedEvent event = new DistributedEvent(); event.setEventType(eventType); event.setPayload(JSON.toJSONString(payload)); event.setStatus(EventStatus.NEW.name()); event.setRetryCount(0); eventMapper.insert(event); // 3. 发送事务消息 TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( "tx-event-group", "event-topic", MessageBuilder.withPayload(event.getId()).build(), event.getId() ); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new TransactionException("消息发送失败"); } }}

3.3 RocketMQ事务监听器

@RocketMQTransactionListener(txProducerGroup = "tx-event-group")public class EventTransactionListener implements RocketMQTransactionListener { private final DistributedEventMapper eventMapper; @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { Long eventId = (Long) arg; DistributedEvent event = eventMapper.selectById(eventId); if (event != null && EventStatus.NEW.name().equals(event.getStatus())) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(Message msg) { Long eventId = Long.parseLong(new String(msg.getBody())); DistributedEvent event = eventMapper.selectById(eventId); if (event == null) { return LocalTransactionState.ROLLBACK_MESSAGE; } return EventStatus.NEW.name().equals(event.getStatus()) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE; }}

3.4 事件消费者

@Service@RocketMQMessageListener( topic = "event-topic", consumerGroup = "event-consumer-group")public class EventConsumer implements RocketMQListener { private final EventDispatcher eventDispatcher; private final DistributedEventMapper eventMapper; @Override @Transactional public void onMessage(String message) { Long eventId = Long.parseLong(message); DistributedEvent event = eventMapper.selectById(eventId); // 幂等性检查 if (event == null || !EventStatus.NEW.name().equals(event.getStatus())) { return; } // 更新状态为处理中 event.setStatus(EventStatus.PROCESSING.name()); eventMapper.updateById(event); try { // 分发事件处理 eventDispatcher.dispatch(event); // 处理成功 event.setStatus(EventStatus.SUCCESS.name()); } catch (Exception e) { // 处理失败 event.setStatus(EventStatus.FAILED.name()); event.setRetryCount(event.getRetryCount() + 1); } eventMapper.updateById(event); }}

3.5 事件分发器

@Componentpublic class EventDispatcher { private final Map handlers = new ConcurrentHashMap<>(); // 注册处理器 public void registerHandler(String eventType, EventHandler handler) { handlers.put(eventType, handler); } public void dispatch(DistributedEvent event) { EventHandler handler = handlers.get(event.getEventType()); if (handler == null) { throw new EventHandleException("未找到事件处理器: " + event.getEventType()); } handler.handle(event); }}// 订单创建事件处理器@Componentpublic class OrderCreatedHandler implements EventHandler { private final PaymentService paymentService; @Override public void handle(DistributedEvent event) { OrderCreatedEvent payload = JSON.parseObject(event.getPayload(), OrderCreatedEvent.class); paymentService.createPayment(payload.getOrderId(), payload.getAmount()); }}

3.6 补偿任务(定时重试)

@Slf4j@Componentpublic class EventCompensator { private final EventDispatcher eventDispatcher; private final DistributedEventMapper eventMapper; private final RocketMQTemplate rocketMQTemplate; @Scheduled(fixedDelay = 30000) // 每30秒执行一次 public void compensateFailedEvents() { // 查询失败且重试次数小于5次的事件 List failedEvents = eventMapper.selectList( new QueryWrapper() .eq("status", EventStatus.FAILED.name()) .lt("retry_count", 5) ); for (DistributedEvent event : failedEvents) { try { log.info("重试事件: {}", event.getId()); rocketMQTemplate.syncSend("event-topic", event.getId().toString()); } catch (Exception e) { log.error("事件重试发送失败: {}", event.getId(), e); } } }}

四、应用场景实战

4.1 电商下单场景

代码实现:

// 订单服务@Servicepublic class OrderService { private final TransactionCoordinator coordinator; public void createOrder(Order order) { coordinator.executeInTransaction(() -> { // 1. 保存订单 orderMapper.insert(order); // 2. 生成事件数据 OrderCreatedEvent event = new OrderCreatedEvent(); event.setOrderId(order.getId()); event.setAmount(order.getAmount()); }, "ORDER_CREATED", event); }}// 支付服务@Componentpublic class PaymentHandler implements EventHandler { @Override public void handle(DistributedEvent event) { OrderCreatedEvent payload = JSON.parseObject(event.getPayload(), OrderCreatedEvent.class); paymentService.createPayment(payload.getOrderId(), payload.getAmount()); }}

4.2 跨行转账场景

// 转账服务public void transfer(TransferRequest request) { coordinator.executeInTransaction(() -> { // 1. 扣减转出账户 accountService.debit(request.getFromAccount(), request.getAmount()); // 2. 生成转账事件 TransferEvent event = new TransferEvent(); event.setFromAccount(request.getFromAccount()); event.setToAccount(request.getToAccount()); event.setAmount(request.getAmount()); }, "TRANSFER_INITIATED", event);}// 收款银行服务@Componentpublic class TransferHandler implements EventHandler { @Override public void handle(DistributedEvent event) { TransferEvent payload = JSON.parseObject(event.getPayload(), TransferEvent.class); // 调用银行API bankService.credit(payload.getToAccount(), payload.getAmount()); }}

4.3 酒店预订场景

// 预订服务public void bookHotel(BookingRequest request) { coordinator.executeInTransaction(() -> { // 1. 保存预订记录 bookingMapper.insert(booking); // 2. 生成支付事件 PaymentEvent paymentEvent = new PaymentEvent(); paymentEvent.setBookingId(booking.getId()); paymentEvent.setAmount(booking.getAmount()); }, "BOOKING_CREATED", paymentEvent); // 3. 生成积分事件 PointEvent pointEvent = new PointEvent(); pointEvent.setUserId(request.getUserId()); pointEvent.setPoints(booking.getAmount() / 10); coordinator.executeInTransaction(() -> {}, "POINT_EVENT", pointEvent);}// 积分服务@Componentpublic class PointHandler implements EventHandler { @Override public void handle(DistributedEvent event) { PointEvent payload = JSON.parseObject(event.getPayload(), PointEvent.class); pointService.addPoints(payload.getUserId(), payload.getPoints()); }}

五、高级特性实现

5.1 幂等性设计

public class IdempotentHandler implements EventHandler { private final DistributedEventMapper eventMapper; @Override public void handle(DistributedEvent event) { // 检查是否已处理过 if (eventMapper.selectById(event.getId()) != null) { log.warn("重复事件已忽略: {}", event.getId()); return; } // 处理逻辑... }}

5.2 死信队列处理

@Beanpublic MessageChannel deadLetterChannel() { return MessageChannels.queue().get();}@Bean@ServiceActivator(inputChannel = "deadLetterChannel")public MessageHandler deadLetterHandler() { return message -> { // 处理无法投递的消息 log.error("死信消息: {}", message); DeadLetter deadLetter = new DeadLetter(); deadLetter.setPayload(message.getPayload().toString()); deadLetterRepository.save(deadLetter); };}

5.3 事件溯源

@Entitypublic class EventSourcingRecord { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String aggregateId; // 聚合根ID private String eventType; private String payload; private LocalDateTime timestamp;}public void saveEvent(String aggregateId, String eventType, Object payload) { EventSourcingRecord record = new EventSourcingRecord(); record.setAggregateId(aggregateId); record.setEventType(eventType); record.setPayload(JSON.toJSONString(payload)); record.setTimestamp(LocalDateTime.now()); eventSourcingRepository.save(record);}

六、性能优化策略

6.1 批量事件处理

@RocketMQMessageListener( topic = "event-topic", consumerGroup = "batch-consumer", consumeMode = ConsumeMode.ORDERLY, messageModel = MessageModel.CLUSTERING, selectorExpression = "*", consumeThreadMax = 20)public class BatchEventConsumer implements RocketMQListener> { @Override public void onMessage(List messages) { List eventIds = messages.stream() .map(msg -> Long.parseLong(new String(msg.getBody()))) .collect(Collectors.toList()); // 批量查询事件 List events = eventMapper.selectBatchIds(eventIds); // 批量处理 eventDispatcher.batchDispatch(events); }}

6.2 事件表分片设计

// 按月份分片@TableName("distributed_event_#{T(java.time.LocalDate).now().getMonthValue()}")public class DistributedEvent { // ...}// 动态表名处理器public class MonthShardingTableNameHandler implements ITableNameHandler { @Override public String dynamicTableName(String sql, String tableName) { int month = LocalDate.now().getMonthValue(); return tableName + "_" + month; }}

6.3 异步事件处理

@Configuration@EnableAsyncpublic class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(50); executor.setQueueCapacity(1000); executor.setThreadNamePrefix("EventExecutor-"); executor.initialize(); return executor; }}// 异步处理事件@Async@Overridepublic void handle(DistributedEvent event) { // 事件处理逻辑}

七、生产环境最佳实践

7.1 监控指标配置

@Beanpublic MeterRegistryCustomizer metrics() { return registry -> { Gauge.builder("event.queue.size", eventMapper::selectPendingCount) .description("待处理事件数量") .register(registry); Gauge.builder("event.process.duration", eventDispatcher::getAvgProcessTime) .description("事件平均处理时间") .register(registry); };}


7.2 部署架构

7.3 配置建议

rocketmq: name-server: mq1:9876;mq2:9876;mq3:9876 producer: group: tx-producer-group send-message-timeout: 3000 consumer: group: event-consumer-group consume-thread-max: 32event: max-retry: 5 retry-interval: 30000 # 30秒 sharding-strategy: monthly # 分片策略

八、与传统方案对比

九、总结与展望

9.1 方案优势

高性能:单机支持万级TPS低耦合:服务间通过消息解耦高可用:无单点故障可扩展:水平扩展能力强简单易用:Spring Boot无缝集成

9.2 适用场景

电商订单系统跨行转账业务酒店机票预订物联网设备联动微服务间数据同步

9.3 未来演进

事件溯源增强:完整业务追溯能力AI驱动补偿:智能故障预测与修复跨链事务:区块链集成无服务架构:Serverless适配

架构师箴言:分布式事务没有银弹,轻量级方案在保证可用性的前提下,通过最终一致性实现业务需求的平衡,是互联网高并发场景的最佳选择。

来源:https://www.51cto.com/article/822160.html
免责声明: 游乐网为非赢利性网站,所展示的游戏/软件/文章内容均来自于互联网或第三方用户上传分享,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系youleyoucom@outlook.com。

相关攻略

LangChain之外,为何还需关注Spring AI?
AI
LangChain之外,为何还需关注Spring AI?

如果面试官问你这个问题,你可以这样总结:LangChain功能繁多、反应迅速,是探索 AI 前沿的最佳工具,而 Spring AI 更像是一套工业级的生产线。对于企业而言,我们不仅需要调用大模型,更

热心网友
01.07
Spring Boot + sshd-sftp:SSH 命令与文件传输实践
科技数码
Spring Boot + sshd-sftp:SSH 命令与文件传输实践

在现代分布式系统中,服务器间的远程操作与文件传输是常见需求。SSH作为一种安全的网络协议,为远程登录和文件传输提供了可靠保障。 前言在现代分布式系统中,服务器间的远程操作与文件传输是常见需求。SSH

热心网友
12.15
Spring Boot + Pcap4j 实现网络流量抓包与实时分析
科技数码
Spring Boot + Pcap4j 实现网络流量抓包与实时分析

在当今数字化时代,网络流量如同信息社会的血液,承载着海量的数据交互。对网络流量进行有效的抓包与实时分析,是保障网络安全、优化网络性能的关键环节。无论是及时发现潜在的网络攻击,还是排查网络拥塞等问题,

热心网友
12.15
震惊!SpringBoot 接口耗时监控还能这么玩,简单到离谱!
科技数码
震惊!SpringBoot 接口耗时监控还能这么玩,简单到离谱!

随着业务规模的增长,我们还可以在此基础上拓展更多维度,比如统计 QPS、采集请求来源、结合用户信息等,形成完整的 接口性能监控体系。这不仅能帮助我们快速定位问题,更能为系统优化提供强有力的数据支撑。

热心网友
12.15
在 SpringBoot 项目中如何动态切换数据源、数据库?(可直接CV)
科技数码
在 SpringBoot 项目中如何动态切换数据源、数据库?(可直接CV)

如果服务器搭建的是一主多从多个mysql数据源,主服务器用来读。从服务器用来写。此时你在代码层面用注解指定了一个增删改方法到从数据源,但是碰巧此时从数据源失效了,那么就会自动的切换到其它服务器。 前

热心网友
12.15

最新APP

火柴人传奇
火柴人传奇
动作冒险 04-01
街球艺术
街球艺术
体育竞技 04-01
飞行员模拟
飞行员模拟
休闲益智 04-01
史莱姆农场
史莱姆农场
休闲益智 04-01
绝区零
绝区零
角色扮演 04-01

热门推荐

《洛克王国》世界圣羽翼王打法攻略-圣羽翼王技能与实战详解
游戏攻略
《洛克王国》世界圣羽翼王打法攻略-圣羽翼王技能与实战详解

速览攻略:世界圣羽翼王核心打法与全面解析 本攻略将为你完整呈现《洛克王国》世界圣羽翼王的通关秘籍,深度剖析两种高效实战打法:追求极致速度的“燃薪虫四回合速通”与稳定输出的“酷拉无限连击流”。文章将进一步解析这位翼系精灵王的技能机制、属性克制关系及其在PVE与PVP中的实战定位,帮助你彻底掌握应对其隐

热心网友
04.06
《异种航员2》工程系统详解-工作坊与资源管理指南
游戏攻略
《异种航员2》工程系统详解-工作坊与资源管理指南

速览:工程系统核心机制解析 在《异种航员2》中,工程系统是整个抵抗力量赖以运转的“战略后勤中枢”。无论是研发新武器、生产重型装甲还是制造先进飞行器,所有实体装备的产出都依赖于此。简言之,该系统的核心运作围绕着两大关键:工程师人力的高效配置与全球稀缺资源的精细化调度。工程师的数量直接决定了每个项目的建

热心网友
04.06
《洛克王国世界》治愈兔位置详解-任务与战斗关键精灵
游戏攻略
《洛克王国世界》治愈兔位置详解-任务与战斗关键精灵

核心速览 在《洛克王国世界》中,治愈兔是一位兼具功能性任务角色与实战辅助能力的精灵。它的价值不仅在剧情推进中体现,更在于对战里出色的治疗与防护表现。本文将为你全面解析治愈兔的精准获取位置、种族属性特点以及实战技能搭配,助你顺利捕捉并最大化其在队伍中的作用。所有关键信息将通过清晰的图文内容详细展示,确

热心网友
04.06
《红色沙漠》传说之狼打法-传说之狼击杀流程详解
游戏攻略
《红色沙漠》传说之狼打法-传说之狼击杀流程详解

速览 在《红色沙漠》中,挑战传说之狼这一强大的任务BOSS,需要玩家进行充分的准备并遵循完整的任务流程。整个过程环环相扣,你必须首先参与塞莱斯特家族的势力任务,通过完成任务将家族声望提升至指定等级,才能解锁【传说之狼】的专属讨伐任务,最终直面这个传说中的强大生物。 红色沙漠传说之狼怎么打 归根结底,

热心网友
04.06
《宝可梦Pokopia》舒适度提升攻略-环境等级与栖息地优化指南
游戏攻略
《宝可梦Pokopia》舒适度提升攻略-环境等级与栖息地优化指南

【宝可梦Pokopia】舒适度全解析:快速提升环境等级的核心秘诀 你是否正在探索《宝可梦Pokopia》世界,并希望有效提升宝可梦栖息地的舒适度?舒适度不仅是衡量宝可梦快乐程度的晴雨表,更是解锁游戏核心内容、加速发展的关键驱动指标。本攻略将系统性地为你揭示提升舒适度的核心途径,涵盖从装饰栖息地、建造

热心网友
04.06