游乐游手机版
首页/AI教程/文章详情

Spring Kafka深入探秘核心原理与实战案例全解析指南

时间:2026-05-29 16:39
前言 Kafka作为消息队列产品,凭借其基于Topic与Partition的设计,在消息发送与处理性能上表现出色。Spring生态下的Spring-kafka项目,对Apache Kafka-client进行了封装,极大简化了Kafka在Spring项目中的集成流程。除了基础的收发消息能力,Spri

前言

Kafka作为消息队列产品,凭借其基于Topic与Partition的设计,在消息发送与处理性能上表现出色。Spring生态下的Spring-kafka项目,对Apache Kafka-client进行了封装,极大简化了Kafka在Spring项目中的集成流程。除了基础的收发消息能力,Spring-kafka还提供了不少高级功能,下面就来逐一看看这些用法。

spring-kafka深入探秘

简单集成

引入依赖


org.springframework.kafka
spring-kafka
2.2.6.RELEASE

添加配置

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

测试发送和接收

@SpringBootApplication
@RestController
public class Application {
private final Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Autowired
private KafkaTemplate template;
@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
this.template.send("topic_input", input);
}
@KafkaListener(id = "webGroup", topics = "topic_input")
public void listen(String input) {
logger.info("input value: {}" , input);
}
}

启动应用后,在浏览器中访问https://localhost:8080/send/kl,就能在控制台看到日志输出:input value: "kl"。基础用法其实就这么简单:发送消息注入KafkaTemplate,接收消息加上@KafkaListener注解。

Spring-kafka-test嵌入式Kafka Server

上面的代码能运行的前提是已经准备好了Kafka Server服务环境。Kafka基于Scala和Zookeeper构建,从官网下载部署包本地部署也能搞定。不过,为了简化开发阶段的验证,Spring-Kafka-Test封装了Kafka-test,提供了注解式的一键开启Kafka Server功能,用起来非常便捷。本文后面所有的测试用例,Kafka服务都使用这种嵌入式方式提供。

引入依赖


org.springframework.kafka
spring-kafka-test
2.2.6.RELEASE
test

启动服务

下面用Junit测试用例直接启动一个包含四个Broker节点的Kafka Server服务:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
@Test
public void contextLoads()throws IOException {
System.in.read();
}
}

只需要一个@EmbeddedKafka注解,就能启动一个功能完整的Kafka服务,是不是很酷?默认只写注解不加参数时,会创建一个随机端口的Broker,启动日志中会输出具体端口和默认配置项。Kafka安装包配置文件里的配置项,这个注解的参数也都能配置。下面详解一下@EmbeddedKafka注解的可设置参数:

  • value:broker节点数量
  • count:同value作用一样,也是配置broker节点数量
  • controlledShutdown:控制关闭开关,主要用于Broker意外关闭时,减少此Broker上Partition的不可用时间

Kafka是多Broker架构的高可用服务,一个Topic对应多个Partition,每个Partition可以有多个Replication副本,这些副本保存在多个Broker上用于高可用。不过,虽然存在多个分区副本集,当前工作副本集只有一个,默认就是首次分配的副本集(首选副本)作为Leader,负责写入和读取数据。升级Broker或更新配置需要重启服务时,需要将Partition转移到可用的Broker。主要涉及三种情况:

  • 直接关闭Broker:Broker关闭时,集群会重新选主,选举出的新Broker作为Partition Leader。选举过程中,此Broker上的Partition会短时不可用
  • 开启controlledShutdown:Broker关闭时,会先尝试将Leader角色转移到其他可用Broker上
  • 使用命令行工具:通过bin/kafka-preferred-replica-election.sh手动触发Partition Leader角色转移
  • ports:端口列表,是一个数组,对应count参数,有几个Broker就要对应几个端口号
  • brokerProperties:Broker参数设置,是一个数组结构,支持如下方式设置Broker参数:

@EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})

  • brokerPropertiesLocation:Broker参数文件设置

功能同上面的brokerProperties。Kafka Broker的可设置参数多达182个,都像上面那样配置显然不是最优方案,所以提供了加载本地配置文件的功能,如:

@EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")

创建新的Topic

默认情况下,如果用KafkaTemplate发送消息时Topic不存在,会创建一个新的Topic。默认的分区数和副本数由以下Broker参数设定:

  • num.partitions = 1 #默认Topic分区数
  • num.replica.fetchers = 1 #默认副本数

程序启动时创建Topic

@Configuration
public class KafkaConfig {
@Bean
public KafkaAdmin admin(KafkaProperties properties){
KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
admin.setFatalIfBrokerNotA vailable(true);
return admin;
}
@Bean
public NewTopic topic2() {
return new NewTopic("topic-kl", 1, (short) 1);
}
}

如果Kafka Broker版本在1.0.0或更高,发现现有Topic的Partition数少于设置的Partition数时,会自动新增新的Partition分区。关于KafkaAdmin有几个常用用法:

  • setFatalIfBrokerNotA vailable(true):默认值为false,Broker不可用时不影响Spring上下文初始化。如果认为Broker不可用会影响正常业务,可以显式设为true
  • setAutoCreate(false):默认值为true,Kafka实例化后会自动创建已经实例化的NewTopic对象
  • initialize():当setAutoCreate为false时,需要程序显式调用admin的initialize()方法来初始化NewTopic对象

代码逻辑中创建

有时候程序启动时并不知道某个Topic需要多少Partition数合适,但又不能一股脑用Broker的默认设置。这时就需要用Kafka-Client自带的AdminClient来处理。上面Spring封装的KafkaAdmin底层也是用的AdminClient。例如:

@Autowired
private KafkaProperties properties;
@Test
public void testCreateToipc(){
AdminClient client = AdminClient.create(properties.buildAdminProperties());
if(client !=null){
try {
Collection newTopics = new ArrayList<>(1);
newTopics.add(new NewTopic("topic-kl",1,(short) 1));
client.createTopics(newTopics);
}catch (Throwable e){
e.printStackTrace();
}finally {
client.close();
}
}
}

其他方式创建Topic

以上创建Topic的方式前提是Spring Boot版本在2.x以上,因为Spring-kafka 2.x只支持Spring Boot 2.x。1.x版本中还没有这些API。下面补充一种通过Kafka_2.10创建Topic的方式。

引入依赖


org.apache.kafka
kafka_2.10
0.8.2.2

API方式创建

@Test
public void testCreateTopic()throws Exception{
ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$)
String topicName = "topic-kl";
int partitions = 1;
int replication = 1;
AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties());
}

注意ZkClient最后一个构造入参,是一个序列化/反序列化的接口实现。如果填写不当,创建的Topic在ZK上的数据可能会有问题。Kafka默认实现很简单,就是做了字符串UTF-8编码处理。ZKStringSerializer$是Kafka中已经实现好的一个接口实例,是一个Scala伴生对象,在Ja va中直接调用点MODULE$即可得到实例。

命令方式创建

@Test
public void testCreateTopic(){
String [] options= new String[]{"--create","--zookeeper","127.0.0.1:2181","--replication-factor", "3","--partitions", "3","--topic", "topic-kl"};
TopicCommand.main(options);
}

消息发送之KafkaTemplate探秘

获取发送结果

异步获取

template.send("","").addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable throwable) {
......
}
@Override
public void onSuccess(SendResult objectObjectSendResult) {
....
}
});

同步获取

ListenableFuture> future = template.send("topic-kl","kl");
try {
SendResult result = future.get();
}catch (Throwable e){
e.printStackTrace();
}

Kafka事务消息

默认情况下,Spring-kafka自动生成的KafkaTemplate实例不具备事务消息发送能力,需要通过以下配置激活事务特性。事务激活后,所有消息发送只能在发生事务的方法内执行,否则会抛出没有事务交易的异常。

spring.kafka.producer.transaction-id-prefix=kafka_tx.

当发送消息有事务要求时,比如所有消息发送成功才算成功。像下面的例子:假设第一条消息发送后,在发送第二条消息前出现异常,那么第一条已经发送的消息也会回滚。而且正常情况下,假设在消息一发送后休眠一段时间再发第二条,消费端也只有当事务方法执行完成后才能接收到消息。

@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
template.executeInTransaction(t ->{
t.send("topic_input","kl");
if("error".equals(input)){
throw new RuntimeException("failed");
}
t.send("topic_input","ckl");
return true;
});
}

事务特性激活后,在方法上加@Transactional注解也会生效:

@GetMapping("/send/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public void sendFoo(@PathVariable String input) {
template.send("topic_input", "kl");
if ("error".equals(input)) {
throw new RuntimeException("failed");
}
template.send("topic_input", "ckl");
}

Spring-Kafka的事务消息基于Kafka提供的事务消息功能。Kafka Broker默认的配置是针对三个或以上Broker高可用服务设置的。测试时为了简单方便,使用嵌入式服务新建了一个单Broker的Kafka服务,会出现一些问题,比如:

  • 事务日志副本集大于Broker数量,会抛出异常:Number of alive brokers '1' does not meet the required replication factor '3' for the transactions state topic (configured via 'transaction.state.log.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. 默认Broker配置transaction.state.log.replication.factor=3,单节点只能调整为1
  • 副本数小于副本同步队列数目,会抛出异常:Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2] 默认Broker配置transaction.state.log.min.isr=2,单节点只能调整为1

ReplyingKafkaTemplate获得消息回复

ReplyingKafkaTemplate是KafkaTemplate的子类,除了继承父类的方法,还新增了一个sendAndReceive方法,实现了消息发送-回复语义。

RequestReplyFuture sendAndReceive(ProducerRecord record);

也就是发送一条消息,能够拿到消费者返回的结果,像传统的RPC交互一样。当消息发送者需要知道消费者的具体消费情况时,这个API非常合适。例如,一条消息发送一批数据,需要知道消费者成功处理了哪些数据。下面代码演示了如何集成和使用ReplyingKafkaTemplate:

@SpringBootApplication
@RestController
public class Application {
private final Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ConcurrentMessageListenerContainer repliesContainer(
ConcurrentKafkaListenerContainerFactory containerFactory) {
ConcurrentMessageListenerContainer repliesContainer =
containerFactory.createContainer("replies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public ReplyingKafkaTemplate replyingTemplate(
ProducerFactory pf,
ConcurrentMessageListenerContainer repliesContainer) {
return new ReplyingKafkaTemplate(pf, repliesContainer);
}
@Bean
public KafkaTemplate kafkaTemplate(ProducerFactory pf) {
return new KafkaTemplate(pf);
}
@Autowired
private ReplyingKafkaTemplate template;
@GetMapping("/send/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public void sendFoo(@PathVariable String input) throws Exception {
ProducerRecord record = new ProducerRecord<>("topic-kl", input);
RequestReplyFuture replyFuture = template.sendAndReceive(record);
ConsumerRecord consumerRecord = replyFuture.get();
System.err.println("Return value: " + consumerRecord.value());
}
@KafkaListener(id = "webGroup", topics = "topic-kl")
@SendTo
public String listen(String input) {
logger.info("input value: {}", input);
return "successful";
}
}

Spring-kafka消息消费用法探秘

@KafkaListener的使用

前面在简单集成中已经演示过@KafkaListener接收消息的能力。但@KafkaListener的功能不止于此,其他比较常见、使用场景较多的功能点如下:

  • 显示指定消费哪些Topic和分区的消息
  • 设置每个Topic以及分区初始化的偏移量
  • 设置消费线程并发度
  • 设置消息异常处理器

@KafkaListener(id = "webGroup", 
topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
},
concurrency = "6",
errorHandler = "myErrorHandler")
public String listen(String input) {
logger.info("input value: {}", input);
return "successful";
}

其他注解参数都比较好理解,errorHandler需要说明一下:设置这个参数需要实现一个KafkaListenerErrorHandler接口,注解中的配置是你自定义实现实例在Spring上下文中的名称。比如,上面配置errorHandler = "myErrorHandler",则在Spring上线文中应存在这样一个实例:

@Service("myErrorHandler")
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
Logger logger =LoggerFactory.getLogger(getClass());
@Override
public Object handleError(Message message, ListenerExecutionFailedException exception) {
logger.info(message.getPayload().toString());
return null;
}
@Override
public Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer) {
logger.info(message.getPayload().toString());
return null;
}
}

手动Ack模式

手动ACK模式由业务逻辑控制提交偏移量。比如程序在消费时,遇到异常情况不确认ack,也就是不提交偏移量,那只能使用手动Ack模式。开启手动首先需要关闭自动提交,然后设置consumer的消费模式:

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

设置完成后,在消费时只需在@KafkaListener监听方法的入参中加入Acknowledgment即可,执行ack.acknowledge()代表提交了偏移量:

@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input, Acknowledgment ack) {
logger.info("input value: {}", input);
if ("kl".equals(input)) {
ack.acknowledge();
}
return "successful";
}

@KafkaListener注解监听器生命周期

@KafkaListener注解的监听器生命周期是可控制的。默认情况下,@KafkaListener的参数autoStartup = "true",即自动启动消费。但也可以通过KafkaListenerEndpointRegistry来干预其生命周期。KafkaListenerEndpointRegistry有三个主要方法:start()、pause()、resume(),分别对应启动、暂停、继续。下面代码演示了这种功能:

@SpringBootApplication
@RestController
public class Application {
private final Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Autowired
private KafkaTemplate template;
@GetMapping("/send/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public void sendFoo(@PathVariable String input) throws Exception {
ProducerRecord record = new ProducerRecord<>("topic-kl", input);
template.send(record);
}
@Autowired
private KafkaListenerEndpointRegistry registry;
@GetMapping("/stop/{listenerID}")
public void stop(@PathVariable String listenerID){
registry.getListenerContainer(listenerID).pause();
}
@GetMapping("/resume/{listenerID}")
public void resume(@PathVariable String listenerID){
registry.getListenerContainer(listenerID).resume();
}
@GetMapping("/start/{listenerID}")
public void start(@PathVariable String listenerID){
registry.getListenerContainer(listenerID).start();
}
@KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false")
public String listen(String input) {
logger.info("input value: {}", input);
return "successful";
}
}

在上面的代码中,listenerID就是@KafkaListener中的id值"webGroup"。项目启动后,分别执行以下URL就可以看到效果:

  • 先发送一条消息:https://localhost:8081/send/ckl。因为autoStartup = "false",不会看到有消息进入监听器
  • 接着启动监听器:https://localhost:8081/start/webGroup,可以看到有一条消息进来了
  • 暂停和继续消费的效果可以用类似方法测试

SendTo消息转发

前面的消息发送响应应用中已经见过@SendTo。除了做发送-响应语义外,@SendTo注解还可以带一个参数指定转发的Topic队列。常见场景是,一个消息需要多重加工,不同加工耗费的CPU等资源不一致,就可以通过跨不同Topic和部署在不同主机上的consumer来解决。例如:

@KafkaListener(id = "webGroup", topics = "topic-kl")
@SendTo("topic-ckl")
public String listen(String input) {
logger.info("input value: {}", input);
return input + "hello!";
}
@KafkaListener(id = "webGroup1", topics = "topic-ckl")
public void listen2(String input) {
logger.info("input value: {}", input);
}

消息重试和死信队列的应用

除了通过手动Ack模式控制消息偏移量,Spring-kafka内部还封装了可重试消费消息的语义。即当消费数据出现异常时,可以重试这条消息,并且可以设置重试达到多少次后,让消息进入预定好的Topic,也就是死信队列。下面代码演示了这种效果:

@Autowired
private KafkaTemplate template;
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory kafkaConsumerFactory,
KafkaTemplate template) {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
//最大重试三次
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(template), 3));
return factory;
}
@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
template.send("topic-kl", input);
}
@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input) {
logger.info("input value: {}", input);
throw new RuntimeException("dlt");
}
@KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
public void dltListen(String input) {
logger.info("Received from DLT: " + input);
}

上面应用中,topic-kl监听到消息后,会触发运行时异常,然后监听器会尝试三次调用。当达到最大重试次数后,消息就会被丢到死信队列里面。死信队列的Topic规则是“业务Topic名 + .DLT”。例如业务Topic name为"topic-kl",对应的死信队列Topic就是"topic-kl.DLT"。

文末结语

最近业务上用到Kafka,也就系统性地探索了一下Spring-kafka的各种用法。发现了很多有趣且很酷的特性,比如一个注解开启嵌入式Kafka服务、像RPC调用一样的发送-响应语义调用、事务消息等。希望这篇文章能帮助正在使用或即将使用Spring-kafka的朋友,少走一些弯路,少踩一些坑。

来源:https://developer.aliyun.com/article/704415
上一篇一起剪在线协同视频剪辑平台 下一篇隐藏层是什么?AI神经网络核心概念解析
本站内容用于信息整理与展示,如有侵权或内容问题请及时联系处理。

相关推荐

补充同频道和同主题内容,方便继续浏览更多相关内容。

同类最新

继续查看同栏目最近更新的文章。

更多
GPT Workspace通过GPT-5强化Google Workspace,文档表格邮件创作效率与智能化提升
AI教程 · 2026-05-29

GPT Workspace通过GPT-5强化Google Workspace,文档表格邮件创作效率与智能化提升

GPT Workspace 产品介绍:GPT-5 如何增强 Google Workspace 工作效率 如果你每天都在使用 Google Workspace 进行文档撰写、表格处理、邮件沟通和演示制作,一定深有体会:大量重复性的办公任务耗费了宝贵的时间。现在,GPT Workspace 将 GPT-

AI助手提升年终总结与周报效率的精准营销策略
AI教程 · 2026-05-29

AI助手提升年终总结与周报效率的精准营销策略

适合需求:在信息爆炸的时代,企业所承受的竞争压力几乎覆盖了所有维度,其中营销领域尤为令人困扰。无论是撰写年终总结还是生成周报,精准的营销策略已成为不可或缺的需求——没有谁愿意在庞杂的数据中迷失方向。当我们复盘营销活动时,总会思考:过去哪些数字营销策略真正发挥了效果?哪些内容营销策略有待改进?然而实际

Afri Studio 非洲创意工作室
AI教程 · 2026-05-29

Afri Studio 非洲创意工作室

Afri Studio是什么先来聊聊Afri Studio——它是Afri AI团队推出的一款AI媒体创作工作室,目标很明确:把原本高高在上的智能技术拉下神坛,让普通用户也能轻松生成高质量的文本、图像、音频等内容。换句话说,这是一个面向内容创作者、博主、营销人员、艺术家的“AI工具箱”,帮你高效搞定

Geniea专注Midjourney提示词优化提升创意生成效率
AI教程 · 2026-05-29

Geniea专注Midjourney提示词优化提升创意生成效率

Geniea产品详解:Midjourney提示优化工具Geniea是一款专注于Midjourney提示词优化的智能平台,致力于帮助创作者快速生成高质量且富有创意的提示方案。无论您需要电影镜头、食品摄影还是汽车广告等场景的提示词,只需输入简单指令,系统便会自动输出优化后的提示文本,大幅提升创作效率。提

幼儿园大班毕业典礼方案PPT AI轻松制作精彩回顾
AI教程 · 2026-05-29

幼儿园大班毕业典礼方案PPT AI轻松制作精彩回顾

使用情景 每年毕业季来临之际,幼儿园大班毕业典礼的筹备工作,总是牵动着众多老师、家长和孩子们的心弦。这不仅仅是一场简单的活动,更是孩子们人生中首个重要的成长仪式,标志着他们告别幼儿时光、迈向新阶段的里程碑。对于家长而言,这也是一次充满感怀的“毕业”,意味着一段陪伴旅程的暂时落幕。 如何让这场典礼既温