在 Spring Boot 中动态管理 Kafka Listener

1、概览

本文将带你了解如何在 Spring Boot 应用中动态地启动和停止 Kafka Listener。

2、依赖

首先,添加 spring-kafka 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

3、配置 Kafka 消费者

生产者是向 Kafka Topic 发布(写入)事件的应用。

在本教程中,我们使用单元测试来模拟生产者向 Kafka Topic 发送事件。订阅 Topic 并处理事件流的消费者由应用中的 Listener(监听器)表示。该 Listener 被配置为处理来自 Kafka 的传入消息。

通过 KafkaConsumerConfig 类来配置 Kafka 消费者,其中包括 Kafka broker 的地址、消费者组 ID 以及 Key 和 Value 的反序列化器:

@Bean
public DefaultKafkaConsumerFactory<String, UserEvent> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer");
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(UserEvent.class));
}

4、配置 Kafka Listener

在 Spring Kafka 中,用 @KafkaListener 对方法进行注解,就能创建一个 Listener,用于接收来自指定 Topic 的消息。

声明一个 UserEventListener 类:

@KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group",
  containerFactory = "kafkaListenerContainerFactory", autoStartup = "false")
public void processUserEvent(UserEvent userEvent) {
    logger.info("Received UserEvent: " + userEvent.getUserEventId());
    userEventStore.addUserEvent(userEvent);
}

上述 Listener 等待来自 MULTI_PARTITION_TOPIC 的消息,并使用 processUserEvent() 方法处理这些消息。我们将 groupId 指定为 test-group,让消费者成为消费组的实例之一,从而促进跨多个实例的分布式处理。

使用 id 属性为每个 Listener 分配一个唯一标识符。在本例中,分配的 Listener ID 是 listener-id-1

通过 autoStartup 属性,可以控制 Listener 是否在应用初始化时启动。在本例中,我们将其设置为 false,这意味着当应用启动时,Listener 不会自动启动。这种配置为我们提供了手动启动 Listener 的灵活性。

这种手动启动可由各种事件触发,如新用户注册、应用内的特定条件(如达到一定的数据量阈值)或管理操作(如通过管理界面手动启动 Listener)。例如,如果在线零售应用检测到秒杀抢购期间流量激增,它就会自动启动额外的 Listener 来处理增加的负载,从而优化性能。

UserEventStore 用于临时存储 Listener 接收到的事件:

@Component
public class UserEventStore {

    private final List<UserEvent> userEvents = new ArrayList<>();

    public void addUserEvent(UserEvent userEvent) {
        userEvents.add(userEvent);
    }

    public List<UserEvent> getUserEvents() {
        return userEvents;
    }

    public void clearUserEvents() {
        this.userEvents.clear();
    }
}

5、动态控制 Listener

创建一个 KafkaListenerControlService,使用 KafkaListenerEndpointRegistry 动态地启动和停止 Kafka Listener:

@Service
public class KafkaListenerControlService {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    public void startListener(String listenerId) {
        MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
        if (listenerContainer != null && !listenerContainer.isRunning()) {
            listenerContainer.start();
        }
    }

    public void stopListener(String listenerId) {
        MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
        if (listenerContainer != null && listenerContainer.isRunning()) {
            listenerContainer.stop();
        }
    }
}

KafkaListenerControlService 可以根据分配的 ID 精确地管理单个 Listener 实例。startListener()stopListener() 方法都使用 listenerId 作为参数,允许我们根据需要启动和停止 Topic 的消息消费。

KafkaListenerEndpointRegistry 是在 Spring Application Context 中定义的所有 Kafka Listener 端点的中央 Repository。它监控这些 Listener 容器,从而允许对它们的状态(启动、停止或暂停)进行编程式控制。对于需要实时调整消息处理活动而无需重启整个应用的应序来说,这一功能至关重要。

6、验证动态控制

接下来,在 Spring Boot 应用中测试 Kafka Listener 的动态启动和停止功能。

首先,启动 Listener:

kafkaListenerControlService.startListener(Constants.LISTENER_ID);

然后,通过发送并处理一个测试事件来验证 Listener 是否已激活:

UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString()); 
producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest)); 
await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size())); 
this.userEventStore.clearUserEvents();

现在 Listener 已激活,我们批量发送十条消息进行处理。发送四条信息后,停止 Listener,然后将剩余的信息发送到 Kafka Topic:

for (long count = 1; count <= 10; count++) {
    UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
    Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent));
    RecordMetadata metadata = future.get();
    if (count == 4) {
        await().untilAsserted(() -> assertEquals(4, this.userEventStore.getUserEvents().size()));
        this.kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
        this.userEventStore.clearUserEvents();
    }
    logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}

在启动 Listener 之前,要确认 event store 中没有任何消息:

assertEquals(0, this.userEventStore.getUserEvents().size());
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size()));
kafkaListenerControlService.stopListener(Constants.LISTENER_ID);

一旦 Listener 再次启动,它就会处理我们在 Listener 停止后发送到 Kafka Topic 的其余六条消息。

这个测试展示了 Spring Boot 应用动态管理 Kafka Listener 的能力。

7、使用案例

动态 Listener 管理在需要高度适应性的场景中表现出色。例如,在负载高峰期,可以动态地启动额外的 Listener,以提高吞吐量并减少处理时间。相反,在维护或低流量期间,可以停止 Listener 以节省资源。这种灵活性还有利于在 “Feature flags” 后部署新功能,从而在不影响整个系统的情况下进行无缝的即时调整。

考虑这样一个场景:一个电子商务平台推出了一个新的推荐引擎,旨在通过根据浏览历史和购买模式推荐产品来提升用户体验。为了在全面推出之前验证该功能的有效性,我们决定在 “Feature flags” 后部署该功能。

激活此 “Feature flag” 可启动 Kafka Listener。当终端用户与平台互动时,由 Kafka Listener 提供支持的推荐引擎会处理传入的用户活动数据流,生成个性化的产品推荐。

当停用 “Feature flag” 时,就会停止 Kafka Listener,平台会默认使用现有的推荐引擎。这样,无论新引擎处于哪个测试阶段,都能确保无缝的用户体验。

当该功能处于激活状态时,我们可以积极收集数据,监控性能指标,并对推荐引擎进行调整。我们可以在多次迭代中重复这种功能测试,直到达到预期效果。

通过这种迭代过程,动态 Listener 管理被证明是一种有价值的工具。它允许无缝引入新功能

8、总结

本文介绍了 Kafka 与 Spring Boot 的整合,重点是动态管理 Kafka Listener。这一功能对于管理 波动性工作负载 和执行日常维护至关重要。此外,它还能实现功能切换,根据流量模式扩展服务,并通过特定触发器管理事件驱动的工作流。


Ref:https://www.baeldung.com/kafka-spring-boot-dynamically-manage-listeners