在 Spring Boot 中测试 Kafka
1、概览
Apache Kafka 是一个功能强大、分布式、容错的流处理系统。在之前的教程中,介绍了 如何在 Spring 中整合、使用 Kafka。
本文将在 上一节 的基础上带你了解如何编写可靠、独立的集成测试,而不依赖于外部运行的 Kafka 服务器。
2、依赖
在 pom.xml
中添加标准的 spring-kafka
依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
以及两个专门用于测试的依赖,spring-kafka-test
和 Testcontainers Kafka(注意,都是 Test Scope)。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.3.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
3、简单的 Kafka 生产者-消费者应用
这是一个标准的 Spring Boot 应用,Application 类如下:
@SpringBootApplication
public class KafkaProducerConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerConsumerApplication.class, args);
}
}
3.1、生产者设置
接下来,创建 Producer bean,用它来向指定的 Kafka Topic 发送消息:
@Component
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
上面定义的 KafkaProducer
Bean 只是 KafkaTemplate
类的一个封装。该类提供高级的线程安全操作,例如向指定的 Topic 发送数据,如 send
方法所示。
3.2、消费者设置
同样,定义一个简单的 Consumer bean,它监听 Kafka Topic 并接收消息:
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload;
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
payload = consumerRecord.toString();
latch.countDown();
}
public void resetLatch() {
latch = new CountDownLatch(1);
}
// 其他 getter 方法
}
消费者在 receive
方法上使用 @KafkaListener
注解来监听指定 Topic 上的消息。稍后会介绍如何配置测试中的 test.topic
。
此外,receive
方法会将消息内容存储到 Bean 中,并递减 latch
变量的计数。该变量是一个简单的线程安全计数器字段,在稍后测试中将使用它来确保成功地接收了一条消息。
4、关于测试
一般来说,在编写简洁的集成测试时,不应依赖我们可能无法控制或可能突然停止工作的外部服务。这可能会对测试结果产生不利影响。
同样,如果依赖于一个外部服务,在本例中就是一个正在运行的 Kafka Broker,那么我们很可能无法在测试中以我们想要的方式进行设置、控制和清理。
4.1、Application Properties
在 src/test/resources/application.yml
文件中定义如下属性:
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: baeldung
test:
topic: embedded-test-topic
这是在使用嵌入式 Kafka 实例或本地 Broker 时需要的最少配置。
其中大部分不言自明,需要注意的是消费者属性 auto-offset-reset: earliest
。该属性可确保消费组收到发送的信息,因为容器可能会在发送完成后才启动。
此外,还配置了一个 topic
属性,其值为 embedded-test-topic
,这就是我们要使用的测试 Topic。
5、使用嵌入式 Kafka 进行测试
使用内存中的 Kafka 实例来运行测试,即嵌入式 Kafka。
之前添加的依赖 spring-kafka-test
包含一些有用的工具类,可以帮助我们测试应用。其中最重要的是 EmbeddedKafkaBroker
类。
编写第一个集成测试:
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived()
throws Exception {
String data = "Sending with our own simple KafkaProducer";
producer.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
}
}
首先,使用两个非常标准的 Spring 注解来装饰测试类:
@SpringBootTest
注解确保测试能启动 Spring Application Context。- 还使用了
@DirtiesContext
注解,以确保在不同测试之间对 Context 进行清理和重置。
关键部分来了 - 使用 @EmbeddedKafka
注解将 EmbeddedKafkaBroker
的实例注入测试中。
此外,还可以使用几个可用属性来配置嵌入式 Kafka 节点:
partitions
- 这是每个 Topic 使用的分区数量。为了简单,只在测试中使用一个分区。brokerProperties
- Kafka Broker 的额外属性。同样,为了简单,指定一个纯文本(PLAINTEXT)监听器和一个端口号。
接下来,自动装配 Consumer
和 Producer
类,并使用 application.properties
中的值配置一个 Topic。
最后,只需向测试 Topic 发送一条信息,然后验证信息是否已被接收,并包含测试 Topic 的名称。
运行测试,你会在 Spring 的冗长输出中看到以下内容:
...
12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer -
sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
INFO c.b.kafka.embedded.KafkaConsumer - received payload=
'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
CreateTime = 1605267935099, serialized key size = -1,
serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
key = null, value = Sending with our own simple KafkaProducer key)'
这说明测试通过,一切正常。
6、使用 TestContainers 测试 Kafka
有时,我们可能会看到真正的外部服务与专门为测试目的提供的嵌入式服务内存实例之间存在细微差别。虽然可能性不大,但也有可能测试中使用的端口被占用,导致测试失败。
鉴于此,本节使用 Testcontainers 框架进行测试的方法的一种变体。我们将从集成测试中了解如何实例化和管理托管在 Docker 容器内的外部 Apache Kafka Broker。
考虑到这一点,在本节中,我们将使用一种对之前的测试方法的变体,使用 Testcontainers 框架。来看看到如何在集成测试中实例化和管理一个托管在 Docker 容器中的外部 Apache Kafka Broker。
定义另一个集成测试,与上一节中的测试非常相似:
@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {
@ClassRule
public static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenKafkaDockerContainer_whenSendingWithSimpleProducer_thenMessageReceived()
throws Exception {
String data = "Sending with our own simple KafkaProducer";
producer.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
}
}
来看看两者的区别。声明了一个名为 kafka
的字段,这是一个标准的 JUnit @ClassRule
。该字段是 KafkaContainer
类的实例,它将准备和管理运行 Kafka 的容器的生命周期。
为了避免端口冲突,Testcontainers 会在 Docker 容器启动时动态分配端口号。
因此,使用 KafkaTestContainersConfiguration
类提供了一个自定义的 Consumer 和 Producer 工厂配置:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
// 其他标准配置
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
// 其他标准配置
return new DefaultKafkaProducerFactory<>(configProps);
}
然后,在测试开始时通过 @Import
注解引用该配置。
这样做的原因是,需要一种将服务器地址注入应用的方法,如前所述,服务器地址是动态生成的。
为此,调用 getBootstrapServers()
方法,该方法将返回 Bootstrap Server 的位置:
bootstrap.servers = [PLAINTEXT://localhost:32789]
现在,运行测试,你会看到 Testcontainers 做了几件事:
- 检查本地 Docker 设置
- 必要时拉取
confluentinc/cp-kafka:5.4.3
docker 映像 - 启动一个新容器,等待它准备就绪
- 最后,在测试结束后关闭并删除容器
同样,在测试输出的日志中也可以确认这一点:
13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
7、总结
本文介绍了在 Spring Boot 中测试 Kafak 的几种方式,包括使用嵌入式 Kafka Broker 以及使用 Testcontainers 设置在 Docker 容器内运行的外部 Kafka Broker。
Ref:https://www.baeldung.com/spring-boot-kafka-testing