在之前的教程中,我们对 Spring Cloud Stream Kafka 应用程序中事务的工作原理进行了基本分析。现在,我们终于来到了一个关键问题:“仅一次” (Exactly Once)语义,这是流式应用程序中一个被广泛讨论和需要的特性。在本文中,我们将了解如何通过 Apache Kafka 事务在 Spring Cloud Stream 应用中实现 “仅一次” 语义。通过之前章节对事务工作原理的了解,相对容易理解 Spring Cloud Stream Kafka 应用如何实现 “仅一次” 语义。
这里需要注意的一点是,在实现 “仅一次” 语义时,除了我们在之前的系列教程文章中已经见过的代码,我们无需编写任何新的代码。本文介绍了在 Spring Cloud Stream Kafka 应用程序中支持 “仅一次” 语义所需的特定要求。
在分布式计算中实现 “仅一次” 语义是一项困难的任务。这超出了本系列教程的范围,在这里无法回顾所有技术细节,以了解为什么这是一项如此困难的任务。对于有兴趣了解 “仅一次” 语义的所有基础知识以及为什么在分布式系统中实现它如此困难的读者,可以参考相关的广泛文献。Confluent 的这篇 博客 是理解这些技术挑战以及 Apache Kafka 实现的解决方案的良好起点。
虽然我们不会深入探讨细节,但看看 Apache Kafka 提供的不同交付保证还是值得的。主要有三种交付保证:
至少一次(at-least-once) 最多一次(at-most-once) 仅一次(exactly-once) 在 “至少一次” 的交付语义中,应用程序可以一次或多次接收数据,但保证至少接收一次。在 “最多一次” 语义的交付保证中,应用程序可能接收零次或一次数据,这意味着有可能丢失数据。“仅一次” 语义,正如其名称所示,保证只交付一次。根据应用程序的使用情况,使用其中任何一种保证都是可以的。默认情况下,Apache Kafka 提供至少一次交付保证,这意味着一条记录可能会被交付多次。如果你的应用程序可以处理重复记录或无记录的后果,那么使用非一次保证可能也没问题。相反,如果你处理的是关键任务数据,如金融系统或医疗数据,你就必须保证 “仅一次” 准确交付和处理,以避免严重后果。由于 Apache Kafka 等系统的分布式特性,通常很难实现精确的 “仅一次” 语义,这是因为系统中存在许多部件。
Spring Cloud Stream Kafka 和 Exactly-Once 语义 在本教程系列的前几篇文章中,我们看到了许多不同的应用场景。Apache Kafka 中的 “仅一次” 义针对的是读-处理-写(或消费-转换-生产)应用。有时,我们会对 “一次” 到底在做什么感到困惑。是最初的消费、数据处理,还是最后的生产?Apache Kafka 保证整个 “读取->处理-写入”(read->process-write)序列的 “仅一次”(actly-once)语义。在这个序列中,读取和处理部分总是至少一次 - 例如,如果部分处理或写入因任何原因失败。如果依赖 “仅一次” 交付,事务就非常关键,这样才能成功完成或回滚数据的最终发布。一个潜在的副作用是,初始消费和处理可能会发生多次。例如,如果事务回滚,消费者偏移量不会更新,下一次轮询(如果是在 Spring Cloud Stream 中重试或应用程序重启)将重新交付相同的记录并再次进行处理。因此,在消费和处理(转换)部分中至少保证一次,这是需要理解的关键点。任何以 read_committed 隔离级别运行的下游消费者都只能从上游处理器获得一次信息。因此,我们必须明白,在 “仅一次” 交付的世界中,处理器和下游消费者必须协调才能从精确一次语义中获益。任何以 read_uncommitted 隔离级别运行的已生成主题的消费者都可能看到重复的数据。
在本系列教程的前几章中,我们分析了事务在 Spring Cloud Stream Kafka 应用中的工作原理。了解了事务发挥作用的不同环境,包括生产者和消费者应用,以及应用如何正确使用事务。现在,这些基本要素已经介绍完毕,让我们继续了解事务的另一个方面:在发生错误时回滚事务。当错误发生时,事务处理系统无法提交事务,事务管理器就会回滚事务,不会为下游消费者保留任何内容。如果应用能够决定回滚机制的工作方式,将大有裨益。Spring Cloud Stream 通过 Spring 对 Apache Kafla 的基本支持,为回滚定制提供了便利。在处理生产者和消费者(消费-处理-生产)事务性应用时,我们必须注意一些事情。接下来,让我们一起深入了解这些内容。
生产者发起的事务 下面是我们在 上一篇文章 中看到的代码片段。
@Transactional public void send(StreamBridge streamBridge) { for (int i = 0; i < 5; i++) { streamBridge.send("mySupplier-out-0", "my data: " + i); } } 如果事务方法抛出异常,我们应该怎么办?从 Spring Cloud Stream 的角度来看,我们不需要做任何操作。事务拦截器会启动回滚,最终由 Kafka 中的事务协调器中断事务。最后,异常会传播给调用者,然后调用者可以决定在错误是暂时的情况下重新触发事务方法。由于这是一个生产者发起的事务,框架不会进行重试。这种情况很简单,因为在事务回滚期间,从应用或框架的角度来看,我们不需要做任何操作。如果发生错误,它将被保证回滚。然而,需要注意的是,即使事务被回滚,Kafka 日志中可能存在未提交的记录。使用隔离级别为 read_uncommitted(默认级别)的消费者仍然会接收这些记录。因此,消费者应用程序必须确保使用 read_committed 隔离级别,这样才不会收到上游事务回滚的任何记录。
生产者发起的事务与外部事务同步 在本系列教程的上一章,我们看到了这种情况。与第一种情况一样,如果方法抛出异常并发生回滚,即使 Kafka 事务与数据库事务同步,应用也无需做任何事情来处理错误。数据库和 Kafka 发布的事务都会回滚。
消费者发起的事务回滚 如果生产者发起的事务回滚是如此简单,你可能会想知道为什么我们要专门为这个主题撰写一篇完整的文章。什么时候应用程序需要提供特定的回滚策略呢?当你有一个正在进行的消费者发起的事务时,这就有了意义,因为我们需要特别关注如何处理消费记录的状态和它们的偏移量。让我们重新看一下在系列中 之前的教程 中运行的消费者发起的事务方法代码。
public Consumer<PersonEvent> process(TxCode txCode) { return txCode::run; } @Component class TxCode { @Transactional void run(PersonEvent pe) { Person person = new Person(); person.
在本系列教程的 上一章,我们了解了事务管理的基本知识,主要是在使用生产者启动的 Spring Cloud Stream Kafka 应用时。上文还简要了解了 Spring Cloud Stream Kafka 消费者应用如何通过适当的隔离级别来消费以事务方式生成的记录。当你与外部事务管理器(如关系数据库的事务管理器)同步时,有提到你必须使用事务来确保数据完整性。在本教程中,我们将了解在使用外部事务管理器时,如何在 Spring Cloud Stream 中实现事务保证。
注意,实现分布式事务在实践中是非常困难的。你必须依靠两阶段提交(2PC)策略和适当的分布式事务管理器(如与 JTA 兼容的事务管理器)才能正确实现这一目标。尽管如此,大多数企业用例可能并不需要这种复杂程度,而且我们考虑的大多数用例以及人们在实践中使用的大多数用例,最好还是坚持使用非分布式事务方法,正如我们在本教程中所描述的那样。这篇文章 是 Spring 团队的 Dave Syer 博士在 2009 年发表的,对于理解分布式事务的挑战和 Spring 中推荐的替代方法,这篇文章(即使已经过去了 14 年)仍然具有现实意义。
回到我们的主题:在生产者启动和消费-处理-生产(读-处理-写)应用中使用外部事务管理器时,在 Spring Cloud Stream Kafka 应用中实现事务性。
现在,通过示例进行说明。使用 domain 对象来驱动演示,并为它们创建了伪代码。
假设消息系统处理的是 “event” domain 类型 - 让我们使用 PersonEvent:
class PersonEvent { String name; String type; // 为简洁起见,其余部分省略 } 还需要为 Person 对象创建一个 Domain Entity:
@Entity @Table(name = "person") public class Person { @Id @GeneratedValue(strategy = GenerationType.
本文是系列教程的第 2 部分,在这一部分中,我们将通过 Spring Cloud Stream 和 Apache Kafka 详细介绍事务。在 上一节 中,我们了解了事务的基本概念。本文将深入了解一些实现细节。
在本文中,我们主要从生产者的角度来了解事务如何与 Spring Cloud Stream 和 Apache Kafka 配合使用。
Spring Cloud Stream 中的生产者 在深入了解生产者发起的事务之前,我们先来了解一下简单生产者的基本知识。在 Spring Cloud Stream 中,有几种编写生产者(在消息传递领域也称为发布者)的方法。如果你的用例需要定时生成数据,你可以编写一个 java.util.function.Supplier 方法,如下所示。
@Bean public Supplier<Pojo> mySupplier() { return () -> { new Pojo(); }; } 如代码所示,将上述 Supplier 作为 Spring Bean 提供时,Spring Cloud Stream 会将其视为发布者,由于我们在 Apache Kafka 的上下文中,因此它会将 POJO 记录发送到 Kafka Topic。
默认情况下,Spring Cloud Stream 每秒调用一次 Supplier,但你可以通过配置更改该计划。更多详情,请参阅 参考文档。
如果你不想轮询 Supplier,但又想控制其发布频率,该怎么办?Spring Cloud Stream 通过名为 StreamBridge 的开箱即用实现 StreamOperations API 提供了一种便捷的方法。下面是一个示例。
本系列教程重点介绍如何在 Spring Cloud Stream Kafka 应用中处理事务。涵盖了使用 Spring Cloud Stream 和 Apache Kafka 开发事务应用的许多底层细节。
基本组成 Spring Cloud Stream Kafka 应用中事务的基础支持主要来自于 Apache Kafka 本身和 Spring for Apache Kafka 库。然而,这个博客系列是关于如何在 Spring Cloud Stream 中使用这种支持。如果你熟悉 Apache Kafka 中的事务原理,以及 Spring for Apache Kafka 是如何以 Spring 的方式使用它的,那么这个系列将感觉像是自己的领域。
Apache Kafka 提供了基础事务支持,而 Spring for Apache Kafka(又称 Spring Kafka)库则在 Spring 侧扩展了这一支持,使 Spring 开发人员可以更自然地依赖 Spring Framework 中提供的传统事务支持来使用它。Spring Cloud Stream 中的 Kafka Binder(绑定器)进一步加强了 Spring 对 Apache Kafka 的支持,使在 Spring Cloud Stream Kafka 应用中使用相同的支持成为可能。在本系列教程的第一部分中,将简要介绍 Kafka 事务、一些有助于依赖事务的用例分析,以及 Apache Kafka 和 Spring 生态系统中的事务构建模块。