1、简介 在 Apache Kafka 中传输消息时,客户端和服务器会就使用共同的语法格式达成协议。Apache Kafka 提供了默认的转换器(Converter),如 String 和 Long。同时也支持针对特定用例的自定义序列化器 (Serializer)。
2、Apache Kafka 中的 Serializer 序列化是将对象转换为字节的过程。反序列化则是将字节流转换为对象的逆过程。简而言之,它将内容转换为可读和可解释的信息。
如上所述,Apache Kafka 为几种基本类型提供了默认序列化器,并允许我们实现自定义序列化器:
上图显示了通过网络向 Kafka Topic 发送消息的过程。在此过程中,生产者将消息发送到 Topic 之前,自定义序列化器会将对象转换成字节。同样,它也显示了反序列化器如何将字节转换回对象,以便消费者正确处理。
2.1、自定义 Serializer Apache Kafka 为几种基本类型提供了预置的序列化器和反序列化器:
StringSerializer ShortSerializer IntegerSerializer LongSerializer DoubleSerializer BytesSerializer 它也提供了实现自定义序列化器/反序列化器的功能。为了序列化自己的对象,需要实现 Serializer 接口。同样,要创建自定义的反序列化器,需要实现 Deserializer 接口。
这两个接口都有可覆写的方法:
configure:用于实现配置细节 serialize / deserialize:这些方法包括自定义序列化和反序列化的实际实现 close:使用该方法关闭 Kafka Session 3、实现自定义 Serializer Kafka 提供了自定义序列化器的功能。可以为消息的 Key 和 Value 实现特定的转换器(Converter)。
3.1、依赖 在 pom.xml 中添加 Kafka Consumer API 依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version> </dependency> 3.
1、简介 本文将带你了解在 Spring Boot 中如何配置 SSL 认证以连接到 Apache Kafka Broker。
安全套接字层(SSL)实际上已被弃用,自 2015 年起被传输层安全(TLS)所取代。不过,由于历史原因,Kafka(和 Java)仍然使用 “SSL”。
2、SSL 概览 默认情况下,Apache Kafka 以明文形式发送所有数据,且不进行任何身份认证。
首先,可以为 Broker 和客户端之间的加密配置 SSL。默认情况下,这需要使用公钥加密进行单向身份认证,由客户端验证服务器证书。
此外,服务器还可以使用单独的机制(如 SSL 或 SASL)对客户端进行身份认证,从而实现双向身份认证或相互 TLS(mTLS)。基本上,双向 SSL 认证确保客户端和服务器都使用 SSL 证书来认证对方的身份,并在双向上相互信任。
在本文中,Broker 使用 SSL 对客户端进行身份验证,使用 Keystore 和 Truststore 保存证书和密钥。
每个 Broker 都需要自己的 Keystore,其中包含私钥和公共证书。客户端使用其 Truststore 来验证该证书并信任服务器。同样,每个客户端也需要自己的 Keystore,其中包含私钥和公共证书。服务器使用其 Truststore 来验证和信任客户端的证书,并建立安全连接。
Truststore 可以包含一个可以签署证书的证书颁发机构(CA)。在这种情况下,Broker 或客户端会信任由 Truststore 中的 CA 签发的任何证书。这就简化了证书验证,因为添加新客户端或 Broker 无需更改 Truststore。
3、依赖和设置 创建一个简单的 Spring Boot 示例应用,在 pom.xml 中添加 spring-kafka 依赖:
<dependency> <groupId>org.
在之前的教程中,我们对 Spring Cloud Stream Kafka 应用程序中事务的工作原理进行了基本分析。现在,我们终于来到了一个关键问题:“仅一次” (Exactly Once)语义,这是流式应用程序中一个被广泛讨论和需要的特性。在本文中,我们将了解如何通过 Apache Kafka 事务在 Spring Cloud Stream 应用中实现 “仅一次” 语义。通过之前章节对事务工作原理的了解,相对容易理解 Spring Cloud Stream Kafka 应用如何实现 “仅一次” 语义。
这里需要注意的一点是,在实现 “仅一次” 语义时,除了我们在之前的系列教程文章中已经见过的代码,我们无需编写任何新的代码。本文介绍了在 Spring Cloud Stream Kafka 应用程序中支持 “仅一次” 语义所需的特定要求。
在分布式计算中实现 “仅一次” 语义是一项困难的任务。这超出了本系列教程的范围,在这里无法回顾所有技术细节,以了解为什么这是一项如此困难的任务。对于有兴趣了解 “仅一次” 语义的所有基础知识以及为什么在分布式系统中实现它如此困难的读者,可以参考相关的广泛文献。Confluent 的这篇 博客 是理解这些技术挑战以及 Apache Kafka 实现的解决方案的良好起点。
虽然我们不会深入探讨细节,但看看 Apache Kafka 提供的不同交付保证还是值得的。主要有三种交付保证:
至少一次(at-least-once) 最多一次(at-most-once) 仅一次(exactly-once) 在 “至少一次” 的交付语义中,应用程序可以一次或多次接收数据,但保证至少接收一次。在 “最多一次” 语义的交付保证中,应用程序可能接收零次或一次数据,这意味着有可能丢失数据。“仅一次” 语义,正如其名称所示,保证只交付一次。根据应用程序的使用情况,使用其中任何一种保证都是可以的。默认情况下,Apache Kafka 提供至少一次交付保证,这意味着一条记录可能会被交付多次。如果你的应用程序可以处理重复记录或无记录的后果,那么使用非一次保证可能也没问题。相反,如果你处理的是关键任务数据,如金融系统或医疗数据,你就必须保证 “仅一次” 准确交付和处理,以避免严重后果。由于 Apache Kafka 等系统的分布式特性,通常很难实现精确的 “仅一次” 语义,这是因为系统中存在许多部件。
Spring Cloud Stream Kafka 和 Exactly-Once 语义 在本教程系列的前几篇文章中,我们看到了许多不同的应用场景。Apache Kafka 中的 “仅一次” 义针对的是读-处理-写(或消费-转换-生产)应用。有时,我们会对 “一次” 到底在做什么感到困惑。是最初的消费、数据处理,还是最后的生产?Apache Kafka 保证整个 “读取->处理-写入”(read->process-write)序列的 “仅一次”(actly-once)语义。在这个序列中,读取和处理部分总是至少一次 - 例如,如果部分处理或写入因任何原因失败。如果依赖 “仅一次” 交付,事务就非常关键,这样才能成功完成或回滚数据的最终发布。一个潜在的副作用是,初始消费和处理可能会发生多次。例如,如果事务回滚,消费者偏移量不会更新,下一次轮询(如果是在 Spring Cloud Stream 中重试或应用程序重启)将重新交付相同的记录并再次进行处理。因此,在消费和处理(转换)部分中至少保证一次,这是需要理解的关键点。任何以 read_committed 隔离级别运行的下游消费者都只能从上游处理器获得一次信息。因此,我们必须明白,在 “仅一次” 交付的世界中,处理器和下游消费者必须协调才能从精确一次语义中获益。任何以 read_uncommitted 隔离级别运行的已生成主题的消费者都可能看到重复的数据。
在本系列教程的前几章中,我们分析了事务在 Spring Cloud Stream Kafka 应用中的工作原理。了解了事务发挥作用的不同环境,包括生产者和消费者应用,以及应用如何正确使用事务。现在,这些基本要素已经介绍完毕,让我们继续了解事务的另一个方面:在发生错误时回滚事务。当错误发生时,事务处理系统无法提交事务,事务管理器就会回滚事务,不会为下游消费者保留任何内容。如果应用能够决定回滚机制的工作方式,将大有裨益。Spring Cloud Stream 通过 Spring 对 Apache Kafla 的基本支持,为回滚定制提供了便利。在处理生产者和消费者(消费-处理-生产)事务性应用时,我们必须注意一些事情。接下来,让我们一起深入了解这些内容。
生产者发起的事务 下面是我们在 上一篇文章 中看到的代码片段。
@Transactional public void send(StreamBridge streamBridge) { for (int i = 0; i < 5; i++) { streamBridge.send("mySupplier-out-0", "my data: " + i); } } 如果事务方法抛出异常,我们应该怎么办?从 Spring Cloud Stream 的角度来看,我们不需要做任何操作。事务拦截器会启动回滚,最终由 Kafka 中的事务协调器中断事务。最后,异常会传播给调用者,然后调用者可以决定在错误是暂时的情况下重新触发事务方法。由于这是一个生产者发起的事务,框架不会进行重试。这种情况很简单,因为在事务回滚期间,从应用或框架的角度来看,我们不需要做任何操作。如果发生错误,它将被保证回滚。然而,需要注意的是,即使事务被回滚,Kafka 日志中可能存在未提交的记录。使用隔离级别为 read_uncommitted(默认级别)的消费者仍然会接收这些记录。因此,消费者应用程序必须确保使用 read_committed 隔离级别,这样才不会收到上游事务回滚的任何记录。
生产者发起的事务与外部事务同步 在本系列教程的上一章,我们看到了这种情况。与第一种情况一样,如果方法抛出异常并发生回滚,即使 Kafka 事务与数据库事务同步,应用也无需做任何事情来处理错误。数据库和 Kafka 发布的事务都会回滚。
消费者发起的事务回滚 如果生产者发起的事务回滚是如此简单,你可能会想知道为什么我们要专门为这个主题撰写一篇完整的文章。什么时候应用程序需要提供特定的回滚策略呢?当你有一个正在进行的消费者发起的事务时,这就有了意义,因为我们需要特别关注如何处理消费记录的状态和它们的偏移量。让我们重新看一下在系列中 之前的教程 中运行的消费者发起的事务方法代码。
public Consumer<PersonEvent> process(TxCode txCode) { return txCode::run; } @Component class TxCode { @Transactional void run(PersonEvent pe) { Person person = new Person(); person.
在本系列教程的 上一章,我们了解了事务管理的基本知识,主要是在使用生产者启动的 Spring Cloud Stream Kafka 应用时。上文还简要了解了 Spring Cloud Stream Kafka 消费者应用如何通过适当的隔离级别来消费以事务方式生成的记录。当你与外部事务管理器(如关系数据库的事务管理器)同步时,有提到你必须使用事务来确保数据完整性。在本教程中,我们将了解在使用外部事务管理器时,如何在 Spring Cloud Stream 中实现事务保证。
注意,实现分布式事务在实践中是非常困难的。你必须依靠两阶段提交(2PC)策略和适当的分布式事务管理器(如与 JTA 兼容的事务管理器)才能正确实现这一目标。尽管如此,大多数企业用例可能并不需要这种复杂程度,而且我们考虑的大多数用例以及人们在实践中使用的大多数用例,最好还是坚持使用非分布式事务方法,正如我们在本教程中所描述的那样。这篇文章 是 Spring 团队的 Dave Syer 博士在 2009 年发表的,对于理解分布式事务的挑战和 Spring 中推荐的替代方法,这篇文章(即使已经过去了 14 年)仍然具有现实意义。
回到我们的主题:在生产者启动和消费-处理-生产(读-处理-写)应用中使用外部事务管理器时,在 Spring Cloud Stream Kafka 应用中实现事务性。
现在,通过示例进行说明。使用 domain 对象来驱动演示,并为它们创建了伪代码。
假设消息系统处理的是 “event” domain 类型 - 让我们使用 PersonEvent:
class PersonEvent { String name; String type; // 为简洁起见,其余部分省略 } 还需要为 Person 对象创建一个 Domain Entity:
@Entity @Table(name = "person") public class Person { @Id @GeneratedValue(strategy = GenerationType.
本文是系列教程的第 2 部分,在这一部分中,我们将通过 Spring Cloud Stream 和 Apache Kafka 详细介绍事务。在 上一节 中,我们了解了事务的基本概念。本文将深入了解一些实现细节。
在本文中,我们主要从生产者的角度来了解事务如何与 Spring Cloud Stream 和 Apache Kafka 配合使用。
Spring Cloud Stream 中的生产者 在深入了解生产者发起的事务之前,我们先来了解一下简单生产者的基本知识。在 Spring Cloud Stream 中,有几种编写生产者(在消息传递领域也称为发布者)的方法。如果你的用例需要定时生成数据,你可以编写一个 java.util.function.Supplier 方法,如下所示。
@Bean public Supplier<Pojo> mySupplier() { return () -> { new Pojo(); }; } 如代码所示,将上述 Supplier 作为 Spring Bean 提供时,Spring Cloud Stream 会将其视为发布者,由于我们在 Apache Kafka 的上下文中,因此它会将 POJO 记录发送到 Kafka Topic。
默认情况下,Spring Cloud Stream 每秒调用一次 Supplier,但你可以通过配置更改该计划。更多详情,请参阅 参考文档。
如果你不想轮询 Supplier,但又想控制其发布频率,该怎么办?Spring Cloud Stream 通过名为 StreamBridge 的开箱即用实现 StreamOperations API 提供了一种便捷的方法。下面是一个示例。
本系列教程重点介绍如何在 Spring Cloud Stream Kafka 应用中处理事务。涵盖了使用 Spring Cloud Stream 和 Apache Kafka 开发事务应用的许多底层细节。
基本组成 Spring Cloud Stream Kafka 应用中事务的基础支持主要来自于 Apache Kafka 本身和 Spring for Apache Kafka 库。然而,这个博客系列是关于如何在 Spring Cloud Stream 中使用这种支持。如果你熟悉 Apache Kafka 中的事务原理,以及 Spring for Apache Kafka 是如何以 Spring 的方式使用它的,那么这个系列将感觉像是自己的领域。
Apache Kafka 提供了基础事务支持,而 Spring for Apache Kafka(又称 Spring Kafka)库则在 Spring 侧扩展了这一支持,使 Spring 开发人员可以更自然地依赖 Spring Framework 中提供的传统事务支持来使用它。Spring Cloud Stream 中的 Kafka Binder(绑定器)进一步加强了 Spring 对 Apache Kafka 的支持,使在 Spring Cloud Stream Kafka 应用中使用相同的支持成为可能。在本系列教程的第一部分中,将简要介绍 Kafka 事务、一些有助于依赖事务的用例分析,以及 Apache Kafka 和 Spring 生态系统中的事务构建模块。
1、概览 本文将会带你学习在 Spring 应用中实现 Kafka Consumer 重试消费的 2 种方式,及其优缺点。
关于如何在 Spring 中整合 Kafka 的细节,请参阅 这里。
2、项目设置 创建一个新的 Spring Boot 项目,并添加 spring-kafka 依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.0.1</version> </dependency> 创建一个对象:
public class Greeting { private String msg; private String name; // 构造函数、get、set 方法省略 } 3、Kafka Consumer Kafka Consumer(消费者)是从 Kafka 集群中读取数据的客户端应用程序。它订阅一个或多个 topic,并消费已发布的消息。Producer (生产者)向 topic 发送消息,topic 是存储和发布记录的类别名称。topic 被分为多个分区,以便横向扩展。每个分区都是一个不可更改的消息序列。
Consumer 可以通过指定偏移量(即消息在分区中的位置)来读取特定分区中的消息。Ack(确认)是消费者发送给 Kafka broker 的消息,表示它已成功处理了一条记录。一旦 ACK 被发送,消费者偏移量(consumer offset)将会被更新。
这将确保消息已被消费,并且不会再次传递给当前 Listener。
3.1、Ack 模式 Ack 模式决定了 broker 何时更新消费者偏移量(consumer offset)。
1、概览 Apache Kafka 是一个分布式且容错的流处理系统。
在本教程中,我们将介绍 Spring 对 Kafka 的支持以及它在原生 Kafka Java 客户端 API 之上提供的抽象层。
Spring Kafka 通过 KafkaTemplate 和使用 @KafkaListener 注解的消息驱动的POJO,提供了简单且典型的 Spring template 编程模型。
2、安装和设置 要下载和安装 Kafka,请参阅 此处 的官方指南。
我们需要在 pom.xml 中添加 spring-kafka 依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.0.0</version> </dependency> 然后按如下方法配置 spring-boot-maven-plugin:
<plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <mainClass>com.baeldung.spring.kafka.KafkaApplication</mainClass> </configuration> </plugin> 我们的示例应用程序是 Spring Boot。
本文假定服务器使用默认配置启动,并且没有更改服务端口。
3、配置 Topic 之前,我们使用命令行工具在 Kafka 中创建主题:
$ bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic 但随着 Kafka 引入 AdminClient,我们现在可以以编程式创建 topic。