Spring Cloud Stream Kafka 实现 Apache Kafka 的 “仅一次” 语义

在之前的教程中,我们对 Spring Cloud Stream Kafka 应用程序中事务的工作原理进行了基本分析。现在,我们终于来到了一个关键问题:“仅一次” (Exactly Once)语义,这是流式应用程序中一个被广泛讨论和需要的特性。在本文中,我们将了解如何通过 Apache Kafka 事务在 Spring Cloud Stream 应用中实现 “仅一次” 语义。通过之前章节对事务工作原理的了解,相对容易理解 Spring Cloud Stream Kafka 应用如何实现 “仅一次” 语义。

这里需要注意的一点是,在实现 “仅一次” 语义时,除了我们在之前的系列教程文章中已经见过的代码,我们无需编写任何新的代码。本文介绍了在 Spring Cloud Stream Kafka 应用程序中支持 “仅一次” 语义所需的特定要求。

在分布式计算中实现 “仅一次” 语义是一项困难的任务。这超出了本系列教程的范围,在这里无法回顾所有技术细节,以了解为什么这是一项如此困难的任务。对于有兴趣了解 “仅一次” 语义的所有基础知识以及为什么在分布式系统中实现它如此困难的读者,可以参考相关的广泛文献。Confluent 的这篇 博客 是理解这些技术挑战以及 Apache Kafka 实现的解决方案的良好起点。

虽然我们不会深入探讨细节,但看看 Apache Kafka 提供的不同交付保证还是值得的。主要有三种交付保证:

  • 至少一次(at-least-once)
  • 最多一次(at-most-once)
  • 仅一次(exactly-once)

在 “至少一次” 的交付语义中,应用程序可以一次或多次接收数据,但保证至少接收一次。在 “最多一次” 语义的交付保证中,应用程序可能接收零次或一次数据,这意味着有可能丢失数据。“仅一次” 语义,正如其名称所示,保证只交付一次。根据应用程序的使用情况,使用其中任何一种保证都是可以的。默认情况下,Apache Kafka 提供至少一次交付保证,这意味着一条记录可能会被交付多次。如果你的应用程序可以处理重复记录或无记录的后果,那么使用非一次保证可能也没问题。相反,如果你处理的是关键任务数据,如金融系统或医疗数据,你就必须保证 “仅一次” 准确交付和处理,以避免严重后果。由于 Apache Kafka 等系统的分布式特性,通常很难实现精确的 “仅一次” 语义,这是因为系统中存在许多部件。

Spring Cloud Stream Kafka 和 Exactly-Once 语义

在本教程系列的前几篇文章中,我们看到了许多不同的应用场景。Apache Kafka 中的 “仅一次” 义针对的是读-处理-写(或消费-转换-生产)应用。有时,我们会对 “一次” 到底在做什么感到困惑。是最初的消费、数据处理,还是最后的生产?Apache Kafka 保证整个 “读取->处理-写入”(read->process-write)序列的 “仅一次”(actly-once)语义。在这个序列中,读取和处理部分总是至少一次 - 例如,如果部分处理或写入因任何原因失败。如果依赖 “仅一次” 交付,事务就非常关键,这样才能成功完成或回滚数据的最终发布。一个潜在的副作用是,初始消费和处理可能会发生多次。例如,如果事务回滚,消费者偏移量不会更新,下一次轮询(如果是在 Spring Cloud Stream 中重试或应用程序重启)将重新交付相同的记录并再次进行处理。因此,在消费和处理(转换)部分中至少保证一次,这是需要理解的关键点。任何以 read_committed 隔离级别运行的下游消费者都只能从上游处理器获得一次信息。因此,我们必须明白,在 “仅一次” 交付的世界中,处理器和下游消费者必须协调才能从精确一次语义中获益。任何以 read_uncommitted 隔离级别运行的已生成主题的消费者都可能看到重复的数据。

另一个需要记住的要点是,由于记录的消费和处理可能会发生多次,应用程序代码需要遵循幂等模式,这主要是在代码与外部系统(如数据库)交互时需要考虑的问题。在这种情况下,由应用程序来确保用户代码没有其他问题的产生。

让我们重温一下之前看到的简单 “消费-处理-生产” 循环的代码。

@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

正如我们之前看到的,要使该应用程序具有事务性,我们必须为 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix 配置属性提供一个合适的值。只需在 Spring Cloud Stream 中提供该属性,上述代码段就能完全实现 “仅一次” 交付。整个端到端流程在事务边界内运行(尽管在上述示例中我们有两个事务)。我们有一个在容器中调用监听器时启动的外部 Kafka 事务,以及由事务拦截器启动的另一个 JPA 事务。当 StreamBridge send 发生时,会使用与初始 Kafka 事务相同的事务资源,但直到控制权返回容器后才会提交。当方法退出时,JPA 事务就会提交。假设这里出了问题,数据库操作抛出了异常。在这种情况下,JPA 不会提交,它会回滚,异常会传播回监听容器,此时 Kafka 事务也会回滚。另一方面,如果 JPA 操作成功,但 Kafka 发布失败并抛出异常,JPA 不会提交,而是回滚,异常会传播到监听器。

在上面的代码中,如果我们不与外部事务管理器进行同步,而只是发布到 Kafka,那么我们不需要使用 @Transactional 注解,甚至可以将代码内联到 txCode 方法中,作为消费者 lambda 的一部分。

@Bean
public Consumer<PersonEvent> process() {
   return pe -> {
	  Person person = new Person();
       person.setName(pe.getName());
       PersonEvent event = new PersonEvent();
       event.setName(person.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);

   }
}

在这种情况下,我们只有容器在调用监听器时发起的 Kafka 事务。当代码通过 StreamBridge send 方法发布记录时,KafkaTemplate 会使用与初始事务相同的事务生产者工厂。

在这两种情况下,我们都是完全事务性的,事务的最终发布只进行一次。隔离级别为 read_committed 的下游消费者应该只消费一次。

Kafka Stream 和 Exactly-Once 语义

到目前为止,在这个系列中,我们还没有讨论过 Kafka Stream。最初,Kafka Stream 应用程序是 Apache Kafka 添加事务支持和 “仅一次” 语义的原因,但我们还没有谈论过它。原因是在 Kafka Stream 应用程序中实现 “仅一次” 语义是直接且几乎非常规的。正如他们所说,这是一个单一的配置选项。要了解更多关于 Kafka Stream 中 “仅一次” 语义的信息,请参阅 Confluent 的这篇博客

与普通的基于 Kafka 客户端的应用程序一样,在 Kafka Stream 的情况下,当你以消费-处理-产生(consumer-process-produce)模式产生最终输出时,“仅一次”(actly-once)保证就会发挥作用,这意味着所产生数据的任何下游消费者只要使用 read_committed 隔离级别,就会精确消耗一次数据。

Kafka Stream 配置属性 processing.guarantee 属性可在 Kafka Stream 应用程序中启用 “仅一次” 语义。你可以在 Spring Cloud Stream 中设置 spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee 属性。你需要将值设为 exactly_once。默认情况下,Kafka Stream 使用 at_least_once 值。

一般在有状态的 Kafka Stream 应用程序中会发生的三个主要活动是:

  1. 记录的初始消费
  2. 通过更新 Changelog Topic 进行状态存储更新
  3. 生产数据

其模式是接收并处理记录。在此过程中,任何状态信息都会转化为状态存储,从而更新特定的更新 changelog topic。最后,出站记录会发布到另一个 Kafka Topic。如果你注意到这种模式,它看起来与我们已经见过的许多场景相似,除了状态存储部分。将 processing.guarantee 设置为 exactly_once,Kafka Stream 就能保证,如果在这些活动中出现异常或应用程序崩溃,整个单元会原子回滚,就像什么都没发生过一样。应用程序重启后,处理器会再次消耗记录、处理记录并最终发布数据。由于数据发布是在幕后以事务方式进行的,因此在数据永远发布之前,任何隔离级别为 read_committed 的下游消费者都不会消费该记录,而处理器会处理所有实现事务性所需的工作(如提交已消费记录的偏移量等),从而保证 “仅一次” 交付。

Kafka Stream 的 “仅一次” 交付保证是针对从 Kafka 相关活动的角度来看记录的端到端消费、处理和发布。当外部系统存在时,它不提供这种保证。例如,假设你的代码与外部系统有交互,如数据库插入或更新操作。在这种情况下,应用程序将自行决定如何参与事务。在这种情况下,Spring 的事务支持又派上了用场。我们不想在此重复代码。不过,正如我们在本系列中多次看到的那样,可以将与数据库交互的代码封装在一个单独的方法中,使用 @Transactional 注解对其进行注解,并提供一个适当的事务管理器,例如我们看到的 JPA 管理器。当这种方法抛出异常时,JPA 事务会回滚,异常会传播到 Kafka Stream 处理器代码,最终传播回 Kafka Stream 框架本身,然后回滚原始 Kafka 事务。值得在此再次重申的是,我们必须明白,从流拓扑中的处理器调用这些操作时,必须编写代码来处理惰性,因为 “仅一次” 只适用于整个流程,而不适用于序列中的单个读取和处理。

总结

正如我们在本文开头提到的,“仅一次” 交付(exactly-once-delivery)语义是分布式计算中的一个复杂话题。不过,有了 Kafka 原生提供的实现 “仅一次” 语义的解决方案,以及 Spring for Apache Kafka 和 Spring Cloud Stream 框架中的 Spring 支持,在 Spring Cloud Stream Kafka 应用程序中实现 “仅一次” 交付语义就相对容易了。


参考:https://spring.io/blog/2023/10/16/apache-kafkas-exactly-once-semantics-in-spring-cloud-stream-kafka