在 Spring Cloud Stream Kafka 中与外部事务管理器(Transaction Manager)同步

在本系列教程的 上一章,我们了解了事务管理的基本知识,主要是在使用生产者启动的 Spring Cloud Stream Kafka 应用时。上文还简要了解了 Spring Cloud Stream Kafka 消费者应用如何通过适当的隔离级别来消费以事务方式生成的记录。当你与外部事务管理器(如关系数据库的事务管理器)同步时,有提到你必须使用事务来确保数据完整性。在本教程中,我们将了解在使用外部事务管理器时,如何在 Spring Cloud Stream 中实现事务保证。

注意,实现分布式事务在实践中是非常困难的。你必须依靠两阶段提交(2PC)策略和适当的分布式事务管理器(如与 JTA 兼容的事务管理器)才能正确实现这一目标。尽管如此,大多数企业用例可能并不需要这种复杂程度,而且我们考虑的大多数用例以及人们在实践中使用的大多数用例,最好还是坚持使用非分布式事务方法,正如我们在本教程中所描述的那样。这篇文章 是 Spring 团队的 Dave Syer 博士在 2009 年发表的,对于理解分布式事务的挑战和 Spring 中推荐的替代方法,这篇文章(即使已经过去了 14 年)仍然具有现实意义。

回到我们的主题:在生产者启动和消费-处理-生产(读-处理-写)应用中使用外部事务管理器时,在 Spring Cloud Stream Kafka 应用中实现事务性。

现在,通过示例进行说明。使用 domain 对象来驱动演示,并为它们创建了伪代码。

假设消息系统处理的是 “event” domain 类型 - 让我们使用 PersonEvent

class PersonEvent {

   String name;
   String type;

   // 为简洁起见,其余部分省略
}

还需要为 Person 对象创建一个 Domain Entity:

@Entity
@Table(name = "person")
public class Person {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;

   private String name;

   // 为简洁起见,其余部分省略
}

最后,我们需要为 Person domain 对象建立一个 CrudRepository

public interface PersonRepository extends CrudRepository<Person, String> {}

在生产者发起的场景中,假设在调用一个方法(例如通过 REST)时,会创建一个 Person domain 对象,将其持久化到数据库中,并通过 StreamBridge 将其作为 PersonEvent 发送到出站的 Kafka Topic。。

在消费-处理-生产场景中,假设入站 Topic 收到一个 PersonEvent,处理器从中生成一个 Person domain 对象并持久化到数据库中。最后,它会生成另一个 PersonEvent,并将其发送到出站的 Kafka Topic。

在此,我们使用了 JPA。Spring Cloud Stream 应用是 Boot 应用,你可以在应用中包含 spring-boot-starter-jpa 依赖,并包含相应的 spring.jpa.* 属性来驱动必要的自动配置。假设 Spring Boot 会为我们自动配置 JPATransactionManager

让我们把用例分解成不同的场景。

场景 1:生产者发起事务

在生产者发起的场景中,我们有两个必须以事务方式进行的操作:一个是数据库操作,另一个是 Kafka 发布操作。以下是基本思路。注意,这段代码只展示了其中涉及的核心内容。在实际中,代码几乎肯定会比这复杂得多。

@Autowired
Sender sender;

@PostMapping("/send-data")
public void sendData() {
   sender.send(streamBridge, repository);
}

@Component
static class Sender {

   @Transactional
   public void send(StreamBridge streamBridge, PersonRepository repository) {
       Person person = new Person();
       person.setName("Some Person");

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

上述由生产者发起的代码是完全事务性的。在本系列教程的上一章,我们看到如果只有一个 Kafka 事务,仅添加 Transactional 注解还不行。如前所述,Transactional 注解没有事务管理器,我们需要自定义事务管理器,使用相同的底层事务资源来实现事务性。但这里的情况有所不同。我们有 Spring Boot 自动配置的 JpaTransactionManager,事务拦截器使用它来启动事务。由于我们配置了 transaction-id-prefixStreamBridge 的发送操作可以事务方式完成。不过,KafkaTemplate 会通过事务同步管理器(TransactionSynchronizationManager)将 Kafka 事务与已有的 JPA 事务同步。方法退出时,主事务首先提交,然后是同步事务,在这种情况下,同步事务就是 Kafka 事务。

该流程的顺序如下

  1. JPA 事务管理器会启动一个新的 JPA 事务。
  2. 数据库操作开始,但由于我们仍在执行方法,因此不会提交。
  3. StreamBridge send 操作会触发一个新的 Kafka 事务,并通过事务同步管理器与 JPA 事务同步。
  4. 方法退出时,首先提交 JPA 事务,然后提交 Kafka 事务。

关于在 Spring 中同步事务的说明:这听起来像是在幕后进行复杂的事务同步。然而,正如我们在本文开头所暗示的,这里并没有进行分布式事务同步,更不用说在不同事务之间进行同步的任何智能方法了。事务本身对同步一无所知。Spring TransactionSynchronizatonManager 只是协调多个事务的提交和回滚。在这种情况下同步事务在功能上类似于嵌套两个或多个 @Transactional 方法或 TransactionTempate 对象。由于 Spring 会为你进行嵌套,因此需要配置的东西更少。

场景 2:颠倒提交顺序

假设由于流程中的某些新要求,我们需要颠倒提交顺序,让 Kafka 事务先提交,而不是 JPA 事务先提交。我们该怎么做呢?我们可能会直观地想到一个解决方案,那就是显式地为 @Transactional 注解提供一个 Kafka 事务管理器,让 JPA 事务与 Kafka 事务同步,后者才是主要事务。代码如下:

@Transactional(customKafkaTransactionManager)
public void send(StreamBridge streamBridge, PersonRepository repository) {
    Person person = new Person();
    person.setName("Some Person");

    Person savedPerson = repository.save(person);

    PersonEvent event = new PersonEvent();
    event.setName(savedPerson.getName());
    event.setType("PersonSaved");
    streamBridge.send("process-out-0", event);
}

我们需要提供一个自定义的 Kafka 事务管理器:

@Bean
KafkaTransactionManager customKafkaTransactionManager() {
   KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder) this.binderFactory.getBinder("kafka", MessageChannel.class);
   ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
   KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
   return kafkaTransactionManager;
}

如果 Spring Boot 检测到一个事务管理器已经存在,它就不会配置该事务管理器,因此我们必须自己配置 JPA 事务管理器:

@Bean
public PlatformTransactionManager transactionManager(
       ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
   JpaTransactionManager transactionManager = new JpaTransactionManager();
   transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
   return transactionManager;
}

我们是否成功地改变了应用事务的顺序?很遗憾,没有。它不起作用,因为 JPA 事务管理器不会让它的事务与其他事务同步,比如本例中来自主事务管理器(自定义 Kafka 事务管理器)的事务。在我们的案例中,虽然我们将自定义 Kafka 事务管理器设为主事务管理器,但在执行 Repository save 方法时,JPA 事务会自行启动和提交,而不会与主事务管理器同步。

该流程的事件顺序如下

  1. Kafka 事务管理器启动了一个新的事务,拦截器使用这个事务。
  2. 当存 Repository save 方法执行时,JpaTransactionManager 会创建一个 JPA 事务,而不会与主事务同步。
  3. JPA 事务在方法执行过程中提交。
  4. 退出方法后,拦截器将提交 Kafka 事务。

那么,如何才能逆转过来呢?有两种方法。

首先,我们可以尝试链式事务管理器。ChainedTransactionManagerSpring Data 项目中的一个事务管理器实现。你可以向 ChainedTransactionManager 指定事务管理器列表,它将按照列表中事务管理器的顺序启动事务。在退出时(即方法退出时),事务将按照事务管理器列表的相反顺序提交。

虽然这听起来是个合理的策略,但需要注意的一点是,ChainedTransactionManager 目前已被弃用,不推荐使用。弃用的原因见 Javadoc。大意是,人们通常希望 ChainedTransactionManager 是一颗神奇的银弹,可以解决所有事务问题,包括具有两阶段提交的分布式事务和其他问题,而这与事实相去甚远。ChainedTransactionManager 只能确保事务按特定顺序启动和提交。它不能保证任何事务同步,更不用说任何分布式事务协调了。假设你可以接受 ChainedTransactionManager 的限制,并希望按照我们的用例所要求的特定顺序执行事务。在这种情况下,使用该事务管理器是合理的,只要你记住你使用的是框架中一个已废弃的类即可。

让我们在场景中尝试一下 ChainedTransactionManager,看看效果如何。Spring for Apache Kafka 提供了一个名为 ChainedKafkaTransactionManager 的子类,由于父类已被弃用,该子类也已被弃用。

我们在链式事务中使用了与之前相同的自定义 KafkaTransactionManager Bean。

我们还需要像之前一样创建 JpaTransactionManager Bean,因为 Spring Boot 不会自动配置它,因为它已经检测到了自定义的 KafkaTransactionManager Bean。

添加这两个 Bean 后,创建 ChainedKafkaTransactionManager Bean:

@Bean
public ChainedKafkaTransactionManager chainedKafkaTransactionManager(KafkaTransactionManager kafkaTransactionManager, PlatformTransactionManager transactionManager) {
   return new ChainedKafkaTransactionManager(jpaTransactionManager, kafkaTransactionManager);
}

有了这些后,修改事务注解:

@Transactional("chainedKafkaTransactionManager")
public void send(StreamBridge streamBridge, PersonRepository repository) {
..
}

上述配置实现了我们想要的结果。运行此应用时,我们会如预期的那样反转事务,即 Kafka 会先提交,然后 JPA 才会提交。

以下是流程步骤

  1. TransactionInterceptor 使用自定义 ChainedKafkaTransactionManager 来启动事务。它使用 JpaTransactionManager 启动 Jpa 事务,并对 KafkaTransactionManager 执行同样的操作。
  2. 当该方法调用数据库操作时,由于它已经在 JPA 事务中运行,因此不会启动另一个事务。由于这不是一个新事务,因此不会发生提交或回滚。
  3. 接下来,该方法通过 StreamBridge 执行 Kafka 发布。我们在这里看到了与上文 JPA 相同的情况。由于已经存在一个 Kafka 事务,所以它不会启动一个新的 Kafka 事务。StreamBridge 发送操作使用的是与初始 Kafka 事务相同的事务生产者工厂。这里不会发生提交或回滚。
  4. 当方法退出时,链式事务管理器会以相反的顺序运行,从 Kafka 事务提交(或回滚)开始,然后是 JPA 事务提交。

如果你能接受链式事务管理器的限制,那么这种方法也行得通。请记住,这里没有事务同步。事务管理器在事务开始时按指定顺序应用,在提交或回滚时按相反顺序应用。如果采用这种方法,由于使用的是框架中已废弃的类,因此最好复制这些类并在项目中使用,而不是依赖框架。由于它们已被弃用,因此无法保证提供新功能和错误修复。未来的版本可能会完全放弃它们。也有可能它们永远不会被移除,而弃用状态的存在是为了阻止人们使用它们(因为人们认为它们的功能比实际功能更强大)。

如果你不想依赖框架中已废弃的类,或者不想复制这些类并在你的代码中进行维护,你可以尝试另一种方法。你可以创建两个事务方法并嵌套调用。下面是这一想法的简单实现:

@Component
static class Sender {

       @Transactional("jpaTransactionManager")
       public void send(StreamBridge streamBridge, PersonRepository repository, KafkaSender kafkaSender) {
           Person person = new Person();
           person.setName("Some Person");

           Person savedPerson = repository.save(person);

           PersonEvent event = new PersonEvent();
           event.setName(savedPerson.getName());
           event.setType("PersonSaved");
           kafkaSender.send(streamBridge, event);
       }
}

@Component
static class KafkaSender {
       @Transactional("customKafkaTransactionManager")
       public void send(StreamBridge streamBridge, PersonEvent event) {
           streamBridge.send("process-out-0", event);
       }
}

确保嵌套调用位于不同的类中,原因我们已在本系列教程的 第 2 节 中介绍过,这与Spring中的AOP代理工作方式有关。

在这种情况下,这两个方法都是事务性的,而且是嵌套的。当事务拦截器拦截到第一个方法调用时,它会启动 JPA 事务。在执行过程中,嵌套的调用(其方法也带有 @Transactional 注解)进入。由于该 Bean 方法具有 @Transactional 注解,因此 Spring AOP 将该 Bean 封装在一个 AOP Advice 中。由于我们从不同类中的另一个 Bean 调用这个 Advice Bean,因此代理机制会正确调用 Advice Bean。另一个事务拦截器通过使用不同的事务管理器(即 KafkaTransactionManager)来启动一个新事务。当发生 Kafka 发布时,事务不会立即提交或回滚,因为事务是作为方法的一部分启动的,而提交或回滚是在方法退出时发生的。此时,控制返回到第一个方法并继续。一旦退出原始方法,JPA 事务就会通过拦截器提交。如果发布到 Kafka 的方法抛出异常,它就会回滚该事务。在这种情况下,回滚后,异常会传播回第一个事务方法(JPA 方法),该方法也会因异常而回滚其事务。

在使用这种技术时需要注意的一点是,对于嵌套方法的调用应该是第一个方法执行的最后一件事情。这是因为,如果第一个方法在成功执行了Kafka调用后无法执行一些代码,那么Kafka事务已经被提交了。第一个方法的失败不会自动回滚Kafka事务。

场景 3:消费-处理-生产

有了本系列迄今为止对事务的核心理解,让我们来看看事件驱动和流应用中的一种重要模式,即消费-处理-生产模式。在 Spring Cloud Stream 中,这种模式的实现如下所示:

@Bean
public Function<PersonEvent, PersonEvent> process(TxCode txCode) {
  return pe -> txCode.run(pe);
}

@Component
class TxCode {

   @Transactional
   PersonEvent run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       return event;
   }
}

我们有一个 Spring Cloud Stream Function,它消费输入 Topic 中的 PersonEvent,然后调用函数的 lambda 表达式主体中的函数进行处理。该函数返回另一个 PersonEvent,我们将其发布到出站的 Kafka Topic 中。如果不是在事务性上下文中,我们可以将上述 run 方法内联为函数 lambda 表达式的一部分。不过,要实现事务语义,@Transactional 注解必须用在不同类中的方法上。

要使 Binder 具有事务性,请确保为 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix 提供有效值。

上述代码是完全事务性的吗?但实际上,它只是端到端的部分事务性代码。让我们来看看事件发生的顺序。

Binder 是事务性的,因为我们提供了 transaction-id-prefix。当消费者轮询消息监听器容器中的记录时,它会调用其 TrasactionTemplate#execute 方法中的内部监听器方法。因此,执行监听器方法(调用用户方法)的整个端到端过程都是在 KafkaTransactionManager 启动的事务中运行的。事务启动时,事务同步管理器(TransactionSynchronizationManager)会将资源(生产者)绑定到事务上。当用户方法(注解为 @Transactional 的方法)被调用时,事务拦截器会拦截该调用,让封装的 AOP Advice 来处理实际调用。因为我们有一个 JpaTransactionManager,所以事务拦截器会使用该管理器并启动一个新事务。是否要与现有事务同步取决于每个事务管理器的实现。就 JpaTransactionManager(以及许多其他类似的数据库事务管理器实现)而言,它不允许与现有事务同步,这一点我们在上文已经了解过。因此,JPA 事务是独立运行的,如上文所述。当 run 方法退出时,事务拦截器会使用 JPA 事务管理器执行提交或回滚操作。这样,JPA 事务管理器就完成了它的工作。此时,方法调用的响应将返回调用者,即 Spring Cloud Stream 基础架构。Spring Cloud Stream 中的机制会接收该响应并将其发送到 Kafka 中的出站 Topic。它使用初始事务开始时绑定的相同事务生产者。发送记录后,控制返回消息监听器容器,然后提交或回滚事务。

具体步骤如下

  1. Kafka 消费者收到记录。
  2. Spring Kafka 中的容器通过使用 TransactionTemplateexecute 方法来调用监听器。
  3. KafkaTransactionManager 启动一个新事务。
  4. 绑定 Kafka 资源(生产者)。
  5. 当它到达用户代码时,事务拦截器最终会拦截该调用并启动一个新的 JPA 事务。
  6. 然后,AOP 代理会调用实际方法。方法退出时,JpaTransactionManager 会提交或回滚。
  7. 该方法的输出会以 Spring Cloud Stream 的形式返回给调用者。
  8. 然后,使用第 4 步中相同的事务资源将响应发送到Kafka出站。
  9. 控制权返回给消息监听容器,然后 KafkaTransactionManager 进行提交或回滚操作。

那么,问题出在哪里呢?它看起来是事务性的,但实际上只是部分事务性。一开始的主要问题是,整个端到端流程超出了单个原子事务的范围,这是一个重大问题。这里有两个事务-Kafka 和 JPA,而且 JPA 和 Kafka 事务之间没有同步。如果数据库事务已提交,而 Kafka 发送失败,那么就无法回滚 JPA 事务。

我们可能会认为 ChainedTransactionManager 可以帮上忙。虽然这种直觉有一定道理,但在上述代码中却行不通。因为在调用监听器方法时容器中创建了 Kafka 事务,所以 ChainedTransactionManager 不会从提供给它的任何 Kafka 事务管理器中创建任何新的 Kafka 事务。在退出用户方法时,我们仍有一个 JPA 事务需要提交或回滚。Kafka 事务必须等到调用返回容器后才能提交或回滚。

问题在于,我们在 Spring Cloud Stream 中使用了一个 Function,该函数可使框架发布到 Kafka。在我们的案例中,任何用户指定的事务(如 JPA 事务)都会在 Spring Cloud Stream 执行 Kafka 发布之前发生。我们需要确保用户代码是向 Kafka 发布的代码,这样才能将整个事务代码视为一个单元。要做到这一点,我们应该改用 Consumer 而不是 Function,然后使用 StreamBridge API 发布到 Kafka。看看修改后的代码

@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

然后,我们使用与上述相同的 TxCode:

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

请注意,运行方法不会返回任何内容,但会通过 StreamBridge API 明确向外发送 Kafka Topic。

让我们来看一下这些更改后的事件顺序

  1. Kafka 消费者接收记录。
  2. Spring Kafka 中的容器通过使用 TransactionTemplateexecute 方法来调用监听器。
  3. KafkaTransactionManager 启动一个新事务。
  4. 绑定 Kafka 资源(生产者)。
  5. 当到达用户代码时,拦截器会拦截该调用,并使用 JpaTransactionManager 启动一个新事务。
  6. 调用实际的用户方法。
  7. 作为方法执行的一部分,Kafka 发送操作是通过 StreamBridge 进行的。底层 KafkaTemplate 使用步骤 4 中绑定的相同事务生产者工厂。
  8. 方法退出时,JpaTransactionManager 会提交或回滚。
  9. 最后,当 Kafka 事务提交(或回滚)时,控制权会到 TransactionTemplate#execute 方法。

请特别注意上面的第 7 步。当 KafkaTemplate 检测到已经有一个正在进行的 Kafka 事务(从步骤 3 开始)时,它不会与 JPA 事务同步,尽管 KafkaTemplate 可以这样做。现有的 Kafka 事务具有优先权,它会加入该事务。

尽管我们仍然有两个单独的事务,但从端到端的事务角度来看,这些操作是原子的。如果通过 StreamBridge 进行的 Kafka 发布操作失败,JPA 和 Kafka 事务都不会执行提交操作,而是进行回滚。同样,如果数据库操作失败,两个事务仍然会回滚。然而,存在一种可能性,即一个事务提交而另一个事务回滚,因此应用程序代码必须处理记录的去重以实现容错性。

在消费-处理-生产模式中,另一个关键部分是生产者需要将消费记录的偏移量(除了提交偏移量的消费者之外)发送给事务。正如我们在本系列教程 第一节 中看到的,Kafka 生产者 API 中有一个名为 sendOffsetToTransaction 的方法,生产者通过 OffsetMetadataConsumerGroupMetadata 为每个分区发送偏移量(当前消息的偏移量 + 1)。使用 Spring Cloud Stream 或 Spring for Apache Kafka 时,应用程序无需调用此底层操作。Spring for Apache Kafka 中的 Kafka 消息监听器容器会代表应用自动处理该操作。虽然框架会在事务提交之前调用生产者的 sendOffsetToTransaction,但向事务发送偏移和实际的消费者偏移提交都是在事务协调器提交事务时原子发生的。

通过如上的学习,我们了解了编写事务性的 Spring Cloud Stream 应用的各种选项,这些应用在消费和生产 Apache Kafka 的同时必须与外部事务性系统(如数据库)进行交互。

在本系列的下一部分,我们将看一下事务回滚(编写事务系统时的另一个关键方面),以及在开发 Spring Cloud Stream Kafka 应用时如何访问各种 Spring 组件。


参考:https://spring.io/blog/2023/10/04/synchronizing-with-external-transaction-managers-in-spring-cloud-stream