kafka

Spring 配置 Kafka 死信队列

1、简介 本文将带你了解如何在 Spring 中为 Apache Kafka 配置死信队列。 2、死信队列 死信队列(Dead Letter Queue,DLQ)用于存储由于各种原因无法正确处理的消息,例如间歇性系统故障、无效的消息模式或损坏的内容。这些消息可以稍后从 DLQ 中移除,以进行分析或重新处理。 下图是 DLQ 机制的简化流程: 通常情况下,使用 DLQ 是一个不错的主意,但也有一些情况下应该避免使用 DLQ。例如,在对消息的精确顺序很重要的队列中,不建议使用 DLQ,因为重新处理 DLQ 消息会打乱消息的到达顺序。 3、Spring Kafka 中的死信队列 在 Spring Kafka 中,与 DLQ 概念相对应的是死信 Topic(DLT)。 接下来,我们通过一个简单的支付系统来介绍 DLT 应该如何使用。 3.1、Model 类 从 Model 类开始: public class Payment { private String reference; private BigDecimal amount; private Currency currency; // Get、Set 方法省略 } 再实现一个用于创建事件的方法: static Payment createPayment(String reference) { Payment payment = new Payment(); payment.setAmount(BigDecimal.valueOf(71)); payment.

将数据发送到 Kafka 中的特定分区

1、简介 Apache Kafka 是一个分布式流平台,擅长处理海量实时数据流。Kafka 将数据组织成 Topic(主题),并进一步将 Topic 划分为 Partition(分区)。每个分区都是一个独立的 Channel(通道),可实现并行处理和容错。 本文将带你了解如何把数据发送到 Kafka 中特定的分区。 2、理解 Kafka 分区 首先来了解一下 Kafka 分区的基本概念。 2.1、什么是 Kafka 分区? 当生产者向 Kafka Topic 发送消息时,Kafka 会使用指定的分区策略将这些消息组织到分区中。分区是一个基本单元,代表了线性、有序的消息序列。消息一旦产生,就会根据所选的分区策略被分配到一个特定的分区。随后,消息会被附加到该分区中日志的末尾。 2.2、并行消费与消费组 一个 Kafka Topic 可分为多个分区,一个消费组(Consumer Group)可被分配到这些分区的一个子集。组内的每个消费者都会独立处理来自其分配分区的消息。这种并行处理机制提高了整体吞吐量和可扩展性,使 Kafka 能够高效地处理大量数据。 2.3、顺序保证 在单个分区中,Kafka 可确保按照接收到的相同顺序处理消息。这保证了依赖消息顺序的应用(如金融交易或事件日志)的顺序处理。不过,需要注意的是,由于网络延迟和其他操作因素,接收消息的顺序可能与最初发送消息的顺序不同。 在不同的分区中,Kafka 并不保证顺序。来自不同分区的消息可能会被并发处理,从而带来事件顺序变化的可能性。在设计依赖于严格消息顺序的应用时,需要考虑到这个特性。 2.4、容错和高可用 分区还有助于 Kafka 实现出色的容错能力。每个分区都可以在多个 Broker 之间复制。如果 Broker 发生故障,副本分区仍可被访问,并确保对数据的持续访问。 Kafka 集群可以将消费者无缝重定向到健康的 Broker,从而保持数据的可用性和系统的高可靠性。 3、为什么要将数据发送到特定分区? 3.1、数据亲和性 数据亲和性是指有意将相关数据归入同一分区。通过将相关数据发送到特定分区,可以确保这些数据一起处理,从而提高处理效率。 例如,考虑一个场景,我们可能希望确保客户的订单位于同一个分区中,以便进行订单追踪和分析。保证特定客户的所有订单都进入同一个分区可以简化追踪和分析过程。 3.2、负载均衡 此外,在分区之间均匀地分配数据有助于确保最佳的资源利用率。在分区之间平均分配数据有助于优化 Kafka 集群内的资源利用率。通过根据负载情况向分区发送数据,可以防止出现资源瓶颈,确保每个分区都能接收到可管理的均衡工作量。 3.3、优先顺序 在某些情况下,并非所有数据都具有相同的优先级或紧迫性。Kafka 的分区功能可将关键数据引导到专用分区进行快速处理,从而实现关键数据的优先级排序。与不太重要的数据相比,这种优先级排序可确保高优先级的消息得到及时关注和更快处理。 4、向特定分区发送数据的方式 Kafka 提供了将消息分配到分区的各种策略,从而提供了数据分布和处理的灵活性。下面是一些可用于将消息发送到特定分区的常用方法。 4.1、粘性分区器(Sticky Partitioner) 在 Kafka 2.4 及以上版本中,粘性分区器(Sticky Partitioner)的目的是将没有 Key 的消息保持在同一个分区中。不过,这种行为并不是绝对的,它会与批处理设置(如 batch.

Spring Kafka 的 “Trusted Packages” 特性

1、概览 本文将带你了解 Spring Kafka 中的 “Trusted Packages” 功能,了解其背后的动机以及用法。 2、先决条件 一般来说,Spring Kafka 模块允许我们指定一些关于发送的 POJO 的元数据。它通常采用 Kafka Message Header 的形式。 例如,可以这样配置 ProducerFactory: @Bean public ProducerFactory<Object, SomeData> producerFactory() { JsonSerializer<SomeData> jsonSerializer = new JsonSerializer<>(); jsonSerializer.setAddTypeInfo(true); return new DefaultKafkaProducerFactory<>( producerFactoryConfig(), new StringOrBytesSerializer(), jsonSerializer ); } @Data @AllArgsConstructor static class SomeData { private String id; private String type; private String status; private Instant timestamp; } 然后,使用上面用 producerFactory 配置的 KafkaTemplate 在一个 Topic 中生产一条新消息: public void sendDataIntoKafka() { SomeData someData = new SomeData("1", "active", "sent", Instant.

在 Spring Boot 中测试 Kafka

1、概览 Apache Kafka 是一个功能强大、分布式、容错的流处理系统。在之前的教程中,介绍了 如何在 Spring 中整合、使用 Kafka。 本文将在 上一节 的基础上带你了解如何编写可靠、独立的集成测试,而不依赖于外部运行的 Kafka 服务器。 2、依赖 在 pom.xml 中添加标准的 spring-kafka 依赖: <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.7.2</version> </dependency> 以及两个专门用于测试的依赖,spring-kafka-test 和 Testcontainers Kafka(注意,都是 Test Scope)。 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>2.6.3.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <version>1.19.3</version> <scope>test</scope> </dependency> 3、简单的 Kafka 生产者-消费者应用 这是一个标准的 Spring Boot 应用,Application 类如下: @SpringBootApplication public class KafkaProducerConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaProducerConsumerApplication.class, args); } } 3.1、生产者设置 接下来,创建 Producer bean,用它来向指定的 Kafka Topic 发送消息:

管理 Kafka 消费者组

1、简介 消费者组允许多个消费者从同一 Topic 消费数据,有助于创建更具可扩展性的 Kafka 应用。 本文将带你了解消费者组以及它们如何在其消费者之间重新平衡分区(Rebalance Partition)。 2、消费者组是什么? 消费者组是一组与一个或多个 Topic 相关联的唯一消费者。每个消费者可以从 0 个、1 个或多个分区中消费数据。此外,每个分区在给定时间内只能分配给一个消费者。分区分配会随着群成员的变化而变化。这就是所谓的组重新平衡(Group Rebalancing)。 消费者分组是 Kafka 应用的重要组成部分。它允许将类似的消费者分组,使他们可以并行地从一个分区 Topic 中消费数据。因此,它提高了 Kafka 应用的性能和可扩展性。 2.1、Group Coordinator 和 Group Leader 当实例化消费者组时,Kafka 也会创建 Group Coordinator(组协调器)。Group Coordinator 会定期接收来自消费者的请求,这些请求被称为 “心跳”。如果某个消费者停止发送心跳,Coordinator 就会认为该消费者要么已经离开了组,要么已经崩溃。这就是分区重新平衡(Partition Rebalance)的一个可能触发因素。 第一个向 Group Coordinator 提出加入 Group 请求的消费者成为 Group Leader。当因任何原因发生重新平衡(Rebalance)时,Leader 会从 Group Coordinator 处收到一份 Group 成员列表。然后,Leader 会使用 partition.assignment.strategy 配置中的自定义策略,在列表中的消费者之间重新分配分区。 2.2、提交的偏移量(Committed Offset) Kafka 使用提交的偏移量(Committed Offset)来跟踪从 Topic 读取的最后位置。提交的偏移量是消费者确认成功处理的 Topic 位置。换句话说,它是自身和其他消费者在后续轮次中读取事件的起始点。 Kafka 将所有分区提交的偏移量(committed offsets)存储在名为 __consumer_offsets 的内部 Topic 中。可以放心地信任它的信息,因为对于副本(Replicated) Broker 来说,Topic 是持久(durable)和容错的(fault-tolerant)。

Spring Boot + Open Telemetry 实现 Kafka 追踪

本文将带你了解如何使用 Spring Boot 和 Open Telemetry 为 Kafka 生产者和消费者配置追踪功能。我们会使用 Micrometer 库发送追踪信息,并使用Jaeger来存储和可视化这些数据。Spring Kafka内置了与 Micrometer 的集成,用于 KafkaTemplate 和监听容器。本文还会介绍何配置 Spring Kafka observability (可观察性),以在追踪中添加自定义标签(Tag)。 源码 你可以在 GitHub 上找到完整的源码。Clone 项目后,进入 kafka 目录,按照说明进行操作即可。 依赖 添加如下依赖,其中 Spring Boot Starter 和 Spring Kafka 用于发送或接收消息,Spring Boot Actuator 和 Micrometer Tracing Open Telemetry 桥接器用于自动生成与每条消息相关的追踪,最后是 opentelemetry-exporter-otlp,用于将追踪导出到应用外。 对于本文中的两个示例 Spring Boot 应用,依赖都是相同的。 <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-tracing-bridge-otel</artifactId> </dependency> <dependency> <groupId>io.

自定义 Apache Kafka Serializer(序列化器)

1、简介 在 Apache Kafka 中传输消息时,客户端和服务器会就使用共同的语法格式达成协议。Apache Kafka 提供了默认的转换器(Converter),如 String 和 Long。同时也支持针对特定用例的自定义序列化器 (Serializer)。 2、Apache Kafka 中的 Serializer 序列化是将对象转换为字节的过程。反序列化则是将字节流转换为对象的逆过程。简而言之,它将内容转换为可读和可解释的信息。 如上所述,Apache Kafka 为几种基本类型提供了默认序列化器,并允许我们实现自定义序列化器: 上图显示了通过网络向 Kafka Topic 发送消息的过程。在此过程中,生产者将消息发送到 Topic 之前,自定义序列化器会将对象转换成字节。同样,它也显示了反序列化器如何将字节转换回对象,以便消费者正确处理。 2.1、自定义 Serializer Apache Kafka 为几种基本类型提供了预置的序列化器和反序列化器: StringSerializer ShortSerializer IntegerSerializer LongSerializer DoubleSerializer BytesSerializer 它也提供了实现自定义序列化器/反序列化器的功能。为了序列化自己的对象,需要实现 Serializer 接口。同样,要创建自定义的反序列化器,需要实现 Deserializer 接口。 这两个接口都有可覆写的方法: configure:用于实现配置细节 serialize / deserialize:这些方法包括自定义序列化和反序列化的实际实现 close:使用该方法关闭 Kafka Session 3、实现自定义 Serializer Kafka 提供了自定义序列化器的功能。可以为消息的 Key 和 Value 实现特定的转换器(Converter)。 3.1、依赖 在 pom.xml 中添加 Kafka Consumer API 依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version> </dependency> 3.

Spring Boot 配置 Kafka SSL 双向认证

1、简介 本文将带你了解在 Spring Boot 中如何配置 SSL 认证以连接到 Apache Kafka Broker。 安全套接字层(SSL)实际上已被弃用,自 2015 年起被传输层安全(TLS)所取代。不过,由于历史原因,Kafka(和 Java)仍然使用 “SSL”。 2、SSL 概览 默认情况下,Apache Kafka 以明文形式发送所有数据,且不进行任何身份认证。 首先,可以为 Broker 和客户端之间的加密配置 SSL。默认情况下,这需要使用公钥加密进行单向身份认证,由客户端验证服务器证书。 此外,服务器还可以使用单独的机制(如 SSL 或 SASL)对客户端进行身份认证,从而实现双向身份认证或相互 TLS(mTLS)。基本上,双向 SSL 认证确保客户端和服务器都使用 SSL 证书来认证对方的身份,并在双向上相互信任。 在本文中,Broker 使用 SSL 对客户端进行身份验证,使用 Keystore 和 Truststore 保存证书和密钥。 每个 Broker 都需要自己的 Keystore,其中包含私钥和公共证书。客户端使用其 Truststore 来验证该证书并信任服务器。同样,每个客户端也需要自己的 Keystore,其中包含私钥和公共证书。服务器使用其 Truststore 来验证和信任客户端的证书,并建立安全连接。 Truststore 可以包含一个可以签署证书的证书颁发机构(CA)。在这种情况下,Broker 或客户端会信任由 Truststore 中的 CA 签发的任何证书。这就简化了证书验证,因为添加新客户端或 Broker 无需更改 Truststore。 3、依赖和设置 创建一个简单的 Spring Boot 示例应用,在 pom.xml 中添加 spring-kafka 依赖: <dependency> <groupId>org.

Spring Cloud Stream Kafka 实现 Apache Kafka 的 “仅一次” 语义

在之前的教程中,我们对 Spring Cloud Stream Kafka 应用程序中事务的工作原理进行了基本分析。现在,我们终于来到了一个关键问题:“仅一次” (Exactly Once)语义,这是流式应用程序中一个被广泛讨论和需要的特性。在本文中,我们将了解如何通过 Apache Kafka 事务在 Spring Cloud Stream 应用中实现 “仅一次” 语义。通过之前章节对事务工作原理的了解,相对容易理解 Spring Cloud Stream Kafka 应用如何实现 “仅一次” 语义。 这里需要注意的一点是,在实现 “仅一次” 语义时,除了我们在之前的系列教程文章中已经见过的代码,我们无需编写任何新的代码。本文介绍了在 Spring Cloud Stream Kafka 应用程序中支持 “仅一次” 语义所需的特定要求。 在分布式计算中实现 “仅一次” 语义是一项困难的任务。这超出了本系列教程的范围,在这里无法回顾所有技术细节,以了解为什么这是一项如此困难的任务。对于有兴趣了解 “仅一次” 语义的所有基础知识以及为什么在分布式系统中实现它如此困难的读者,可以参考相关的广泛文献。Confluent 的这篇 博客 是理解这些技术挑战以及 Apache Kafka 实现的解决方案的良好起点。 虽然我们不会深入探讨细节,但看看 Apache Kafka 提供的不同交付保证还是值得的。主要有三种交付保证: 至少一次(at-least-once) 最多一次(at-most-once) 仅一次(exactly-once) 在 “至少一次” 的交付语义中,应用程序可以一次或多次接收数据,但保证至少接收一次。在 “最多一次” 语义的交付保证中,应用程序可能接收零次或一次数据,这意味着有可能丢失数据。“仅一次” 语义,正如其名称所示,保证只交付一次。根据应用程序的使用情况,使用其中任何一种保证都是可以的。默认情况下,Apache Kafka 提供至少一次交付保证,这意味着一条记录可能会被交付多次。如果你的应用程序可以处理重复记录或无记录的后果,那么使用非一次保证可能也没问题。相反,如果你处理的是关键任务数据,如金融系统或医疗数据,你就必须保证 “仅一次” 准确交付和处理,以避免严重后果。由于 Apache Kafka 等系统的分布式特性,通常很难实现精确的 “仅一次” 语义,这是因为系统中存在许多部件。 Spring Cloud Stream Kafka 和 Exactly-Once 语义 在本教程系列的前几篇文章中,我们看到了许多不同的应用场景。Apache Kafka 中的 “仅一次” 义针对的是读-处理-写(或消费-转换-生产)应用。有时,我们会对 “一次” 到底在做什么感到困惑。是最初的消费、数据处理,还是最后的生产?Apache Kafka 保证整个 “读取->处理-写入”(read->process-write)序列的 “仅一次”(actly-once)语义。在这个序列中,读取和处理部分总是至少一次 - 例如,如果部分处理或写入因任何原因失败。如果依赖 “仅一次” 交付,事务就非常关键,这样才能成功完成或回滚数据的最终发布。一个潜在的副作用是,初始消费和处理可能会发生多次。例如,如果事务回滚,消费者偏移量不会更新,下一次轮询(如果是在 Spring Cloud Stream 中重试或应用程序重启)将重新交付相同的记录并再次进行处理。因此,在消费和处理(转换)部分中至少保证一次,这是需要理解的关键点。任何以 read_committed 隔离级别运行的下游消费者都只能从上游处理器获得一次信息。因此,我们必须明白,在 “仅一次” 交付的世界中,处理器和下游消费者必须协调才能从精确一次语义中获益。任何以 read_uncommitted 隔离级别运行的已生成主题的消费者都可能看到重复的数据。

Spring Cloud Stream 和 Apache Kafka 的事务回滚策略

在本系列教程的前几章中,我们分析了事务在 Spring Cloud Stream Kafka 应用中的工作原理。了解了事务发挥作用的不同环境,包括生产者和消费者应用,以及应用如何正确使用事务。现在,这些基本要素已经介绍完毕,让我们继续了解事务的另一个方面:在发生错误时回滚事务。当错误发生时,事务处理系统无法提交事务,事务管理器就会回滚事务,不会为下游消费者保留任何内容。如果应用能够决定回滚机制的工作方式,将大有裨益。Spring Cloud Stream 通过 Spring 对 Apache Kafla 的基本支持,为回滚定制提供了便利。在处理生产者和消费者(消费-处理-生产)事务性应用时,我们必须注意一些事情。接下来,让我们一起深入了解这些内容。 生产者发起的事务 下面是我们在 上一篇文章 中看到的代码片段。 @Transactional public void send(StreamBridge streamBridge) { for (int i = 0; i < 5; i++) { streamBridge.send("mySupplier-out-0", "my data: " + i); } } 如果事务方法抛出异常,我们应该怎么办?从 Spring Cloud Stream 的角度来看,我们不需要做任何操作。事务拦截器会启动回滚,最终由 Kafka 中的事务协调器中断事务。最后,异常会传播给调用者,然后调用者可以决定在错误是暂时的情况下重新触发事务方法。由于这是一个生产者发起的事务,框架不会进行重试。这种情况很简单,因为在事务回滚期间,从应用或框架的角度来看,我们不需要做任何操作。如果发生错误,它将被保证回滚。然而,需要注意的是,即使事务被回滚,Kafka 日志中可能存在未提交的记录。使用隔离级别为 read_uncommitted(默认级别)的消费者仍然会接收这些记录。因此,消费者应用程序必须确保使用 read_committed 隔离级别,这样才不会收到上游事务回滚的任何记录。 生产者发起的事务与外部事务同步 在本系列教程的上一章,我们看到了这种情况。与第一种情况一样,如果方法抛出异常并发生回滚,即使 Kafka 事务与数据库事务同步,应用也无需做任何事情来处理错误。数据库和 Kafka 发布的事务都会回滚。 消费者发起的事务回滚 如果生产者发起的事务回滚是如此简单,你可能会想知道为什么我们要专门为这个主题撰写一篇完整的文章。什么时候应用程序需要提供特定的回滚策略呢?当你有一个正在进行的消费者发起的事务时,这就有了意义,因为我们需要特别关注如何处理消费记录的状态和它们的偏移量。让我们重新看一下在系列中 之前的教程 中运行的消费者发起的事务方法代码。 public Consumer<PersonEvent> process(TxCode txCode) { return txCode::run; } @Component class TxCode { @Transactional void run(PersonEvent pe) { Person person = new Person(); person.