在 Spring 应用中整合 Apache Kafka 以生产、消费消息

1、概览

Apache Kafka 是一个分布式且容错的流处理系统。

在本教程中,我们将介绍 Spring 对 Kafka 的支持以及它在原生 Kafka Java 客户端 API 之上提供的抽象层。

Spring Kafka 通过 KafkaTemplate 和使用 @KafkaListener 注解的消息驱动的POJO,提供了简单且典型的 Spring template 编程模型。

2、安装和设置

要下载和安装 Kafka,请参阅 此处 的官方指南。

我们需要在 pom.xml 中添加 spring-kafka 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.0</version>
</dependency>

然后按如下方法配置 spring-boot-maven-plugin

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
        <mainClass>com.baeldung.spring.kafka.KafkaApplication</mainClass>
    </configuration>
</plugin>

我们的示例应用程序是 Spring Boot。

本文假定服务器使用默认配置启动,并且没有更改服务端口。

3、配置 Topic

之前,我们使用命令行工具在 Kafka 中创建主题:

$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic

但随着 Kafka 引入 AdminClient,我们现在可以以编程式创建 topic。

我们需要添加 KafkaAdmin Spring Bean,它将自动为所有 NewTopic 类型的 Bean 添加 topic:

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
         return new NewTopic("baeldung", 1, (short) 1);
    }
}

4、生产消息

要创建消息,我们首先需要配置 ProducerFactory。这将设定创建 Kafka Producer 实例的策略。

然后,我们需要一个 KafkaTemplate,它封装了一个 Producer 实例,并提供向 Kafka topic 发送消息的便捷方法。

Producer 实例是线程安全的。在整个 application context 中使用单例会带来更高的性能。KakfaTemplate 实例也是线程安全的,因此,也建议只维护一个实例。

4.1、Producer 配置

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

4.2、推送消息

我们可以使用 KafkaTemplate 类发送消息:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

send API 会返回一个 CompletableFuture 对象。如果我们想阻塞发送线程并获取发送信息的结果,可以调用 CompletableFuture 对象的 get API。线程将等待结果,但这会减慢生产者的速度。

Kafka 是一个快速流处理平台。因此,最好以异步方式处理结果,这样后续消息就不必等待前一条消息的结果。

我们可以通过回调来实现:

public void sendMessage(String message) {
     CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
     future.whenComplete((result, ex) -> {
         if (ex == null) {
             System.out.println("Sent message=[" + message + 
                 "] with offset=[" + result.getRecordMetadata().offset() + "]");
         } else {
             System.out.println("Unable to send message=[" + 
                 message + "] due to : " + ex.getMessage());
         }
     });
}

5、消费消息

5.1、Consumer 配置

为了消费消息,我们需要配置一个 ConsumerFactory 和一个 KafkaListenerContainerFactory。一旦 Spring Bean Factory 中的这些 Bean 可用,就可以使用 @KafkaListener 注解配置基于 POJO 的消费者。

配置类上需要使用 @EnableKafka 注解,以便在 Spring 管理的 Bean 上检测 @KafkaListener 注解:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

5.2、消费消息

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

我们可以为一个 topic 实现多个 listener,每个 listener 都有不同的 group Id.。此外,一个 consumer 可以监听来自不同 topic 的消息:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring 还支持在 listener 中使用 @Header 注解检索一个或多个 message header:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

5.3、从指定分区消费消息

注意,我们创建的 topic baeldung 只有一个分区。

对于有多个分区的 topic,@KafkaListener 可以通过初始偏移量(offset)显式地订阅 topic 的特定分区:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

由于该 listener 中的 initialOffset 已设置为 0,因此每次初始化该 listener 时,都会重新消费之前从分区 0 和 3 中消费的所有消息。

如果不需要设置偏移量,我们可以使用 @TopicPartition 注解的 partitions 属性,只设置没有偏移量的分区:

@KafkaListener(topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4、给 Listener 添加 Message Filter

我们可以通过添加自定义 filter 来配置 listener,以消费指定的消息内容。这可以通过向 KafkaListenerContainerFactory 设置 RecordFilterStrategy 来实现:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
      record -> record.value().contains("World"));
    return factory;
}

然后,我们就可以在 listener 中配置来使用这个容器工厂:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

在此 listener 中,所有与 filter 匹配的消息都将被丢弃。

6、自定义 Message Converter

到目前为止,我们只介绍了生产、消费字符串消息。我们也可以发送和接收自定义 Java 对象。这需要在 ProducerFactory 中配置适当的序列化器,并在 ConsumerFactory 中配置反序列化器。

让我们来看看一个简单的 bean 类, 我们将把它作为信息发送:

public class Greeting {

    private String msg;
    private String name;

    // get/set 构造函数省略
}

6.1、生产自定义消息

在本例中,我们将使用 JsonSerializer

让我们看看 ProducerFactoryKafkaTemplate 的代码:

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

我们可以使用这个新的 KafkaTemplate 发送 Greeting 信息:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2、消费自定义消息

同样,让我们修改 ConsumerFactoryKafkaListenerContainerFactory,以正确反序列化 Greeting 信息:

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
  greetingKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

spring-kafka JSON 序列化器和反序列化器使用的是 Jackson 库,它也是 spring-kafka 项目的可选 Maven 依赖项。

把它添加到 pom.xml 中:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>

建议使用 spring-kafka 所兼容的版本(定义在 pom.xml 中),而不是使用最新版本的 Jackson。

最后,我们需要编写一个 listener 来接收 Greeting 信息:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // 处理消息
}

7、多种消费方式的 Listener

现在让我们看看如何配置应用程序,将各种对象发送到同一个 topic,然后再消费它们。

首先,我们将添加一个新类 Farewell

public class Farewell {

    private String message;
    private Integer remainingMinutes;

    // 省略构造、get、set 方法
}

我们需要进行一些额外的配置,才能向同一个 topic 发送 GreetingFarewell 对象。

7.1、在 Producer 中设置映射类型

在 producer 中,我们必须配置 JSON 类型映射:

configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");

这样,库就会在 type header 中填写相应的类名。

因此,ProducerFactoryKafkaTemplate 看起来就像这样:

@Bean
public ProducerFactory<String, Object> multiTypeProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(JsonSerializer.TYPE_MAPPINGS, 
      "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
    return new KafkaTemplate<>(multiTypeProducerFactory());
}

我们可以使用该 KafkaTemplate 向 topic 发送 GreetingFarewell 或任何对象:

multiTypeKafkaTemplate.send(multiTypeTopicName, new Greeting("Greetings", "World!"));
multiTypeKafkaTemplate.send(multiTypeTopicName, new Farewell("Farewell", 25));
multiTypeKafkaTemplate.send(multiTypeTopicName, "Simple string message");

7.2、在 Consumer 中自定义 MessageConverter

为了能够反序列化传入的消息,我们需要为 Consumer 提供一个自定义的消息转换器(MessageConverter)。

MessageConverter 依赖于一个 Jackson2JavaTypeMapper。默认情况下,映射器会推断出接收对象的类型:相反,我们需要明确告诉它使用 type header 来确定反序列化的目标类:

typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);

我们还需要提供反向映射信息。在 type header 中找到 “greeting” 将被识别为一个 Greeting 对象,而 “farewell” 对应于一个 Farewell 对象。

Map<String, Class<?>> mappings = new HashMap<>(); 
mappings.put("greeting", Greeting.class);
mappings.put("farewell", Farewell.class);
typeMapper.setIdClassMapping(mappings);

最后,我们需要配置 typeMapper 信任的包。我们必须确保它包含目标类的位置:

typeMapper.addTrustedPackages("com.baeldung.spring.kafka");

下面是该 MessageConverter 的最终定义:

@Bean
public RecordMessageConverter multiTypeConverter() {
    StringJsonMessageConverter converter = new StringJsonMessageConverter();
    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
    typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
    Map<String, Class<?>> mappings = new HashMap<>();
    mappings.put("greeting", Greeting.class);
    mappings.put("farewell", Farewell.class);
    typeMapper.setIdClassMapping(mappings);
    converter.setTypeMapper(typeMapper);
    return converter;
}

现在,我们需要告诉 ConcurrentKafkaListenerContainerFactory 使用 MessageConverter 和一个相当基本的 ConsumerFactory

@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
    HashMap<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(multiTypeConsumerFactory());
    factory.setMessageConverter(multiTypeConverter());
    return factory;
}

7.3、在 Listener 中使用 @KafkaHandler

最后但同样重要的是,在我们的 KafkaListener 中,我们要创建 handler method 来消费每个类型的对象。每个处理方法都需要用 @KafkaHandler 进行注解。

最后要注意的是,我们还可以为无法绑定到 GreetingFarewell 类的对象定义一个默认的 handler::

@Component
@KafkaListener(id = "multiGroup", topics = "multitype")
public class MultiTypeKafkaListener {

    @KafkaHandler
    public void handleGreeting(Greeting greeting) {
        System.out.println("Greeting received: " + greeting);
    }

    @KafkaHandler
    public void handleF(Farewell farewell) {
        System.out.println("Farewell received: " + farewell);
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("Unkown type received: " + object);
    }
}

8、总结

在本文中,我们介绍了 Spring 支持 Apache Kafka 的基础知识。我们简要介绍了用于生产和消费消息的类。

在运行代码之前,请确保 Kafka 服务已经运行,并且已经手动创建了 Topic。


参考:https://www.baeldung.com/spring-kafka