Kafka

在 Spring Boot 中动态管理 Kafka Listener

1、概览 本文将带你了解如何在 Spring Boot 应用中动态地启动和停止 Kafka Listener。 2、依赖 首先,添加 spring-kafka 依赖: <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.1.2</version> </dependency> 3、配置 Kafka 消费者 生产者是向 Kafka Topic 发布(写入)事件的应用。 在本教程中,我们使用单元测试来模拟生产者向 Kafka Topic 发送事件。订阅 Topic 并处理事件流的消费者由应用中的 Listener(监听器)表示。该 Listener 被配置为处理来自 Kafka 的传入消息。 通过 KafkaConsumerConfig 类来配置 Kafka 消费者,其中包括 Kafka broker 的地址、消费者组 ID 以及 Key 和 Value 的反序列化器: @Bean public DefaultKafkaConsumerFactory<String, UserEvent> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(UserEvent.

在 Spring Boot 中处理 Kafka Offset(偏移量)

本文将带你了解如何使用 Spring Boot 和 Spring Kafka 管理 Kafka 消费者偏移量(Offset)。 在之的一篇文章中,主要说明了应用处理 Kafka 消息的方式可能会影响系统的整体性能,并没有考虑消费者端的消息重复或消息丢失等问题。本文将会介绍这些话题。 1、源码 本文中的源码托管在 GitHub,你可以克隆它到本地,然后按照说明中的步骤操作即可。 2、简介 在开始之前,首先要说明一些与使用 Spring Kafka 提交偏移量有关的重要事项。首先,默认情况下,Spring Kafka 会将消费者的 enable.auto.commit 属性设置为 false。这意味着提交偏移量的责任在于框架,而非 Kafka。当然,我们可以通过将该属性设置为 true 来改变默认行为。顺便说一句,这也是 Spring Kafka 2.3 之前的默认做法。 禁用了 Kafka 自动提交(Auto Commit)后,我们就可以利用 Spring Kafka 提供的 7 种不同的提交策略。本文不会分析所有策略,只分析最重要的几种。默认策略是 BATCH。为了设置不同的策略,需要覆盖 AckMode,例如在 Spring Boot application.properties 中设置 spring.kafka.listener.ack-mode 属性的值。 首先来看看 BATCH 模式。 3、Spring Boot Kafka 应用示例 为了测试使用 Spring Kafka 进行的偏移提交,我们将创建两个简单的应用:producer (生产者)和 consumer(消费者)。生产者向 Topic 发送规定数量的消息,而消费者接收并处理这些消息。下面是生产者 @RestController 的实现。它允许我们按需向 transactions Topic 发送指定数量的消息: @RestController public class TransactionsController { private static final Logger LOG = LoggerFactory .

Spring 配置 Kafka 死信队列

1、简介 本文将带你了解如何在 Spring 中为 Apache Kafka 配置死信队列。 2、死信队列 死信队列(Dead Letter Queue,DLQ)用于存储由于各种原因无法正确处理的消息,例如间歇性系统故障、无效的消息模式或损坏的内容。这些消息可以稍后从 DLQ 中移除,以进行分析或重新处理。 下图是 DLQ 机制的简化流程: 通常情况下,使用 DLQ 是一个不错的主意,但也有一些情况下应该避免使用 DLQ。例如,在对消息的精确顺序很重要的队列中,不建议使用 DLQ,因为重新处理 DLQ 消息会打乱消息的到达顺序。 3、Spring Kafka 中的死信队列 在 Spring Kafka 中,与 DLQ 概念相对应的是死信 Topic(DLT)。 接下来,我们通过一个简单的支付系统来介绍 DLT 应该如何使用。 3.1、Model 类 从 Model 类开始: public class Payment { private String reference; private BigDecimal amount; private Currency currency; // Get、Set 方法省略 } 再实现一个用于创建事件的方法: static Payment createPayment(String reference) { Payment payment = new Payment(); payment.setAmount(BigDecimal.valueOf(71)); payment.

将数据发送到 Kafka 中的特定分区

1、简介 Apache Kafka 是一个分布式流平台,擅长处理海量实时数据流。Kafka 将数据组织成 Topic(主题),并进一步将 Topic 划分为 Partition(分区)。每个分区都是一个独立的 Channel(通道),可实现并行处理和容错。 本文将带你了解如何把数据发送到 Kafka 中特定的分区。 2、理解 Kafka 分区 首先来了解一下 Kafka 分区的基本概念。 2.1、什么是 Kafka 分区? 当生产者向 Kafka Topic 发送消息时,Kafka 会使用指定的分区策略将这些消息组织到分区中。分区是一个基本单元,代表了线性、有序的消息序列。消息一旦产生,就会根据所选的分区策略被分配到一个特定的分区。随后,消息会被附加到该分区中日志的末尾。 2.2、并行消费与消费组 一个 Kafka Topic 可分为多个分区,一个消费组(Consumer Group)可被分配到这些分区的一个子集。组内的每个消费者都会独立处理来自其分配分区的消息。这种并行处理机制提高了整体吞吐量和可扩展性,使 Kafka 能够高效地处理大量数据。 2.3、顺序保证 在单个分区中,Kafka 可确保按照接收到的相同顺序处理消息。这保证了依赖消息顺序的应用(如金融交易或事件日志)的顺序处理。不过,需要注意的是,由于网络延迟和其他操作因素,接收消息的顺序可能与最初发送消息的顺序不同。 在不同的分区中,Kafka 并不保证顺序。来自不同分区的消息可能会被并发处理,从而带来事件顺序变化的可能性。在设计依赖于严格消息顺序的应用时,需要考虑到这个特性。 2.4、容错和高可用 分区还有助于 Kafka 实现出色的容错能力。每个分区都可以在多个 Broker 之间复制。如果 Broker 发生故障,副本分区仍可被访问,并确保对数据的持续访问。 Kafka 集群可以将消费者无缝重定向到健康的 Broker,从而保持数据的可用性和系统的高可靠性。 3、为什么要将数据发送到特定分区? 3.1、数据亲和性 数据亲和性是指有意将相关数据归入同一分区。通过将相关数据发送到特定分区,可以确保这些数据一起处理,从而提高处理效率。 例如,考虑一个场景,我们可能希望确保客户的订单位于同一个分区中,以便进行订单追踪和分析。保证特定客户的所有订单都进入同一个分区可以简化追踪和分析过程。 3.2、负载均衡 此外,在分区之间均匀地分配数据有助于确保最佳的资源利用率。在分区之间平均分配数据有助于优化 Kafka 集群内的资源利用率。通过根据负载情况向分区发送数据,可以防止出现资源瓶颈,确保每个分区都能接收到可管理的均衡工作量。 3.3、优先顺序 在某些情况下,并非所有数据都具有相同的优先级或紧迫性。Kafka 的分区功能可将关键数据引导到专用分区进行快速处理,从而实现关键数据的优先级排序。与不太重要的数据相比,这种优先级排序可确保高优先级的消息得到及时关注和更快处理。 4、向特定分区发送数据的方式 Kafka 提供了将消息分配到分区的各种策略,从而提供了数据分布和处理的灵活性。下面是一些可用于将消息发送到特定分区的常用方法。 4.1、粘性分区器(Sticky Partitioner) 在 Kafka 2.4 及以上版本中,粘性分区器(Sticky Partitioner)的目的是将没有 Key 的消息保持在同一个分区中。不过,这种行为并不是绝对的,它会与批处理设置(如 batch.

Spring Kafka 的 “Trusted Packages” 特性

1、概览 本文将带你了解 Spring Kafka 中的 “Trusted Packages” 功能,了解其背后的动机以及用法。 2、先决条件 一般来说,Spring Kafka 模块允许我们指定一些关于发送的 POJO 的元数据。它通常采用 Kafka Message Header 的形式。 例如,可以这样配置 ProducerFactory: @Bean public ProducerFactory<Object, SomeData> producerFactory() { JsonSerializer<SomeData> jsonSerializer = new JsonSerializer<>(); jsonSerializer.setAddTypeInfo(true); return new DefaultKafkaProducerFactory<>( producerFactoryConfig(), new StringOrBytesSerializer(), jsonSerializer ); } @Data @AllArgsConstructor static class SomeData { private String id; private String type; private String status; private Instant timestamp; } 然后,使用上面用 producerFactory 配置的 KafkaTemplate 在一个 Topic 中生产一条新消息: public void sendDataIntoKafka() { SomeData someData = new SomeData("1", "active", "sent", Instant.

在 Spring Boot 中测试 Kafka

1、概览 Apache Kafka 是一个功能强大、分布式、容错的流处理系统。在之前的教程中,介绍了 如何在 Spring 中整合、使用 Kafka。 本文将在 上一节 的基础上带你了解如何编写可靠、独立的集成测试,而不依赖于外部运行的 Kafka 服务器。 2、依赖 在 pom.xml 中添加标准的 spring-kafka 依赖: <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.7.2</version> </dependency> 以及两个专门用于测试的依赖,spring-kafka-test 和 Testcontainers Kafka(注意,都是 Test Scope)。 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>2.6.3.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <version>1.19.3</version> <scope>test</scope> </dependency> 3、简单的 Kafka 生产者-消费者应用 这是一个标准的 Spring Boot 应用,Application 类如下: @SpringBootApplication public class KafkaProducerConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaProducerConsumerApplication.class, args); } } 3.1、生产者设置 接下来,创建 Producer bean,用它来向指定的 Kafka Topic 发送消息:

管理 Kafka 消费者组

1、简介 消费者组允许多个消费者从同一 Topic 消费数据,有助于创建更具可扩展性的 Kafka 应用。 本文将带你了解消费者组以及它们如何在其消费者之间重新平衡分区(Rebalance Partition)。 2、消费者组是什么? 消费者组是一组与一个或多个 Topic 相关联的唯一消费者。每个消费者可以从 0 个、1 个或多个分区中消费数据。此外,每个分区在给定时间内只能分配给一个消费者。分区分配会随着群成员的变化而变化。这就是所谓的组重新平衡(Group Rebalancing)。 消费者分组是 Kafka 应用的重要组成部分。它允许将类似的消费者分组,使他们可以并行地从一个分区 Topic 中消费数据。因此,它提高了 Kafka 应用的性能和可扩展性。 2.1、Group Coordinator 和 Group Leader 当实例化消费者组时,Kafka 也会创建 Group Coordinator(组协调器)。Group Coordinator 会定期接收来自消费者的请求,这些请求被称为 “心跳”。如果某个消费者停止发送心跳,Coordinator 就会认为该消费者要么已经离开了组,要么已经崩溃。这就是分区重新平衡(Partition Rebalance)的一个可能触发因素。 第一个向 Group Coordinator 提出加入 Group 请求的消费者成为 Group Leader。当因任何原因发生重新平衡(Rebalance)时,Leader 会从 Group Coordinator 处收到一份 Group 成员列表。然后,Leader 会使用 partition.assignment.strategy 配置中的自定义策略,在列表中的消费者之间重新分配分区。 2.2、提交的偏移量(Committed Offset) Kafka 使用提交的偏移量(Committed Offset)来跟踪从 Topic 读取的最后位置。提交的偏移量是消费者确认成功处理的 Topic 位置。换句话说,它是自身和其他消费者在后续轮次中读取事件的起始点。 Kafka 将所有分区提交的偏移量(committed offsets)存储在名为 __consumer_offsets 的内部 Topic 中。可以放心地信任它的信息,因为对于副本(Replicated) Broker 来说,Topic 是持久(durable)和容错的(fault-tolerant)。

Spring Boot + Open Telemetry 实现 Kafka 追踪

本文将带你了解如何使用 Spring Boot 和 Open Telemetry 为 Kafka 生产者和消费者配置追踪功能。我们会使用 Micrometer 库发送追踪信息,并使用Jaeger来存储和可视化这些数据。Spring Kafka内置了与 Micrometer 的集成,用于 KafkaTemplate 和监听容器。本文还会介绍何配置 Spring Kafka observability (可观察性),以在追踪中添加自定义标签(Tag)。 源码 你可以在 GitHub 上找到完整的源码。Clone 项目后,进入 kafka 目录,按照说明进行操作即可。 依赖 添加如下依赖,其中 Spring Boot Starter 和 Spring Kafka 用于发送或接收消息,Spring Boot Actuator 和 Micrometer Tracing Open Telemetry 桥接器用于自动生成与每条消息相关的追踪,最后是 opentelemetry-exporter-otlp,用于将追踪导出到应用外。 对于本文中的两个示例 Spring Boot 应用,依赖都是相同的。 <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-tracing-bridge-otel</artifactId> </dependency> <dependency> <groupId>io.

自定义 Apache Kafka Serializer(序列化器)

1、简介 在 Apache Kafka 中传输消息时,客户端和服务器会就使用共同的语法格式达成协议。Apache Kafka 提供了默认的转换器(Converter),如 String 和 Long。同时也支持针对特定用例的自定义序列化器 (Serializer)。 2、Apache Kafka 中的 Serializer 序列化是将对象转换为字节的过程。反序列化则是将字节流转换为对象的逆过程。简而言之,它将内容转换为可读和可解释的信息。 如上所述,Apache Kafka 为几种基本类型提供了默认序列化器,并允许我们实现自定义序列化器: 上图显示了通过网络向 Kafka Topic 发送消息的过程。在此过程中,生产者将消息发送到 Topic 之前,自定义序列化器会将对象转换成字节。同样,它也显示了反序列化器如何将字节转换回对象,以便消费者正确处理。 2.1、自定义 Serializer Apache Kafka 为几种基本类型提供了预置的序列化器和反序列化器: StringSerializer ShortSerializer IntegerSerializer LongSerializer DoubleSerializer BytesSerializer 它也提供了实现自定义序列化器/反序列化器的功能。为了序列化自己的对象,需要实现 Serializer 接口。同样,要创建自定义的反序列化器,需要实现 Deserializer 接口。 这两个接口都有可覆写的方法: configure:用于实现配置细节 serialize / deserialize:这些方法包括自定义序列化和反序列化的实际实现 close:使用该方法关闭 Kafka Session 3、实现自定义 Serializer Kafka 提供了自定义序列化器的功能。可以为消息的 Key 和 Value 实现特定的转换器(Converter)。 3.1、依赖 在 pom.xml 中添加 Kafka Consumer API 依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version> </dependency> 3.

Spring Boot 配置 Kafka SSL 双向认证

1、简介 本文将带你了解在 Spring Boot 中如何配置 SSL 认证以连接到 Apache Kafka Broker。 安全套接字层(SSL)实际上已被弃用,自 2015 年起被传输层安全(TLS)所取代。不过,由于历史原因,Kafka(和 Java)仍然使用 “SSL”。 2、SSL 概览 默认情况下,Apache Kafka 以明文形式发送所有数据,且不进行任何身份认证。 首先,可以为 Broker 和客户端之间的加密配置 SSL。默认情况下,这需要使用公钥加密进行单向身份认证,由客户端验证服务器证书。 此外,服务器还可以使用单独的机制(如 SSL 或 SASL)对客户端进行身份认证,从而实现双向身份认证或相互 TLS(mTLS)。基本上,双向 SSL 认证确保客户端和服务器都使用 SSL 证书来认证对方的身份,并在双向上相互信任。 在本文中,Broker 使用 SSL 对客户端进行身份验证,使用 Keystore 和 Truststore 保存证书和密钥。 每个 Broker 都需要自己的 Keystore,其中包含私钥和公共证书。客户端使用其 Truststore 来验证该证书并信任服务器。同样,每个客户端也需要自己的 Keystore,其中包含私钥和公共证书。服务器使用其 Truststore 来验证和信任客户端的证书,并建立安全连接。 Truststore 可以包含一个可以签署证书的证书颁发机构(CA)。在这种情况下,Broker 或客户端会信任由 Truststore 中的 CA 签发的任何证书。这就简化了证书验证,因为添加新客户端或 Broker 无需更改 Truststore。 3、依赖和设置 创建一个简单的 Spring Boot 示例应用,在 pom.xml 中添加 spring-kafka 依赖: <dependency> <groupId>org.