在 Spring 应用中实现 Kafka Consumer 重试消费

1、概览

本文将会带你学习在 Spring 应用中实现 Kafka Consumer 重试消费的 2 种方式,及其优缺点。

关于如何在 Spring 中整合 Kafka 的细节,请参阅 这里

2、项目设置

创建一个新的 Spring Boot 项目,并添加 spring-kafka 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.1</version>
</dependency>

创建一个对象:

public class Greeting {

    private String msg;
    private String name;

    // 构造函数、get、set 方法省略
}

3、Kafka Consumer

Kafka Consumer(消费者)是从 Kafka 集群中读取数据的客户端应用程序。它订阅一个或多个 topic,并消费已发布的消息。Producer (生产者)向 topic 发送消息,topic 是存储和发布记录的类别名称。topic 被分为多个分区,以便横向扩展。每个分区都是一个不可更改的消息序列。

Consumer 可以通过指定偏移量(即消息在分区中的位置)来读取特定分区中的消息。Ack(确认)是消费者发送给 Kafka broker 的消息,表示它已成功处理了一条记录。一旦 ACK 被发送,消费者偏移量(consumer offset)将会被更新。

这将确保消息已被消费,并且不会再次传递给当前 Listener。

3.1、Ack 模式

Ack 模式决定了 broker 何时更新消费者偏移量(consumer offset)。

有三种 Ack 模式:

  1. auto-commit(自动提交):消费者在收到信息后立即向 broker 发送 Ack 信息。
  2. after-processing(处理后):消费者只有在成功消费消息后才向 broker 发送 Ack 信息。
  3. manual(手动):消费者在收到具体指令后才向 broker 发送 Ack 信息。

Ack 模式决定了消费者如何处理从 Kafka 集群读取的消息。

让我们创建一个新的 Bean,ConcurrentKafkaListenerContainerFactory

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // 其他配置
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}

我们可以配置几种 Ack 模式:

  1. AckMode.RECORD:after-processing(处理后)模式,消费者会为其消费的每条信息发送 Ack 消息。
  2. AckMode.BATCH:手动模式,消费者为一批消息发送确认,而不是为每条消息发送 Ack。
  3. AckMode.COUNT:手动模式,消费者会在消费完一定数量的消息后发送 Ack 消息。
  4. AckMode.MANUAL:手动模式,消费者不会对其消费的报文发送 Ack 消息。
  5. AckMode.TIME:手动模式,消费者会在一定时间后发送 Ack 消息。

要在 Kafka 中实现消息处理的重试逻辑,我们需要选择一种 AckMode

这种 AckMode 应能让消费者向 broker 指出哪些特定消息已被成功消费。

这样,broker 就可以将任何未确认的消息重新发送给另一个消费者。

在阻塞式重试的情况下,这可能是 RECORDMANUAL(手动)模式。

4、阻塞式重试

如果初次尝试因临时错误而失败,阻塞式重试可让消费者再次尝试消费信息。

消费者等待一定时间(称为重试延迟周期 - retry backoff period)后,才会再次尝试消费信息。

此外,用户还可以使用固定延迟(fixed delay)或指数级回退策略(exponential backoff strategy)自定义重试延迟周期。

它还可以设置在放弃并将消息标记为失败之前的最大重试次数。

4.1、错误处理

让我们在 Kafka 配置类上定义两个属性:

@Value(value = "${kafka.backoff.interval}")
private Long interval;

@Value(value = "${kafka.backoff.max_failure}")
private Long maxAttempts;

为了处理消费过程中抛出的所有异常,让我们定义一个新的 error handler:

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
        // 当所有重试尝试都用尽时执行的逻辑
    }, fixedBackOff);
    return errorHandler;
}

FixedBackOff 类需要两个参数:

  • interval:两次重试之间的等待时间(毫秒)。
  • maxAttempts:在放弃之前重新尝试操作的最大次数。

在这种策略中,消费者在重试消息消费之前会等待一段固定的时间。

DefaultErrorHandler 使用一个 lambda 函数进行初始化,该函数代表了当所有重试尝试都耗尽时要执行的逻辑。

lambda 函数有两个参数:

  • consumerRecord:表示导致错误的 Kafka 记录。
  • exception:表示抛出的异常。

4.2、Container Factory

让我们在 container factory bean 上添加 error handler:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // 其他配置
    factory.setCommonErrorHandler(errorHandler());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}

如果存在重试策略,请将确认模式(ack mode)设置为 AckMode.RECORD,以确保在处理过程中发生错误时,消费者将重新投递消息。

我们不应将确认模式(ack mode)设置为 AckMode.BATCHAckMode.TIME,因为消费者将一次性确认多个消息。

这是因为如果在处理消息时发生错误,消费者不会重新投递批处理或时间窗口中的所有消息给自己。

因此重试策略无法正确处理错误。

4.3、可重试异常和不可重试异常

我们可以指定哪些异常可重试,哪些不可重试。

修改 ErrorHandler

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
        // 当所有重试尝试都用尽时执行的逻辑
    }, fixedBackOff);
    errorHandler.addRetryableExceptions(SocketTimeoutException.class);
    errorHandler.addNotRetryableExceptions(NullPointerException.class);
    return errorHandler;
}

如上,我们在消费者中指定了哪些异常类型应触发重试策略。

SocketTimeoutException 被认为是可重试的,而 NullPointerException 被认为是不可重试的。

如果我们没有定义、设置任何可重试异常,则将使用默认的可重试异常集:

4.4、优缺点

在阻塞式重试中,当消息处理失败时,消费者会阻塞,直到重试机制完成重试或达到最大重试次数。

使用阻塞式重试有几个优点和缺点。

通过允许消费者在发生错误时重试消息的消费,阻塞式重试可以提高消息处理流水线的可靠性。即使发生短暂错误,这可以确保消息被成功处理。

阻塞式重试可以通过抽象出重试机制来简化消息处理逻辑的实现。消费者可以专注于处理消息,让重试机制处理任何可能发生的错误。

最后,如果消费者需要等待重试机制完成重试,阻塞式重试可能会在消息处理流水线中引入延迟。这可能会影响系统的整体性能。阻塞式重试还可能导致消费者消耗更多的资源,如CPU和内存,因为它需要等待重试机制完成重试。这可能会影响系统的整体可扩展性。

5、非阻塞式重试

非阻塞式重试允许消费者在异步的情况下重试消息的消费,而不会阻塞消息监听方法的执行。

5.1、@RetryableTopic

让我们在 KafkaListener 上添加注解 @RetryableTopic

@Component
@KafkaListener(id = "multiGroup", topics = "greeting")
public class MultiTypeKafkaListener {

    @KafkaHandler
    @RetryableTopic(
      backoff = @Backoff(value = 3000L), 
      attempts = "5", 
      autoCreateTopics = "false",
      include = SocketTimeoutException.class, exclude = NullPointerException.class)
    public void handleGreeting(Greeting greeting) {
        System.out.println("Greeting received: " + greeting);
    }
}

我们通过修改多个属性定制了重试行为,例如

  • backoff:该属性指定在重试失败的消息时要使用的退避策略。
  • attempts:该属性指定在放弃之前消息应该重试的最大次数。
  • autoCreateTopics:如果 retry topic 和 DLT(死信 Topic)不存在,该属性指定是否自动创建它们。
  • include:指定需要触发重试的异常。
  • exclude:指定不需要触发重试的异常。

当信息无法投递到预定 topic 时,它会自动发送到 retry topic 进行重试。

如果在最大尝试次数后,信息仍无法送达,它将被发送到 DLT (死信 topic)进行进一步处理。

5.2、优缺点

实现非阻塞式重试有几个好处:

  • 提高性能:非阻塞式重试允许在不阻塞调用线程的情况下重试失败的消息,这可以提高应用程序的整体性能。
  • 提高可靠性:非阻塞式重试可以帮助应用程序从故障中恢复并继续处理消息,即使某些消息未能成功传递。

不过,在实现非阻塞式重试时,也要考虑一些潜在的缺点:

  • 复杂性增加: 非阻塞式重试会增加应用程序的复杂性,因为我们需要处理重试逻辑和 DLT(死信 Topic)。
  • 消息重复消费的风险:如果消息在重试后成功送达,那么消费者可能会重复消费同一消息。我们需要考虑这一风险,并采取措施防止重复消费。
  • 消息的顺序:重试的消息以异步方式发送到 retry topic,可能比未重试的消息更晚发送到原始 topic。

6、总结

在本教程中,我们学习了如何在 Kafka logic 上实现重试逻辑,包括阻塞式和非阻塞式的方法。


参考:https://www.baeldung.com/spring-retry-kafka-consumer