Spring Boot 整合 Kafka Stream

1、简介

流式数据在现实生活中的一些例子包括传感器数据、股票市场事件流和系统日志。在本文中,我们通过构建一个简单的字数统计流式应用来介绍如何在 Spring Boot 中使用 Kafka Streams。

2、概览

Kafka Streams 在 Kafka Topic 和关系型数据库表之间提供了一种对偶性。它使我们能够对一个或多个流式事件进行连接、分组、聚合和过滤等操作。

Kafka 流的一个重要概念是处理器拓扑(Processor Topology)。处理器拓扑是 Kafka Stream 对一个或多个事件流进行操作的蓝图。从本质上讲,处理器拓扑可视为有向无环图。在这个图中,节点分为源节点、处理器节点和汇节点,而边则代表流事件的流向。

位于拓扑结构顶端的源接收来自 Kafka 的流数据,将其向下传递到执行自定义操作的处理器节点,并通过汇节点流出到新的 Kafka Topicc。在进行核心处理的同时,还利用检查点(Checkpoint)定期保存数据流的状态,以实现容错和弹性。

3、依赖

首先在 POM 中添加 spring-kafkakafka-streams 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.8</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId
    <artifactId>kafka-streams</artifactId>
    <version>2.7.1</version>
</dependency> 

4、示例

示例应用从输入的 Kafka Topic 中读取流式事件。读取记录后,它会对记录进行处理,分割文本并计算单个字数。随后,它将更新的字数发送到 Kafka 输出。除了输出 Topic 外,还要创建一个简单的 REST 服务,通过 HTTP 端点公开该计数。

总之,输出 Topic 将不断更新从输入事件中提取的单词及其更新计数。

4.1、配置

在 Java 配置类中定义 Kafka Stream 配置:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "streams-app");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        return new KafkaStreamsConfiguration(props);
    }

    // 其他配置
}

如上,使用了 @EnableKafkaStreams 注解来自动配置所需的组件。这个自动配置需要一个名为 DEFAULT_STREAMS_CONFIG_BEAN_NAME 指定的 KafkaStreamsConfiguration Bean。Spring Boot 使用这个配置并创建一个 KafkaStreams 客户端来管理应用生命周期。

在这个示例中,为配置提供了 application id、bootstrap server 连接详情和 Serializer / Deserializer。

4.2、拓扑

配置设置完成后,为应用构建拓扑结构,以记录输入信息的字数:

@Component
public class WordCountProcessor {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
          .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));

        KTable<String, Long> wordCounts = messageStream
          .mapValues((ValueMapper<String, String>) String::toLowerCase)
          .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
          .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
          .count();

        wordCounts.toStream().to("output-topic");
    }
}

如上,定义了一个配置方法,并用 @Autowired 对其进行了注解。Spring 会处理此注解,并将容器中匹配的 StreamsBuilder Bean 注入到参数。或者,也可以在配置类中创建一个 Bean 来生成拓扑结构。

通过 StreamsBuilder 可以访问所有 Kafka Streams API,并使其成为一个常规的 Kafka Streams 应用。在本例中,使用这种高级 DSL 来定义应用的转换操作:

  • 使用指定的 key 和 value Serializer / Deserializer 从输入 Topic 创建 KStream
  • 通过转换、拆分、分组,然后计算数据,创建 KTable
  • 将结果具体化为输出流。

从本质上讲,Spring Boot 在管理 KStream 实例生命周期的同时,为 Streams API 提供了一个非常薄的封装。它为拓扑创建和配置所需的组件,并执行 Streams 应用。重要的是,这可以让我们专注于核心业务逻辑,而 Spring 则负责管理生命周期。

4.3、REST 服务

通过声明步骤定义管道(Pipeline)后,创建 REST Controller,提供端点以便将消息 POST 到输入 Topic,并 GET 指定单词的计数。重要的是,应用从 Kafka Streams 状态存储而不是输出 Topic 中检索数据。

首先,修改之前的 KTable,并将聚合计数具体化为本地状态存储。这样就可以通过 REST Controller 进行查询:

KTable<String, Long> wordCounts = textStream
  .mapValues((ValueMapper<String, String>) String::toLowerCase)
  .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
  .groupBy((key, value) -> value, Grouped.with(STRING_SERDE, STRING_SERDE))
  .count(Materialized.as("counts"));

之后,更新 Controller,从 counts 状态存储中检索计数值:

@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
    KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
      StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())
    );
    return counts.get(word);
}

如上,factoryBeanStreamsBuilderFactoryBean 的一个实例,它被注入到 Controller 中。这提供了由该工厂 Bean 管理的 KafkaStreams 实例。因此,我们可以获得之前创建的由 KTable 表示的 key/value counts 状态存储。此时,可以使用它从本地状态存储中获取请求的单词的当前计数。

5、测试

测试是开发和验证应用拓扑的关键部分。Spring Kafka 测试库和 Testcontainers 都在不同层面为测试应用提供了出色的支持。

5.1、单元测试

首先,使用 TopologyTestDriver 为拓扑结构设置一个单元测试。这是测试 Kafka Streams 应用的主要测试工具:

@Test
void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    wordCountProcessor.buildPipeline(streamsBuilder);
    Topology topology = streamsBuilder.build();

    try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
        TestInputTopic<String, String> inputTopic = topologyTestDriver
          .createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
        
        TestOutputTopic<String, Long> outputTopic = topologyTestDriver
          .createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());

        inputTopic.pipeInput("key", "hello world");
        inputTopic.pipeInput("key2", "hello");

        assertThat(outputTopic.readKeyValuesToList())
          .containsExactly(
            KeyValue.pair("hello", 1L),
            KeyValue.pair("world", 1L),
            KeyValue.pair("hello", 2L)
          );
    }
}

首先将业务逻辑从 WordCountProcessor 中封装到测试中的 Topology 中。现在,可以使用 TopologyTestDriver 为测试创建输入和输出 Topic。最重要的是,这样就不需要运行代理,同时还能验证管道(Pipeline)行为。换句话说,它让我们无需使用真正的 Kafka Broker 就能快速、轻松地验证管道行为。

5.2、集成测试

最后,使用 Testcontainers 框架来端到端测试应用。这将使用运行中的 Kafka Broker,并启动应用进行完整的测试:

@Testcontainers
@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class KafkaStreamsApplicationLiveTest {

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(
      DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    private final BlockingQueue<String> output = new LinkedBlockingQueue<>();

    // 其他的测试设置

    @Test
    void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
        postMessage("test message");

        startOutputTopicConsumer();

        // 断言输出 Topic 的计数正确
        assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
        assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");

        // 断言来自 REST 服务的计数正确
        assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
        assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);
    }
}

如上,向 REST Controller 发送了一个 POST,而 REST Controller 又将消息发送到 Kafka 输入 Topic。在设置中,还启动了一个 Kafka 消费者。它异步监听 Kafka 输出 Topic,并用接收到的字数更新 BlockingQueue

在测试执行期间,应用处理输入信息。随后,可以使用 REST 服务验证 Topic 和状态存储的输出是否符合预期。

6、总结

本文介绍了如何使用 Kafka Streams 和 Spring Boot 创建一个简单的事件驱动应用来处理消息。


参考:https://www.baeldung.com/spring-boot-kafka-streams