管理 Kafka 消费者组

1、简介

消费者组允许多个消费者从同一 Topic 消费数据,有助于创建更具可扩展性的 Kafka 应用。

本文将带你了解消费者组以及它们如何在其消费者之间重新平衡分区(Rebalance Partition)。

2、消费者组是什么?

消费者组是一组与一个或多个 Topic 相关联的唯一消费者。每个消费者可以从 0 个、1 个或多个分区中消费数据。此外,每个分区在给定时间内只能分配给一个消费者。分区分配会随着群成员的变化而变化。这就是所谓的组重新平衡(Group Rebalancing)。

消费者分组是 Kafka 应用的重要组成部分。它允许将类似的消费者分组,使他们可以并行地从一个分区 Topic 中消费数据。因此,它提高了 Kafka 应用的性能和可扩展性。

2.1、Group Coordinator 和 Group Leader

当实例化消费者组时,Kafka 也会创建 Group Coordinator(组协调器)。Group Coordinator 会定期接收来自消费者的请求,这些请求被称为 “心跳”。如果某个消费者停止发送心跳,Coordinator 就会认为该消费者要么已经离开了组,要么已经崩溃。这就是分区重新平衡(Partition Rebalance)的一个可能触发因素。

第一个向 Group Coordinator 提出加入 Group 请求的消费者成为 Group Leader。当因任何原因发生重新平衡(Rebalance)时,Leader 会从 Group Coordinator 处收到一份 Group 成员列表。然后,Leader 会使用 partition.assignment.strategy 配置中的自定义策略,在列表中的消费者之间重新分配分区。

2.2、提交的偏移量(Committed Offset)

Kafka 使用提交的偏移量(Committed Offset)来跟踪从 Topic 读取的最后位置。提交的偏移量是消费者确认成功处理的 Topic 位置。换句话说,它是自身和其他消费者在后续轮次中读取事件的起始点。

Kafka 将所有分区提交的偏移量(committed offsets)存储在名为 __consumer_offsets 的内部 Topic 中。可以放心地信任它的信息,因为对于副本(Replicated) Broker 来说,Topic 是持久(durable)和容错的(fault-tolerant)。

2.3、分区再平衡

分区再平衡(Partition Rebalance)将分区所有权从一个消费者转移到另一个消费者。当有新的消费者加入群组,或者群组中的消费者成员崩溃或取消订阅时,Kafka 会自动执行重新平衡。

为了提高可扩展性,当有新的消费者加入群组时,Kafka 会公平地与新加入的消费者共享其他消费者的分区。此外,当一个消费者崩溃时,其分区必须分配给组中的其余消费者,以避免丢失任何未处理的消息。

分区再平衡使用 __consumer_offsets Topic,使消费者从正确的位置开始读取重新分配的分区。

在重新平衡期间,消费者无法消费消息。换句话说,在重新平衡完成之前,Broker 不可用。此外,消费者会丢失他们的状态,需要重新计算它们的缓存值。分区再平衡期间的不可用性和缓存重新计算会降低事件消费的速度。

3、创建应用

创建一个基础的 Spring Kafka 应用。

3.1、创建基础配置

首先,配置主题及其分区:

@Configuration
public class KafkaTopicConfiguration {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    public NewTopic celciusTopic() {
        return TopicBuilder.name("topic-1")
            .partitions(2)
            .build();
    }
}

上述配置非常简单。只配置一个名为 topic-1 的新 Topic 和两个分区。

现在,配置 Producer:

@Configuration
public class KafkaProducerConfiguration {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Double> kafkaProducer() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Double> kafkaProducerTemplate() {
        return new KafkaTemplate<>(kafkaProducer());
    }
}

在上面的 Kafka 生产者配置中,设置了 Broker 地址和用于写入消息的 Serializer

最后,配置 Consumer:

@Configuration
public class KafkaConsumerConfiguration {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, Double> kafkaConsumer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DoubleDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Double> kafkaConsumerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Double> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumer());
        return factory;
    }
}

3.2、设置 Consumer

在本例中,我们将从两个消费者开始,这两个消费者属于同一个组,名为 group-1,来自 topic-1

@Service
public class MessageConsumerService {
    @KafkaListener(topics = "topic-1", groupId = "group-1")
    public void consumer0(ConsumerRecord<?, ?> consumerRecord) {
        trackConsumedPartitions("consumer-0", consumerRecord);
    }

    @KafkaListener(topics = "topic-1", groupId = "group-1")
    public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
        trackConsumedPartitions("consumer-1", consumerRecord);
    }
}

MessageConsumerService 类使用 @KafkaListener 注解注册了两个消费者,以监听 group-1 内的 topic-1 消息。

现在,在 MessageConsumerService 类中定义一个字段和一个方法来跟踪已消费的分区:

Map<String, Set<Integer>> consumedPartitions = new ConcurrentHashMap<>();

private void trackConsumedPartitions(String key, ConsumerRecord<?, ?> record) {
    consumedPartitions.computeIfAbsent(key, k -> new HashSet<>());
    consumedPartitions.computeIfPresent(key, (k, v) -> {
        v.add(record.partition());
        return v;
    });
}

在上面的代码中,使用 ConcurrentHashMap 将每个消费者名称映射到该消费者消费的所有分区的 HashSet

4、当消费者离开时,可视化分区重新平衡

现在,已经设置了所有配置并注册了消费者,我们可以直观地看到当其中一个消费者离开 group-1 时 Kafka 会做什么。

为此,让我们定义一个使用嵌入式 Broker 的 Kafka 集成测试的框架:

@SpringBootTest(classes = ManagingConsumerGroupsApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class ManagingConsumerGroupsIntegrationTest {

    private static final String CONSUMER_1_IDENTIFIER = "org.springframework.kafka.KafkaListenerEndpointContainer#1";
    private static final int TOTAL_PRODUCED_MESSAGES = 50000;
    private static final int MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP = 10000;

    @Autowired
    KafkaTemplate<String, Double> kafkaTemplate;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    MessageConsumerService consumerService;
}

在上述代码中,注入了生成和消费消息所需的 Bean:kafkaTemplateconsumerService。还注入了 kafkaListenerEndpointRegistry Bean 来操作已注册的消费者。

最后,定义了将在测试用例中使用的三个常量。

现在,定义测试用例方法:

@Test
public void givenContinuousMessageFlow_whenAConsumerLeavesTheGroup_thenKafkaTriggersPartitionRebalance() throws InterruptedException {
    int currentMessage = 0;

    do {
        kafkaTemplate.send("topic-1", RandomUtils.nextDouble(10.0, 20.0));
        currentMessage++;

        if (currentMessage == MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP) {
            String containerId = kafkaListenerEndpointRegistry.getListenerContainerIds()
                .stream()
                .filter(a -> a.equals(CONSUMER_1_IDENTIFIER))
                .findFirst()
                .orElse("");
            MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer(containerId);
            Thread.sleep(2000);
            Objects.requireNonNull(container).stop();
            kafkaListenerEndpointRegistry.unregisterListenerContainer(containerId);
        }
    } while (currentMessage != TOTAL_PRODUCED_MESSAGES);
    Thread.sleep(2000);
    assertEquals(1, consumerService.consumedPartitions.get("consumer-1").size());
    assertEquals(2, consumerService.consumedPartitions.get("consumer-0").size());
}

在上面的测试中,创建了一个消息流,在某一点上,移除了其中一个消费者,这样 Kafka 就会将其分区重新分配给剩余的消费者。

  1. 主循环使用 kafkaTemplate,通过 Apache Commons 的 RandomUtils 生成 50,000 个随机数事件。当产生任意数量的消息时(在本例中为 10,000 条),会停止并从 Broker 中取消注册一个消费者。
  2. 要取消注册消费者,首先使用 Stream 在容器中搜索匹配的消费者,并使用 getListenerContainer() 方法检索它。然后,调用 stop() 停止 container Spring 组件的执行。最后,调用 unregisterListenerContainer() 从 Kafka Broker 中以编程式取消注册与 container 变量相关联的监听器。

先来看看 Kafka 在测试执行过程中生成的几行日志。

最重要的一行是 consumer-1 向 Group Coordinator 发出的 LeaveGroup 请求:

INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-group-1-1, groupId=group-1] Member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 sending LeaveGroup request to coordinator localhost:9092

然后,Group Coordinator 会自动触发再平衡(Rebalance),并显示背后的原因:

INFO  k.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Preparing to rebalance group group-1 in state PreparingRebalance with old generation 2 (__consumer_offsets-4) (reason: Removing member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 on LeaveGroup)

回到测试中,断言分区重新平衡是否正确发生。由于我们注销了以 1 结尾的消费者,它的分区应该重新分配给剩下的消费者,即 consumer-0。因此,使用跟踪的已消费记录的 Map 来检查 consumer-1 只从一个分区消费,而 consumer-0 从两个分区消费。

4、实用的消费者配置

现在,来看看影响分区再平衡(Partition Rebalance)的几种消费者配置,以及为它们设置特定值的权衡。

4.1、会话超时和心跳频率

session.timeout.ms 参数表示 Group Coordinator 在触发分区再平衡之前等待用户发送心跳的最长时间(以毫秒为单位)。除了 session.timeout.msheartbeat.interval.ms 还表示用户向 Group Coordinator 发送心跳的频率(以毫秒为单位)。

我们应该同时修改消费者超时和心跳频率,使心跳间隔毫秒数(heartbeat.interval.ms)始终低于会话超时毫秒数(session.timeout.ms)。这是因为我们不想让消费者在发送心跳之前就因为超时而死亡。通常,我们会将心跳间隔设置为会话超时的 33%,以确保在消费者死亡前发送一次以上的心跳。

消费者会话超时默认设置为 45 秒。我们可以修改该值,只要我们了解修改该值的利弊。

当我们将会话超时设置得比默认值低时,就能提高消费者组从故障中恢复的速度,从而提高组的可用性。然而,在 0.10.1.0 之前的 Kafka 版本中,如果消费者的主线程在消费超过会话超时的消息时被阻塞,消费者就无法发送心跳。因此,消费者被视为死亡,Group Coordinator 会触发不必要的分区再平衡。KIP-62 解决了这个问题,引入了一个只发送心跳的后台线程。

如果我们为会话超时值设置更高的值,就无法更快地检测到故障。不过,这可能会解决上文提到的 Kafka 版本高于 0.10.1.0 时不需要的分区再平衡问题。

4.2、最大轮询间隔时间

另一项配置是 max.poll.interval.ms,表示 Broker 等待空闲消费者的最长时间。过了这段时间,消费者就会停止发送心跳,直到达到配置的会话超时并离开群组。max.poll.interval.ms 的默认等待时间为五分钟。

如果为 max.poll.interval.ms 设置更高的值,就会给消费者更多的空闲时间,这可能有助于避免再平衡。不过,如果没有消息要消费,增加该时间也可能会增加空闲消费者的数量。这在低吞吐量环境中可能会造成问题,因为消费者闲置的时间会更长,从而增加基础设施成本。

5、总结

本文介绍了 Kafka 中 Group Leader 和 Group Coordinator 的基本原理,以及 Kafka 如何管理消费者组和分区。

最后,还通过一个实例展示了当一个消费者离开消费者组时,Kafka 如何自动重新平衡组内的分区。


Ref:https://www.baeldung.com/kafka-manage-consumer-groups