SpringCloudStream动态路由Key配置与RabbitMQ实战指南
前言
在实际业务开发中,我们常常会遇到这样的场景:用户完成一系列操作后,系统需要根据不同的操作结果,发送不同类型的通知邮件。如果直接在业务逻辑里同步调用邮件服务,不仅耗时,还会拖慢主流程。这时候,引入消息中间件进行异步解耦,就成了一个自然而然的选择。
免费影视、动漫、音乐、游戏、小说资源长期稳定更新! 👉 点此立即查看 👈
不过,新的问题也随之而来。不同的邮件类型,本质上对应着不同的业务逻辑,我们当然希望它们能被不同的消费者处理。在RabbitMQ的模型里,这就意味着生产者需要根据消息类型,将其投递到不同的队列。如何实现呢?核心就在于路由键(Routing Key)。
如上图所示,一个交换机(Exchange)可以根据不同的路由键,将消息精准地路由到与之绑定的队列中。这正是我们需要的效果。
要实现这个目标,思路其实很清晰:对于消费者而言,它需要声明自己只关心某个特定路由键的消息;而对于生产者,则需要在发送消息时,动态地指定这个消息的路由键。
这里还有一个Spring Cloud Stream的重要概念:group。在绑定RabbitMQ时,一个group就对应一个具体的队列。所以,要区分不同的业务,我们完全可以通过配置不同的group来实现。
例子
接下来,我们通过一个完整的代码示例,来看看如何用Spring Cloud Stream + RabbitMQ实现动态路由。示例包含一个生产者服务和一个消费者服务。
生产者
生产者的配置核心在于两点:一是定义Binder连接RabbitMQ,二是指定动态路由键的表达式。
spring:
application:
name: producer
cloud:
stream:
binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
etpmsRabbitMQ: # 给Binder定义的名称,⽤于后⾯的关联
type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
environment: # MQ环境配置(⽤户名、密码等)
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: xxxxxx
bindings: # 关联整合通道和binder对象
output: # output是我们定义的通道名称,此处不能乱改
destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,⽐如json
binder: etpmsRabbitMQ # 关联MQ服务
rabbit:
bindings:
output:
producer:
# 生产者配置RabbitMq的动态路由键
routingKeyExpression: headers.type
请注意routingKeyExpression: headers.type这行配置。它告诉Spring Cloud Stream:消息的路由键,要从消息头(Header)中名为type的字段去获取。
配置好了,发送消息的代码就非常简单了:
package top.chenyt.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
/**
* @author yantao.chen
*/
@Service
public class ProviderService {
/**
* 将MessageChannel的封装对象Source注⼊到这⾥使⽤
*/
@Autowired
private Source source;
public void sendMessage(String content, String type) {
// 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)
// 使⽤通道向外发出消息(指的是Source⾥⾯的output通道)
source.output().send(MessageBuilder.withPayload(content).setHeader("type",type).build());
}
}
关键在于MessageBuilder.withPayload(content).setHeader("type",type).build()这一句。我们在构建消息时,通过setHeader方法设置了type这个头信息。发送时,框架会自动根据配置的表达式headers.type提取这个值,并将其作为路由键发送到RabbitMQ。
最后,别忘了在主应用类上启用绑定:
package top.chenyt;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
/**
* @ClassName etpms-parent
* @Author Jinondo
* @Date 2022/1/31 12:42
*/
@SpringBootApplication
@Slf4j
@EnableBinding({Source.class})
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
总结一下生产者的关键步骤:在配置文件中定义routingKeyExpression,然后在发送消息时通过setHeader设置对应的值。
消费者
消费者的配置稍微复杂一点,因为我们需要定义多个绑定,每个绑定对应一个队列(即一个group),并指定其监听的路由键。
spring:
application:
name: consumer
cloud:
stream:
binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
etpmsRabbitMQ: # 给Binder定义的名称,⽤于后⾯的关联
type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
environment: # MQ环境配置(⽤户名、密码等)
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: xxxxx
bindings: # 关联整合通道和binder对象
input: # input是我们定义的通道名称,此处不能乱改
destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,⽐如json,自动将对象转为json
binder: etpmsRabbitMQ # 关联MQ服务
group: register
my-input:
destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
content-type: text/plain # application/json # 消息类型设置,⽐如json,自动将对象转为json
binder: etpmsRabbitMQ # 关联MQ服务
group: task
rabbit:
bindings:
my-input:
consumer:
bindingRoutingKey: task
input:
consumer:
bindingRoutingKey: register
仔细看这段配置,我们定义了两个绑定:input和my-input。它们连接的是同一个交换机testExchange,但属于不同的group(register和task),这会在RabbitMQ中创建两个独立的队列。
最关键的部分在spring.cloud.stream.rabbit.bindings下面。我们为每个绑定指定了bindingRoutingKey。这意味着:input通道对应的队列,只绑定路由键为register的消息;my-input通道对应的队列,只绑定路由键为task的消息。
接下来,我们需要定义my-input这个自定义通道的接口:
package top.chenyt.consumer;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface MySink {
String MY_INPUT = "my-input";
@Input(MY_INPUT)
SubscribableChannel myinput();
}
然后,编写消息监听器,分别监听这两个通道:
package top.chenyt.consumer;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
/**
* @ClassName etpms-parent
* @Author Jinondo
* @Date 2022/1/31 12:42
*/
@Service
public class ConsumerMsg {
@StreamListener(Sink.INPUT)
public void receiveMessages(Message message) {
System.out.println("========= input接收到的消息:" + message.getPayload());
}
@StreamListener(MySink.MY_INPUT)
public void receiveMessages02(Message message) {
System.out.println("========= myinput接收到的消息:" + message.getPayload());
}
}
最后,在主应用类中启用这两个绑定:
package top.chenyt;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import top.chenyt.consumer.MySink;
/**
* @ClassName etpms-parent
* @Author Jinondo
* @Date 2022/1/31 12:42
*/
@SpringBootApplication
@Slf4j
@EnableBinding({Sink.class, MySink.class})
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
至此,整个流程就搭建完成了。当生产者发送一个type头为register的消息时,它会被路由到group为register的队列,并由receiveMessages方法处理。同理,type为task的消息则会由receiveMessages02方法处理。这就完美实现了基于消息类型的动态路由。
总结
通过上面的示例,我们可以看到,利用Spring Cloud Stream的routingKeyExpression和bindingRoutingKey配置,可以非常优雅地实现RabbitMQ的动态路由功能,而无需在代码中编写任何RabbitMQ原生API。这种方法清晰地将配置与业务代码分离,提高了可维护性。
其核心逻辑可以概括为:生产者通过消息头动态传递路由键,消费者通过配置静态声明其绑定的路由键。两者配合,再加上group对队列的隔离,就能构建出清晰、灵活的消息路由方案。希望这个实践思路能为大家在处理类似业务场景时提供一个可靠的参考。
您可能感兴趣的文章:
- SpringCloudStream+RabbitMQ使用中遇到的问题及解决
- 解决SpringCloudStream整合Kafka,两个通道对应同一个topic报错的情况
- SpringCloud Stream 快速入门实例教程
- SpringCloud使用Kafka Streams实现实时数据处理
- SpringCloudStream原理和深入使用小结
- SpringCloud中的Stream服务间消息传递详解
- SpringCloudStream中的消息分区数详解
相关攻略
在异步消息处理场景中,需根据消息类型将其路由至不同队列。通过SpringCloudStream整合RabbitMQ,生产者可利用消息头动态指定路由键,消费者通过配置绑定特定路由键及消费者组,实现消息的精准分发。该方法将路由逻辑配置化,避免硬编码,提升了系统的可维护性与灵活性。
一、使用@SuppressWarnings注解临时抑制警告 当重构条件尚不成熟,又需要快速让编译通过时,这个方法可以派上用场。它本质上是在告诉编译器:“我知道这里用了旧东西,先别报警,容我缓缓。” 但务必记住,这只是权宜之计,代码的兼容性风险依然存在。 具体操作很简单:在调用废弃API的类、方法甚至
如何在 Ja va 中使用 ThreadLocal remove() 确保在线程池复用场景下不会发生数据污染 说到线程池和 ThreadLocal 的搭配使用,一个看似不起眼、实则极易“踩坑”的细节就是数据清理。想象一下,你精心设计的线程池正在高效运转,却因为某个任务留下的“数据尾巴”,导致后续任务
如何在 Ja va 中利用 try-catch 实现对“软错误”的平滑感知与非侵入式监控日志记录 在 Ja va 开发中,我们常常会遇到一些“软错误”——它们不会让程序直接崩溃,却可能悄悄影响业务的正确性或用户体验。比如,调用第三方 API 时返回了空响应、缓存查询未命中、配置文件里某个非关键项缺失
Ja va技术体系概览:从核心组件到应用演进 要理解Ja va,首先得厘清几个核心概念。简单来说,JDK (Ja va Development Kit) 是开发者的工具箱,它包含了三件套:Ja va编程语言本身、丰富的API类库以及运行的核心——JVM(Ja va虚拟机)。而JRE (Ja va R
热门专题
热门推荐
H3C路由器登录管理界面提示证书错误,本质是浏览器与设备间SSL TLS安全握手未通过验证,属常见且可快速处置的技术现象。 遇到H3C路由器管理界面弹出“证书错误”的警告,你先别慌。这本质上不是什么大故障,而是浏览器与你的路由器之间在进行安全“握手”时,验证流程没走通。这在设备圈子里其实挺常见,尤其
针式打印机本身不使用墨粉,而是依靠色带击打完成打印,因此不存在“加墨粉”这一操作,更谈不上墨粉对寿命的影响。所谓“给针打加墨粉”的说法,实为混淆了针式打印机与激光打印机的核心成像原理——前者依赖物理撞击使色带染料转印,后者才通过静电吸附墨粉并经高温定影。权威行业资料显示,针式打印机的使用寿命主要取决
针式打印机不能加墨粉,它使用的是物理击打式打印原理,依靠色带盒中的油墨浸润织物带实现字符转印。 这事儿其实很好理解。针式打印机和办公室里常见的激光打印机,完全是两套“武功路数”。后者依赖碳粉在感光鼓上成像,再经过热压定影,过程充满了静电与高温的精密配合。而针式打印机呢?它的核心耗材体系自始至终都围绕
苏泊尔电磁炉的定时功能通常集成在面板主控区,通过“定时”专用按键一键调出 想给炖汤定个时,或者让火锅到点自动关机?这个操作其实就藏在面板的按键区里。苏泊尔电磁炉大多设有一个独立的“定时”键,位置通常在功能键组的右侧或者数字键的上方,图标很好认,不是沙漏就是个小时钟。轻轻一按,配合旁边的“加”和“减”
高端手机5G频段覆盖差异,核心在于对n28与n79等关键频段的支持完整性 说到高端手机的5G体验,一个常被忽略但至关重要的差异,就藏在那些看似枯燥的频段编号里。尤其是n28(700MHz)和n79(4 9GHz)这两个关键频段,它们的支持是否完整,直接决定了手机信号是“真全能”还是“有短板”。低频段





