Kafka

Kafka 中的 InstanceAlreadyExistsException 异常

1、简介 Apache Kafka 是一个功能强大的分布式流平台,被广泛用于构建实时数据管道和流应用。然而,Kafka 在运行过程中可能会遇到各种异常和错误。其中一个常见的异常就是 InstanceAlreadyExistsException。 本文将带你了解 Kafka 出现 InstanceAlreadyExistsException 异常的原因和解决办法。 2、InstanceAlreadyExistsException 异常是什么? InstanceAlreadyExistsException 是 java.lang.RuntimeException 的子类。在 Kafka 的上下文中,这个异常通常在尝试创建具有与现有生产者或消费者相同的 Client ID 的 Kafka 生产者或消费者时出现。 每个 Kafka 客户端实例都有一个唯一的 Client ID,它对 Kafka 集群内的元数据跟踪和客户端连接管理至关重要。如果试图创建一个新的客户端实例,而 Client ID 已被现有客户端使用,Kafka 会抛出 InstanceAlreadyExistsException(实例已存在异常)。 3、内部机制 虽然我们提到 Kafka 会抛出这个异常,但值得注意的是,Kafka 通常会在其内部机制中优雅地处理这个异常。通过在内部处理异常,Kafka 可以将问题隔离和限制在其自身的子系统中。这可以防止异常影响主线程,并潜在地导致更广泛的系统不稳定或停机。 在 Kafka 的内部实现中,registerAppInfo() 方法通常在初始化 Kafka 客户端(生产者或消费者)时被调用。假设现有的客户端有相同的 client.id,该方法会捕获 InstanceAlreadyExistsException。由于异常是在内部处理的,它不会被抛到主线程上,而人们可能希望在主线程上捕获异常。 4、InstanceAlreadyExistsException 的原因 在本节中,我们将通过代码示例来研究导致 InstanceAlreadyExistsException 的各种情况。 4.1、消费者组中重复的 Client ID Kafka 规定同一消费者组内的消费者有不同的 Client ID。当一个组内的多个消费者共享相同的 Client ID 时,Kafka 的消息传递语义可能会变得不可预测。这会干扰 Kafka 管理偏移量和维护消息顺序的能力,可能导致消息重复或丢失。因此,当多个消费者共享同一个 Client ID 时,就会触发该异常。

通过 CLI 命令行发送 Key/Value 消息到 Kafka

1、概览 本文将带你了解从 Kafka 命令行(CLI)发送 Key/Value 消息的两种方法。 在处理金融交易、预订、在线购物等实时事件驱动系统中,确保特定 Topic 上消息的有序性是一个常见需求。在这种情况下,我们应该为发送到这些 Topic 的事件使用 Kafka Message Key。 2、先决条件 首先,需要一个正在运行的 Kafka 实例。如果没有,可以使用 Kafka Docker 或根据 Kafka 快速入门指南 建立一个环境。下面的章节将假定我们已经有一个运行中的 Kafka 实例,且可以通过 kafka-server:9092 访问。 接下来,假设我们开发的是一个 “支付系统”,而且需要从命令行发送消息。 以下是对应的 Model 类: // 支付事件 public class PaymentEvent { private String reference; private BigDecimal amount; private Currency currency; // 标准的 Getter / Setter 方法 } 另一个前提条件是要有访问 Kafka CLI 工具的权限,这很简单。首先,下载 Kafka 发行版,然后解压下载的文件,并进入解压后的文件夹中。在 bin 文件夹下就可以找到 Kafka CLI 工具。 本文后面内容中的所有 CLI 命令,都假设是在 Kafka 解码文件夹下执行的。

在 Spring Boot 中动态管理 Kafka Listener

1、概览 本文将带你了解如何在 Spring Boot 应用中动态地启动和停止 Kafka Listener。 2、依赖 首先,添加 spring-kafka 依赖: <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.1.2</version> </dependency> 3、配置 Kafka 消费者 生产者是向 Kafka Topic 发布(写入)事件的应用。 在本教程中,我们使用单元测试来模拟生产者向 Kafka Topic 发送事件。订阅 Topic 并处理事件流的消费者由应用中的 Listener(监听器)表示。该 Listener 被配置为处理来自 Kafka 的传入消息。 通过 KafkaConsumerConfig 类来配置 Kafka 消费者,其中包括 Kafka broker 的地址、消费者组 ID 以及 Key 和 Value 的反序列化器: @Bean public DefaultKafkaConsumerFactory<String, UserEvent> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(UserEvent.

在 Spring Boot 中处理 Kafka Offset(偏移量)

本文将带你了解如何使用 Spring Boot 和 Spring Kafka 管理 Kafka 消费者偏移量(Offset)。 在之的一篇文章中,主要说明了应用处理 Kafka 消息的方式可能会影响系统的整体性能,并没有考虑消费者端的消息重复或消息丢失等问题。本文将会介绍这些话题。 1、源码 本文中的源码托管在 GitHub,你可以克隆它到本地,然后按照说明中的步骤操作即可。 2、简介 在开始之前,首先要说明一些与使用 Spring Kafka 提交偏移量有关的重要事项。首先,默认情况下,Spring Kafka 会将消费者的 enable.auto.commit 属性设置为 false。这意味着提交偏移量的责任在于框架,而非 Kafka。当然,我们可以通过将该属性设置为 true 来改变默认行为。顺便说一句,这也是 Spring Kafka 2.3 之前的默认做法。 禁用了 Kafka 自动提交(Auto Commit)后,我们就可以利用 Spring Kafka 提供的 7 种不同的提交策略。本文不会分析所有策略,只分析最重要的几种。默认策略是 BATCH。为了设置不同的策略,需要覆盖 AckMode,例如在 Spring Boot application.properties 中设置 spring.kafka.listener.ack-mode 属性的值。 首先来看看 BATCH 模式。 3、Spring Boot Kafka 应用示例 为了测试使用 Spring Kafka 进行的偏移提交,我们将创建两个简单的应用:producer (生产者)和 consumer(消费者)。生产者向 Topic 发送规定数量的消息,而消费者接收并处理这些消息。下面是生产者 @RestController 的实现。它允许我们按需向 transactions Topic 发送指定数量的消息: @RestController public class TransactionsController { private static final Logger LOG = LoggerFactory .

Spring 配置 Kafka 死信队列

1、简介 本文将带你了解如何在 Spring 中为 Apache Kafka 配置死信队列。 2、死信队列 死信队列(Dead Letter Queue,DLQ)用于存储由于各种原因无法正确处理的消息,例如间歇性系统故障、无效的消息模式或损坏的内容。这些消息可以稍后从 DLQ 中移除,以进行分析或重新处理。 下图是 DLQ 机制的简化流程: 通常情况下,使用 DLQ 是一个不错的主意,但也有一些情况下应该避免使用 DLQ。例如,在对消息的精确顺序很重要的队列中,不建议使用 DLQ,因为重新处理 DLQ 消息会打乱消息的到达顺序。 3、Spring Kafka 中的死信队列 在 Spring Kafka 中,与 DLQ 概念相对应的是死信 Topic(DLT)。 接下来,我们通过一个简单的支付系统来介绍 DLT 应该如何使用。 3.1、Model 类 从 Model 类开始: public class Payment { private String reference; private BigDecimal amount; private Currency currency; // Get、Set 方法省略 } 再实现一个用于创建事件的方法: static Payment createPayment(String reference) { Payment payment = new Payment(); payment.setAmount(BigDecimal.valueOf(71)); payment.

将数据发送到 Kafka 中的特定分区

1、简介 Apache Kafka 是一个分布式流平台,擅长处理海量实时数据流。Kafka 将数据组织成 Topic(主题),并进一步将 Topic 划分为 Partition(分区)。每个分区都是一个独立的 Channel(通道),可实现并行处理和容错。 本文将带你了解如何把数据发送到 Kafka 中特定的分区。 2、理解 Kafka 分区 首先来了解一下 Kafka 分区的基本概念。 2.1、什么是 Kafka 分区? 当生产者向 Kafka Topic 发送消息时,Kafka 会使用指定的分区策略将这些消息组织到分区中。分区是一个基本单元,代表了线性、有序的消息序列。消息一旦产生,就会根据所选的分区策略被分配到一个特定的分区。随后,消息会被附加到该分区中日志的末尾。 2.2、并行消费与消费组 一个 Kafka Topic 可分为多个分区,一个消费组(Consumer Group)可被分配到这些分区的一个子集。组内的每个消费者都会独立处理来自其分配分区的消息。这种并行处理机制提高了整体吞吐量和可扩展性,使 Kafka 能够高效地处理大量数据。 2.3、顺序保证 在单个分区中,Kafka 可确保按照接收到的相同顺序处理消息。这保证了依赖消息顺序的应用(如金融交易或事件日志)的顺序处理。不过,需要注意的是,由于网络延迟和其他操作因素,接收消息的顺序可能与最初发送消息的顺序不同。 在不同的分区中,Kafka 并不保证顺序。来自不同分区的消息可能会被并发处理,从而带来事件顺序变化的可能性。在设计依赖于严格消息顺序的应用时,需要考虑到这个特性。 2.4、容错和高可用 分区还有助于 Kafka 实现出色的容错能力。每个分区都可以在多个 Broker 之间复制。如果 Broker 发生故障,副本分区仍可被访问,并确保对数据的持续访问。 Kafka 集群可以将消费者无缝重定向到健康的 Broker,从而保持数据的可用性和系统的高可靠性。 3、为什么要将数据发送到特定分区? 3.1、数据亲和性 数据亲和性是指有意将相关数据归入同一分区。通过将相关数据发送到特定分区,可以确保这些数据一起处理,从而提高处理效率。 例如,考虑一个场景,我们可能希望确保客户的订单位于同一个分区中,以便进行订单追踪和分析。保证特定客户的所有订单都进入同一个分区可以简化追踪和分析过程。 3.2、负载均衡 此外,在分区之间均匀地分配数据有助于确保最佳的资源利用率。在分区之间平均分配数据有助于优化 Kafka 集群内的资源利用率。通过根据负载情况向分区发送数据,可以防止出现资源瓶颈,确保每个分区都能接收到可管理的均衡工作量。 3.3、优先顺序 在某些情况下,并非所有数据都具有相同的优先级或紧迫性。Kafka 的分区功能可将关键数据引导到专用分区进行快速处理,从而实现关键数据的优先级排序。与不太重要的数据相比,这种优先级排序可确保高优先级的消息得到及时关注和更快处理。 4、向特定分区发送数据的方式 Kafka 提供了将消息分配到分区的各种策略,从而提供了数据分布和处理的灵活性。下面是一些可用于将消息发送到特定分区的常用方法。 4.1、粘性分区器(Sticky Partitioner) 在 Kafka 2.4 及以上版本中,粘性分区器(Sticky Partitioner)的目的是将没有 Key 的消息保持在同一个分区中。不过,这种行为并不是绝对的,它会与批处理设置(如 batch.

Spring Kafka 的 “Trusted Packages” 特性

1、概览 本文将带你了解 Spring Kafka 中的 “Trusted Packages” 功能,了解其背后的动机以及用法。 2、先决条件 一般来说,Spring Kafka 模块允许我们指定一些关于发送的 POJO 的元数据。它通常采用 Kafka Message Header 的形式。 例如,可以这样配置 ProducerFactory: @Bean public ProducerFactory<Object, SomeData> producerFactory() { JsonSerializer<SomeData> jsonSerializer = new JsonSerializer<>(); jsonSerializer.setAddTypeInfo(true); return new DefaultKafkaProducerFactory<>( producerFactoryConfig(), new StringOrBytesSerializer(), jsonSerializer ); } @Data @AllArgsConstructor static class SomeData { private String id; private String type; private String status; private Instant timestamp; } 然后,使用上面用 producerFactory 配置的 KafkaTemplate 在一个 Topic 中生产一条新消息: public void sendDataIntoKafka() { SomeData someData = new SomeData("1", "active", "sent", Instant.

在 Spring Boot 中测试 Kafka

1、概览 Apache Kafka 是一个功能强大、分布式、容错的流处理系统。在之前的教程中,介绍了 如何在 Spring 中整合、使用 Kafka。 本文将在 上一节 的基础上带你了解如何编写可靠、独立的集成测试,而不依赖于外部运行的 Kafka 服务器。 2、依赖 在 pom.xml 中添加标准的 spring-kafka 依赖: <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.7.2</version> </dependency> 以及两个专门用于测试的依赖,spring-kafka-test 和 Testcontainers Kafka(注意,都是 Test Scope)。 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>2.6.3.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <version>1.19.3</version> <scope>test</scope> </dependency> 3、简单的 Kafka 生产者-消费者应用 这是一个标准的 Spring Boot 应用,Application 类如下: @SpringBootApplication public class KafkaProducerConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaProducerConsumerApplication.class, args); } } 3.1、生产者设置 接下来,创建 Producer bean,用它来向指定的 Kafka Topic 发送消息:

管理 Kafka 消费者组

1、简介 消费者组允许多个消费者从同一 Topic 消费数据,有助于创建更具可扩展性的 Kafka 应用。 本文将带你了解消费者组以及它们如何在其消费者之间重新平衡分区(Rebalance Partition)。 2、消费者组是什么? 消费者组是一组与一个或多个 Topic 相关联的唯一消费者。每个消费者可以从 0 个、1 个或多个分区中消费数据。此外,每个分区在给定时间内只能分配给一个消费者。分区分配会随着群成员的变化而变化。这就是所谓的组重新平衡(Group Rebalancing)。 消费者分组是 Kafka 应用的重要组成部分。它允许将类似的消费者分组,使他们可以并行地从一个分区 Topic 中消费数据。因此,它提高了 Kafka 应用的性能和可扩展性。 2.1、Group Coordinator 和 Group Leader 当实例化消费者组时,Kafka 也会创建 Group Coordinator(组协调器)。Group Coordinator 会定期接收来自消费者的请求,这些请求被称为 “心跳”。如果某个消费者停止发送心跳,Coordinator 就会认为该消费者要么已经离开了组,要么已经崩溃。这就是分区重新平衡(Partition Rebalance)的一个可能触发因素。 第一个向 Group Coordinator 提出加入 Group 请求的消费者成为 Group Leader。当因任何原因发生重新平衡(Rebalance)时,Leader 会从 Group Coordinator 处收到一份 Group 成员列表。然后,Leader 会使用 partition.assignment.strategy 配置中的自定义策略,在列表中的消费者之间重新分配分区。 2.2、提交的偏移量(Committed Offset) Kafka 使用提交的偏移量(Committed Offset)来跟踪从 Topic 读取的最后位置。提交的偏移量是消费者确认成功处理的 Topic 位置。换句话说,它是自身和其他消费者在后续轮次中读取事件的起始点。 Kafka 将所有分区提交的偏移量(committed offsets)存储在名为 __consumer_offsets 的内部 Topic 中。可以放心地信任它的信息,因为对于副本(Replicated) Broker 来说,Topic 是持久(durable)和容错的(fault-tolerant)。

Spring Boot + Open Telemetry 实现 Kafka 追踪

本文将带你了解如何使用 Spring Boot 和 Open Telemetry 为 Kafka 生产者和消费者配置追踪功能。我们会使用 Micrometer 库发送追踪信息,并使用Jaeger来存储和可视化这些数据。Spring Kafka内置了与 Micrometer 的集成,用于 KafkaTemplate 和监听容器。本文还会介绍何配置 Spring Kafka observability (可观察性),以在追踪中添加自定义标签(Tag)。 源码 你可以在 GitHub 上找到完整的源码。Clone 项目后,进入 kafka 目录,按照说明进行操作即可。 依赖 添加如下依赖,其中 Spring Boot Starter 和 Spring Kafka 用于发送或接收消息,Spring Boot Actuator 和 Micrometer Tracing Open Telemetry 桥接器用于自动生成与每条消息相关的追踪,最后是 opentelemetry-exporter-otlp,用于将追踪导出到应用外。 对于本文中的两个示例 Spring Boot 应用,依赖都是相同的。 <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-tracing-bridge-otel</artifactId> </dependency> <dependency> <groupId>io.