在 Spring 应用中整合 Apache Kafka 以生产、消费消息
1、概览
Apache Kafka 是一个分布式且容错的流处理系统。
在本教程中,我们将介绍 Spring 对 Kafka 的支持以及它在原生 Kafka Java 客户端 API 之上提供的抽象层。
Spring Kafka 通过 KafkaTemplate
和使用 @KafkaListener
注解的消息驱动的POJO,提供了简单且典型的 Spring template 编程模型。
2、安装和设置
要下载和安装 Kafka,请参阅 此处 的官方指南。
我们需要在 pom.xml
中添加 spring-kafka
依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.0</version>
</dependency>
然后按如下方法配置 spring-boot-maven-plugin
:
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.baeldung.spring.kafka.KafkaApplication</mainClass>
</configuration>
</plugin>
我们的示例应用程序是 Spring Boot。
本文假定服务器使用默认配置启动,并且没有更改服务端口。
3、配置 Topic
之前,我们使用命令行工具在 Kafka 中创建主题:
$ bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic mytopic
但随着 Kafka 引入 AdminClient
,我们现在可以以编程式创建 topic。
我们需要添加 KafkaAdmin
Spring Bean,它将自动为所有 NewTopic
类型的 Bean 添加 topic:
@Configuration
public class KafkaTopicConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic("baeldung", 1, (short) 1);
}
}
4、生产消息
要创建消息,我们首先需要配置 ProducerFactory
。这将设定创建 Kafka Producer 实例的策略。
然后,我们需要一个 KafkaTemplate
,它封装了一个 Producer
实例,并提供向 Kafka topic 发送消息的便捷方法。
Producer
实例是线程安全的。在整个 application context 中使用单例会带来更高的性能。KakfaTemplate
实例也是线程安全的,因此,也建议只维护一个实例。
4.1、Producer 配置
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
4.2、推送消息
我们可以使用 KafkaTemplate
类发送消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {
kafkaTemplate.send(topicName, msg);
}
send
API 会返回一个 CompletableFuture
对象。如果我们想阻塞发送线程并获取发送信息的结果,可以调用 CompletableFuture
对象的 get
API。线程将等待结果,但这会减慢生产者的速度。
Kafka 是一个快速流处理平台。因此,最好以异步方式处理结果,这样后续消息就不必等待前一条消息的结果。
我们可以通过回调来实现:
public void sendMessage(String message) {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Sent message=[" + message +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
} else {
System.out.println("Unable to send message=[" +
message + "] due to : " + ex.getMessage());
}
});
}
5、消费消息
5.1、Consumer 配置
为了消费消息,我们需要配置一个 ConsumerFactory
和一个 KafkaListenerContainerFactory
。一旦 Spring Bean Factory 中的这些 Bean 可用,就可以使用 @KafkaListener
注解配置基于 POJO 的消费者。
配置类上需要使用 @EnableKafka
注解,以便在 Spring 管理的 Bean 上检测 @KafkaListener
注解:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
5.2、消费消息
@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group foo: " + message);
}
我们可以为一个 topic 实现多个 listener,每个 listener 都有不同的 group Id.。此外,一个 consumer 可以监听来自不同 topic 的消息:
@KafkaListener(topics = "topic1, topic2", groupId = "foo")
Spring 还支持在 listener 中使用 @Header 注解检索一个或多个 message header:
@KafkaListener(topics = "topicName")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}
5.3、从指定分区消费消息
注意,我们创建的 topic baeldung
只有一个分区。
对于有多个分区的 topic,@KafkaListener
可以通过初始偏移量(offset)显式地订阅 topic 的特定分区:
@KafkaListener(
topicPartitions = @TopicPartition(topic = "topicName",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")}),
containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}
由于该 listener 中的 initialOffset
已设置为 0,因此每次初始化该 listener 时,都会重新消费之前从分区 0 和 3 中消费的所有消息。
如果不需要设置偏移量,我们可以使用 @TopicPartition
注解的 partitions
属性,只设置没有偏移量的分区:
@KafkaListener(topicPartitions
= @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
5.4、给 Listener 添加 Message Filter
我们可以通过添加自定义 filter 来配置 listener,以消费指定的消息内容。这可以通过向 KafkaListenerContainerFactory
设置 RecordFilterStrategy
来实现:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(
record -> record.value().contains("World"));
return factory;
}
然后,我们就可以在 listener 中配置来使用这个容器工厂:
@KafkaListener(
topics = "topicName",
containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
System.out.println("Received Message in filtered listener: " + message);
}
在此 listener 中,所有与 filter 匹配的消息都将被丢弃。
6、自定义 Message Converter
到目前为止,我们只介绍了生产、消费字符串消息。我们也可以发送和接收自定义 Java 对象。这需要在 ProducerFactory
中配置适当的序列化器,并在 ConsumerFactory
中配置反序列化器。
让我们来看看一个简单的 bean 类, 我们将把它作为信息发送:
public class Greeting {
private String msg;
private String name;
// get/set 构造函数省略
}
6.1、生产自定义消息
在本例中,我们将使用 JsonSerializer。
让我们看看 ProducerFactory
和 KafkaTemplate
的代码:
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
// ...
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
我们可以使用这个新的 KafkaTemplate
发送 Greeting
信息:
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
6.2、消费自定义消息
同样,让我们修改 ConsumerFactory
和 KafkaListenerContainerFactory
,以正确反序列化 Greeting
信息:
@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
// ...
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Greeting.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting>
greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}
spring-kafka JSON 序列化器和反序列化器使用的是 Jackson 库,它也是 spring-kafka
项目的可选 Maven 依赖项。
把它添加到 pom.xml
中:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.7</version>
</dependency>
建议使用 spring-kafka
所兼容的版本(定义在 pom.xml
中),而不是使用最新版本的 Jackson。
最后,我们需要编写一个 listener 来接收 Greeting
信息:
@KafkaListener(
topics = "topicName",
containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
// 处理消息
}
7、多种消费方式的 Listener
现在让我们看看如何配置应用程序,将各种对象发送到同一个 topic,然后再消费它们。
首先,我们将添加一个新类 Farewell
:
public class Farewell {
private String message;
private Integer remainingMinutes;
// 省略构造、get、set 方法
}
我们需要进行一些额外的配置,才能向同一个 topic 发送 Greeting
和 Farewell
对象。
7.1、在 Producer 中设置映射类型
在 producer 中,我们必须配置 JSON 类型映射:
configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
这样,库就会在 type header 中填写相应的类名。
因此,ProducerFactory
和 KafkaTemplate
看起来就像这样:
@Bean
public ProducerFactory<String, Object> multiTypeProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(JsonSerializer.TYPE_MAPPINGS,
"greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
return new KafkaTemplate<>(multiTypeProducerFactory());
}
我们可以使用该 KafkaTemplate
向 topic 发送 Greeting
、Farewell
或任何对象:
multiTypeKafkaTemplate.send(multiTypeTopicName, new Greeting("Greetings", "World!"));
multiTypeKafkaTemplate.send(multiTypeTopicName, new Farewell("Farewell", 25));
multiTypeKafkaTemplate.send(multiTypeTopicName, "Simple string message");
7.2、在 Consumer 中自定义 MessageConverter
为了能够反序列化传入的消息,我们需要为 Consumer
提供一个自定义的消息转换器(MessageConverter
)。
MessageConverter
依赖于一个 Jackson2JavaTypeMapper
。默认情况下,映射器会推断出接收对象的类型:相反,我们需要明确告诉它使用 type header 来确定反序列化的目标类:
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
我们还需要提供反向映射信息。在 type header 中找到 “greeting” 将被识别为一个 Greeting
对象,而 “farewell” 对应于一个 Farewell
对象。
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("greeting", Greeting.class);
mappings.put("farewell", Farewell.class);
typeMapper.setIdClassMapping(mappings);
最后,我们需要配置 typeMapper 信任的包。我们必须确保它包含目标类的位置:
typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
下面是该 MessageConverter
的最终定义:
@Bean
public RecordMessageConverter multiTypeConverter() {
StringJsonMessageConverter converter = new StringJsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("greeting", Greeting.class);
mappings.put("farewell", Farewell.class);
typeMapper.setIdClassMapping(mappings);
converter.setTypeMapper(typeMapper);
return converter;
}
现在,我们需要告诉 ConcurrentKafkaListenerContainerFactory
使用 MessageConverter
和一个相当基本的 ConsumerFactory
:
@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
HashMap<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(multiTypeConsumerFactory());
factory.setMessageConverter(multiTypeConverter());
return factory;
}
7.3、在 Listener 中使用 @KafkaHandler
最后但同样重要的是,在我们的 KafkaListener
中,我们要创建 handler method 来消费每个类型的对象。每个处理方法都需要用 @KafkaHandler
进行注解。
最后要注意的是,我们还可以为无法绑定到 Greeting
或 Farewell
类的对象定义一个默认的 handler::
@Component
@KafkaListener(id = "multiGroup", topics = "multitype")
public class MultiTypeKafkaListener {
@KafkaHandler
public void handleGreeting(Greeting greeting) {
System.out.println("Greeting received: " + greeting);
}
@KafkaHandler
public void handleF(Farewell farewell) {
System.out.println("Farewell received: " + farewell);
}
@KafkaHandler(isDefault = true)
public void unknown(Object object) {
System.out.println("Unkown type received: " + object);
}
}
8、总结
在本文中,我们介绍了 Spring 支持 Apache Kafka 的基础知识。我们简要介绍了用于生产和消费消息的类。
在运行代码之前,请确保 Kafka 服务已经运行,并且已经手动创建了 Topic。
参考:https://www.baeldung.com/spring-kafka