在Spring Kafka中配置分区数时,通常不建议在消费者端硬编码,而是在创建 KafkaListenerContainerFactory 阶段进行统一设置。下面的示例展示了如何在Java配置类中通过 ConsumerConfig.PARTITION_COUNT 属性来明确指定分区数量,从而实现对Kafka分区数的灵活控制。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpointRegistry;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import ja va.util.HashMap;
import ja va.util.Map;
@Configuration
public class KafkaConfig implements KafkaListenerConfigurer {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 设置并发消费者数量
return factory;
}
public ConsumerFactory consumerFactory() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.PARTITION_COUNT, 5); // 设置分区数
return new DefaultKafkaConsumerFactory<>(props);
}
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
MethodKafkaListenerEndpointRegistry registry = new MethodKafkaListenerEndpointRegistry();
registrar.setEndpoints(registry.getEndpoints());
}
}
在上述示例中,我们通过 PARTITION_COUNT 属性将分区数设置为5。需要特别注意的是,配置的分区数最好与Kafka主题实际拥有的分区数量保持一致,这样才能确保消息的均匀分布与消费的正确性。如果不确定当前主题的具体分区数,可以使用Kafka自带的命令行工具(如 kafka-topics.sh)或通过Kafka管理界面进行查询,从而获得准确的分区信息。
