在 Spring Boot 中动态管理 Kafka Listener
1、概览
本文将带你了解如何在 Spring Boot 应用中动态地启动和停止 Kafka Listener。
2、依赖
首先,添加 spring-kafka 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>
3、配置 Kafka 消费者
生产者是向 Kafka Topic 发布(写入)事件的应用。
在本教程中,我们使用单元测试来模拟生产者向 Kafka Topic 发送事件。订阅 Topic 并处理事件流的消费者由应用中的 Listener
(监听器)表示。该 Listener 被配置为处理来自 Kafka 的传入消息。
通过 KafkaConsumerConfig
类来配置 Kafka 消费者,其中包括 Kafka broker 的地址、消费者组 ID 以及 Key 和 Value 的反序列化器:
@Bean
public DefaultKafkaConsumerFactory<String, UserEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(UserEvent.class));
}
4、配置 Kafka Listener
在 Spring Kafka 中,用 @KafkaListener
对方法进行注解,就能创建一个 Listener,用于接收来自指定 Topic 的消息。
声明一个 UserEventListener
类:
@KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group",
containerFactory = "kafkaListenerContainerFactory", autoStartup = "false")
public void processUserEvent(UserEvent userEvent) {
logger.info("Received UserEvent: " + userEvent.getUserEventId());
userEventStore.addUserEvent(userEvent);
}
上述 Listener 等待来自 MULTI_PARTITION_TOPIC
的消息,并使用 processUserEvent()
方法处理这些消息。我们将 groupId
指定为 test-group
,让消费者成为消费组的实例之一,从而促进跨多个实例的分布式处理。
使用 id
属性为每个 Listener 分配一个唯一标识符。在本例中,分配的 Listener ID 是 listener-id-1
。
通过 autoStartup
属性,可以控制 Listener 是否在应用初始化时启动。在本例中,我们将其设置为 false
,这意味着当应用启动时,Listener 不会自动启动。这种配置为我们提供了手动启动 Listener 的灵活性。
这种手动启动可由各种事件触发,如新用户注册、应用内的特定条件(如达到一定的数据量阈值)或管理操作(如通过管理界面手动启动 Listener)。例如,如果在线零售应用检测到秒杀抢购期间流量激增,它就会自动启动额外的 Listener 来处理增加的负载,从而优化性能。
UserEventStore
用于临时存储 Listener 接收到的事件:
@Component
public class UserEventStore {
private final List<UserEvent> userEvents = new ArrayList<>();
public void addUserEvent(UserEvent userEvent) {
userEvents.add(userEvent);
}
public List<UserEvent> getUserEvents() {
return userEvents;
}
public void clearUserEvents() {
this.userEvents.clear();
}
}
5、动态控制 Listener
创建一个 KafkaListenerControlService
,使用 KafkaListenerEndpointRegistry
动态地启动和停止 Kafka Listener:
@Service
public class KafkaListenerControlService {
@Autowired
private KafkaListenerEndpointRegistry registry;
public void startListener(String listenerId) {
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
if (listenerContainer != null && !listenerContainer.isRunning()) {
listenerContainer.start();
}
}
public void stopListener(String listenerId) {
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
if (listenerContainer != null && listenerContainer.isRunning()) {
listenerContainer.stop();
}
}
}
KafkaListenerControlService
可以根据分配的 ID 精确地管理单个 Listener 实例。startListener()
和 stopListener()
方法都使用 listenerId
作为参数,允许我们根据需要启动和停止 Topic 的消息消费。
KafkaListenerEndpointRegistry
是在 Spring Application Context 中定义的所有 Kafka Listener 端点的中央 Repository。它监控这些 Listener 容器,从而允许对它们的状态(启动、停止或暂停)进行编程式控制。对于需要实时调整消息处理活动而无需重启整个应用的应序来说,这一功能至关重要。
6、验证动态控制
接下来,在 Spring Boot 应用中测试 Kafka Listener 的动态启动和停止功能。
首先,启动 Listener:
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
然后,通过发送并处理一个测试事件来验证 Listener 是否已激活:
UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString());
producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest));
await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size()));
this.userEventStore.clearUserEvents();
现在 Listener 已激活,我们批量发送十条消息进行处理。发送四条信息后,停止 Listener,然后将剩余的信息发送到 Kafka Topic:
for (long count = 1; count <= 10; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent));
RecordMetadata metadata = future.get();
if (count == 4) {
await().untilAsserted(() -> assertEquals(4, this.userEventStore.getUserEvents().size()));
this.kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
this.userEventStore.clearUserEvents();
}
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
在启动 Listener 之前,要确认 event store 中没有任何消息:
assertEquals(0, this.userEventStore.getUserEvents().size());
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size()));
kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
一旦 Listener 再次启动,它就会处理我们在 Listener 停止后发送到 Kafka Topic 的其余六条消息。
这个测试展示了 Spring Boot 应用动态管理 Kafka Listener 的能力。
7、使用案例
动态 Listener 管理在需要高度适应性的场景中表现出色。例如,在负载高峰期,可以动态地启动额外的 Listener,以提高吞吐量并减少处理时间。相反,在维护或低流量期间,可以停止 Listener 以节省资源。这种灵活性还有利于在 “Feature flags” 后部署新功能,从而在不影响整个系统的情况下进行无缝的即时调整。
考虑这样一个场景:一个电子商务平台推出了一个新的推荐引擎,旨在通过根据浏览历史和购买模式推荐产品来提升用户体验。为了在全面推出之前验证该功能的有效性,我们决定在 “Feature flags” 后部署该功能。
激活此 “Feature flag” 可启动 Kafka Listener。当终端用户与平台互动时,由 Kafka Listener 提供支持的推荐引擎会处理传入的用户活动数据流,生成个性化的产品推荐。
当停用 “Feature flag” 时,就会停止 Kafka Listener,平台会默认使用现有的推荐引擎。这样,无论新引擎处于哪个测试阶段,都能确保无缝的用户体验。
当该功能处于激活状态时,我们可以积极收集数据,监控性能指标,并对推荐引擎进行调整。我们可以在多次迭代中重复这种功能测试,直到达到预期效果。
通过这种迭代过程,动态 Listener 管理被证明是一种有价值的工具。它允许无缝引入新功能
8、总结
本文介绍了 Kafka 与 Spring Boot 的整合,重点是动态管理 Kafka Listener。这一功能对于管理 波动性工作负载 和执行日常维护至关重要。此外,它还能实现功能切换,根据流量模式扩展服务,并通过特定触发器管理事件驱动的工作流。
Ref:https://www.baeldung.com/kafka-spring-boot-dynamically-manage-listeners