1、概览 本文将会带你学习在 Spring 应用中实现 Kafka Consumer 重试消费的 2 种方式,及其优缺点。
关于如何在 Spring 中整合 Kafka 的细节,请参阅 这里。
2、项目设置 创建一个新的 Spring Boot 项目,并添加 spring-kafka 依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.0.1</version> </dependency> 创建一个对象:
public class Greeting { private String msg; private String name; // 构造函数、get、set 方法省略 } 3、Kafka Consumer Kafka Consumer(消费者)是从 Kafka 集群中读取数据的客户端应用程序。它订阅一个或多个 topic,并消费已发布的消息。Producer (生产者)向 topic 发送消息,topic 是存储和发布记录的类别名称。topic 被分为多个分区,以便横向扩展。每个分区都是一个不可更改的消息序列。
Consumer 可以通过指定偏移量(即消息在分区中的位置)来读取特定分区中的消息。Ack(确认)是消费者发送给 Kafka broker 的消息,表示它已成功处理了一条记录。一旦 ACK 被发送,消费者偏移量(consumer offset)将会被更新。
这将确保消息已被消费,并且不会再次传递给当前 Listener。
3.1、Ack 模式 Ack 模式决定了 broker 何时更新消费者偏移量(consumer offset)。
1、概览 Apache Kafka 是一个分布式且容错的流处理系统。
在本教程中,我们将介绍 Spring 对 Kafka 的支持以及它在原生 Kafka Java 客户端 API 之上提供的抽象层。
Spring Kafka 通过 KafkaTemplate 和使用 @KafkaListener 注解的消息驱动的POJO,提供了简单且典型的 Spring template 编程模型。
2、安装和设置 要下载和安装 Kafka,请参阅 此处 的官方指南。
我们需要在 pom.xml 中添加 spring-kafka 依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.0.0</version> </dependency> 然后按如下方法配置 spring-boot-maven-plugin:
<plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <mainClass>com.baeldung.spring.kafka.KafkaApplication</mainClass> </configuration> </plugin> 我们的示例应用程序是 Spring Boot。
本文假定服务器使用默认配置启动,并且没有更改服务端口。
3、配置 Topic 之前,我们使用命令行工具在 Kafka 中创建主题:
$ bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic 但随着 Kafka 引入 AdminClient,我们现在可以以编程式创建 topic。