先说结论:Spring Kafka 完全能够实现消息过滤功能,而且提供了极为灵活的实现方案。
在实际项目开发中,Kafka 消息的消费并非总是全盘接收,很多时候需要在消费端加入一层“筛选”机制——比如只处理特定格式的 payload,或者根据消息头进行路由分发。Spring Kafka 提供了足够多的扩展点来轻松完成这一需求,无需额外引入复杂的中间件组件。

直接上代码,按步骤操作:
第一步,自定义错误处理器。
虽然消息过滤本身并不强制要求配置错误处理器,但在生产环境中,消息反序列化失败、数据格式异常等情况时有发生,预置一个兜底处理比较稳妥。实现 ConsumerAwareErrorHandler 接口,将异常处理逻辑写入其中。
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
import org.springframework.kafka.listener.Message;
public class CustomErrorHandler implements ConsumerAwareErrorHandler {
@Override
public void handle(Exception thrownException, Message message, ConsumerRecord, ?> data) {
// 在这里编写你的错误处理逻辑
}
}
第二步,核心:编写消息监听器,并在 onMessage 方法中完成过滤。
这是最直观的方式——获取消息后,通过 shouldFilter 方法判断是否需要进行处理。判断条件可以基于 payload 内容、key、消息头等。以下示例仅处理 payload 中包含 "filtered" 的消息,其他消息直接忽略。
import org.springframework.kafka.listener.MessageListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Component
public class CustomMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
String payload = new String(message.getPayload());
String key = message.getKey();
if (shouldFilter(payload)) {
// 处理过滤后的消息
} else {
// 忽略过滤后的消息
}
}
private boolean shouldFilter(String payload) {
return payload.contains("filtered");
}
}
第三步,配置容器工厂和监听端点。
这一步将上面的监听器注册到 Spring Kafka 的容器中。需要自定义 KafkaListenerContainerFactory,并实现 KafkaListenerConfigurer 来注册端点。此处使用 MethodKafkaListenerEndpoint 指定要监听的主题和对应方法。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpointRegistrar;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import ja va.util.HashMap;
import ja va.util.Map;
@Configuration
public class KafkaListenerConfig implements KafkaListenerConfigurer {
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
Map props = new HashMap<>();
// 配置你的消费者属性,如 groupId、bootstrapServers 等
// ...
registrar.registerEndpoint(new MethodKafkaListenerEndpoint<>(
"custom-topic",
"customMethod",
getClass().getClassLoader(),
String.class,
String.class,
props));
}
@Bean
public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry(KafkaListenerEndpointRegistrar registrar) {
KafkaListenerEndpointRegistry registry = new KafkaListenerEndpointRegistry();
registrar.afterPropertiesSet();
registry.start();
return registry;
}
}
第四步,通过 @KafkaListener 注解将监听器绑定到主题。
这种方式更加简洁——直接在 CustomMessageListener 的 onMessage 方法上添加注解,指定 topic 和 groupId。Spring 会自动完成注册和绑定。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class CustomMessageListener implements MessageListener {
@Override
@KafkaListener(topics = "custom-topic", groupId = "custom-group")
public void onMessage(Message message) {
// 在这里编写你的消息过滤和处理逻辑
}
}
现在,每当有消息发送到 custom-topic 主题时,CustomMessageListener 会依据 shouldFilter 的逻辑进行判断——只有符合条件的消息才会被真正处理,其余消息则直接跳过。
当然,上述方案仅是最基础的实现方式。在实际项目中,还可以结合 RecordFilterStrategy(Spring Kafka 2.x 及以后版本支持)、自定义 ConsumerInterceptor 在拉取消息时进行过滤,或者利用 Acknowledgment 实现更精细的确认控制。不过对于大多数场景,以上代码已经足够满足需求。
