在 Spring Cloud Stream Kafka 中与外部事务管理器(Transaction Manager)同步

在本系列教程的 上一章,我们了解了事务管理的基本知识,主要是在使用生产者启动的 Spring Cloud Stream Kafka 应用时。上文还简要了解了 Spring Cloud Stream Kafka 消费者应用如何通过适当的隔离级别来消费以事务方式生成的记录。当你与外部事务管理器(如关系数据库的事务管理器)同步时,有提到你必须使用事务来确保数据完整性。在本教程中,我们将了解在使用外部事务管理器时,如何在 Spring Cloud Stream 中实现事务保证。 注意,实现分布式事务在实践中是非常困难的。你必须依靠两阶段提交(2PC)策略和适当的分布式事务管理器(如与 JTA 兼容的事务管理器)才能正确实现这一目标。尽管如此,大多数企业用例可能并不需要这种复杂程度,而且我们考虑的大多数用例以及人们在实践中使用的大多数用例,最好还是坚持使用非分布式事务方法,正如我们在本教程中所描述的那样。这篇文章 是 Spring 团队的 Dave Syer 博士在 2009 年发表的,对于理解分布式事务的挑战和 Spring 中推荐的替代方法,这篇文章(即使已经过去了 14 年)仍然具有现实意义。 回到我们的主题:在生产者启动和消费-处理-生产(读-处理-写)应用中使用外部事务管理器时,在 Spring Cloud Stream Kafka 应用中实现事务性。 现在,通过示例进行说明。使用 domain 对象来驱动演示,并为它们创建了伪代码。 假设消息系统处理的是 “event” domain 类型 - 让我们使用 PersonEvent: class PersonEvent { String name; String type; // 为简洁起见,其余部分省略 } 还需要为 Person 对象创建一个 Domain Entity: @Entity @Table(name = "person") public class Person { @Id @GeneratedValue(strategy = GenerationType.

Spring Cloud Stream Kafka 中由生产者发起的事务

本文是系列教程的第 2 部分,在这一部分中,我们将通过 Spring Cloud Stream 和 Apache Kafka 详细介绍事务。在 上一节 中,我们了解了事务的基本概念。本文将深入了解一些实现细节。 在本文中,我们主要从生产者的角度来了解事务如何与 Spring Cloud Stream 和 Apache Kafka 配合使用。 Spring Cloud Stream 中的生产者 在深入了解生产者发起的事务之前,我们先来了解一下简单生产者的基本知识。在 Spring Cloud Stream 中,有几种编写生产者(在消息传递领域也称为发布者)的方法。如果你的用例需要定时生成数据,你可以编写一个 java.util.function.Supplier 方法,如下所示。 @Bean public Supplier<Pojo> mySupplier() { return () -> { new Pojo(); }; } 如代码所示,将上述 Supplier 作为 Spring Bean 提供时,Spring Cloud Stream 会将其视为发布者,由于我们在 Apache Kafka 的上下文中,因此它会将 POJO 记录发送到 Kafka Topic。 默认情况下,Spring Cloud Stream 每秒调用一次 Supplier,但你可以通过配置更改该计划。更多详情,请参阅 参考文档。 如果你不想轮询 Supplier,但又想控制其发布频率,该怎么办?Spring Cloud Stream 通过名为 StreamBridge 的开箱即用实现 StreamOperations API 提供了一种便捷的方法。下面是一个示例。

Spring Cloud Stream Kafka 中的事务简介

本系列教程重点介绍如何在 Spring Cloud Stream Kafka 应用中处理事务。涵盖了使用 Spring Cloud Stream 和 Apache Kafka 开发事务应用的许多底层细节。 基本组成 Spring Cloud Stream Kafka 应用中事务的基础支持主要来自于 Apache Kafka 本身和 Spring for Apache Kafka 库。然而,这个博客系列是关于如何在 Spring Cloud Stream 中使用这种支持。如果你熟悉 Apache Kafka 中的事务原理,以及 Spring for Apache Kafka 是如何以 Spring 的方式使用它的,那么这个系列将感觉像是自己的领域。 Apache Kafka 提供了基础事务支持,而 Spring for Apache Kafka(又称 Spring Kafka)库则在 Spring 侧扩展了这一支持,使 Spring 开发人员可以更自然地依赖 Spring Framework 中提供的传统事务支持来使用它。Spring Cloud Stream 中的 Kafka Binder(绑定器)进一步加强了 Spring 对 Apache Kafka 的支持,使在 Spring Cloud Stream Kafka 应用中使用相同的支持成为可能。在本系列教程的第一部分中,将简要介绍 Kafka 事务、一些有助于依赖事务的用例分析,以及 Apache Kafka 和 Spring 生态系统中的事务构建模块。

Spring Lifecycle 和 SmartLifecycle 的区别

当我们想在 Spring 容器启动或者关闭的时候,做一些初始化操作或者对象销毁操作,我们可以怎么做? 注意我这里说的是容器启动或者关闭的时候,不是某一个 Bean 初始化或者销毁的时候! 1、Lifecycle 对于上面提到的问题,如果你稍微研究过 Spring,应该是了解其里边有一个 Lifecycle 接口,通过这个接口,我们可以在 Spring 容器启动或者关闭的时候,做一些自己需要的事情。 我们先来看下 Lifecycle 接口: public interface Lifecycle { void start(); void stop(); boolean isRunning(); } 这个接口一共就三个方法: start:启动组件,该方法在执行之前,先调用 isRunning 方法判断组件是否已经启动了,如果已经启动了,就不重复启动了。 stop:停止组件,该方法在执行之前,先调用 isRunning 方法判断组件是否已经停止运行了,如果已经停止运行了,就不再重复停止了。 isRunning:这个是返回组件是否已经处于运行状态了,对于容器来说,只有当容器中的所有适用组件都处于运行状态时,这个方法返回 true,否则返回 false。 如果我们想自定义一个 Lifecycle,方式如下: @Component public class MyLifeCycle implements Lifecycle { private volatile boolean running; @Override public void start() { running = true; System.out.println("start"); } @Override public void stop() { running = false; System.out.println("stop"); } @Override public boolean isRunning() { return running; } } 需要自定义一个 running 变量,该变量用来描述当前组件是否处于运行/停止状态,因为系统在调用 start 和 stop 方法的时候,都会先调用 isRunning 方法,用以确认是否需要真的调用 start/stop 方法。

Spring Boot 修改 Redis Value 但保留其过期时间(TTL)

在 Spring Boot 中使用 Redis 时,你一定需要过这种需求:更新某个 Redis 的 Value 值,但是不修改它的 TTL(Time To Live),也就是过期时间。 例如:使用 Redis 存储用户的 Session,过期时间为半个小时。用户的每次访问,我们都需要更新用户 Session 的 Value 值,表示用户的最后一次访问时间。但是又不能修改用户 Session 的过期时间。 有 2 种方式可以实现。 先获取过期时间,再修改 这种方式最简单,也是最容易想到。在执行修改前,先获取到这个 Key 的剩余过期时间。然后通过 SET 命令执行修改,也就是完全覆盖这个 Key / Value,同时指定剩余的过期时间。 @Autowired StringRedisTemplate stringRedisTemplate; @Test public void test() { // session key String key = "session::01"; // 先获取 key 的过期时间,单位是秒 Long expire = this.stringRedisTemplate.getExpire(key); if (expire != null && expire > 0) { // 重新设置 key,并且指定过期时间 this.

测试 Spring JMS

1、概览 在本文中,我们将创建一个简单的 Spring 应用,用于连接到 ActiveMQ 并发送和接收消息。我们将重点关注测试这个应用以及测试 Spring JMS 的不同方法。 Spring JMS 是 Spring 框架提供的一个模块,用于支持与 Java 消息服务(Java Message Service,JMS)提供者的集成。JMS是一种用于在分布式系统中发送、接收和传递消息的标准 API。 2、应用设置 首先,创建一个用于测试的基本 Spring 应用。添加必要的依赖,并实现消息处理。 2.1、依赖 在 pom.xml 中添加所需的依赖。我们需要 Spring JMS 来监听 JMS 消息。我们将在部分测试中使用 ActiveMQ-Junit 启动嵌入式 ActiveMQ 实例,并在其他测试中使用 TestContainers 运行 ActiveMQ Docker 容器: <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.4.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq.tooling</groupId> <artifactId>activemq-junit</artifactId> <version>5.16.5</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>testcontainers</artifactId> <version>1.17.3</version> <scope>test</scope> </dependency> 2.2、应用代码 现在,创建一个可以监听消息的 Spring 应用: @ComponentScan public class JmsApplication { public static void main(String[] args) { ApplicationContext context = new AnnotationConfigApplicationContext(JmsApplication.

@Spy 和 @SpyBean

1、简介 本文将带你了解 @Spy 和 @SpyBean 之间的区别和用法。 2、基本应用 本文中,我们使用一个简单的订单应用,其中包括一个用于创建订单的订单服务,以及一个用于在处理订单时发出通知的通知服务。 OrderService 有一个 save() 方法,用于接收 Order 对象,使用 OrderRepository 保存该对象,并调用 NotificationService: @Service public class OrderService { public final OrderRepository orderRepository; public final NotificationService notificationService; public OrderService(OrderRepository orderRepository, NotificationService notificationService) { this.orderRepository = orderRepository; this.notificationService = notificationService; } public Order save(Order order) { order = orderRepository.save(order); notificationService.notify(order); if(!notificationService.raiseAlert(order)){ throw new RuntimeException("Alert not raised"); } return order; } } 为简单起见,我们假设 notify() 方法仅在日志中输出订单信息。实际上,它可能涉及更复杂的操作,例如通过队列向下游应用发送电子邮件或消息。 我们还假设,每个创建的订单都必须通过调用 ExternalAlertService 来接收警报(Alert),如果警报成功,则返回 true,如果 OrderService 没有发出警报,则会失败:

Spring 6 中的 RSocket 接口

1、概览 本文将带你了解如何在 Spring 6 中使用 RSocket。 随着 Spring 6 引入声明式 RSocket 客户端,使用 RSocket 变得更加简单。这一功能消除了重复的模板代码,使开发人员能够更高效地使用 RSocket。 2、Maven 依赖 首先,创建 Spring Boot 项目,并在 pom.xml 文件中添加 spring-boot-starter-rsocket 依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-rsocket</artifactId> <version>3.1.4</version> </dependency> 3、创建 RSocket Server 首先,创建一个 responder(应答器),通过 Controller 来管理传入的请求: @MessageMapping("MyDestination") public Mono<String> message(Mono<String> input) { return input.doOnNext(msg -> System.out.println("Request is:" + msg + ",Request!")) .map(msg -> msg + ",Response!"); } 接着,在 application.properties 文件中添加以下属性,以在 7000 端口启动服务( MyDestination): spring.rsocket.server.port=7000 4、客户端 现在,开发客户端。为了简单起见,我们在同一个项目中创建客户端,但将其放在一个单独的包中。实际开发中,它们应该放在一个单独的项目中。 创建客户端接口: public interface MessageClient { @RSocketExchange("MyDestination") Mono<String> sendMessage(Mono<String> input); } 在使用客户端接口时,通过 @RSocketExchange 来指定 RSocket 端点。基本上,这意味着我们需要一些信息来建立端点路径。可以在接口级别上通过分配一个共享路径来实现。这非常简单,帮助我们知道要使用哪个端点。

SpringRunner 和 @SpringBootTest

1、概览 无论是单元测试还是集成测试,测试对于任何应用程序都至关重要。SpringRunner 和 SpringBootTest 类是运行集成测试的基础。 在本教程中,我们将了解 SpringRunner 和 @SpringBootTest 的用法,以及它们之间的区别。 2、SpringRunner SpringRunner 是 SpringJUnit4ClassRunner 类的别名,可用于基于 JUnit4 的测试类。它加载 Spring TestContext,通过 Spring TestContext,Spring Bean 和 Configuration 可与 JUnit 注解一起使用。需要 JUnit 4.12 或更高版本才能使用它。 通过 @RunWith(SpringRunner.class) 注解测试类来使用此功能: @RunWith(SpringRunner.class) public class SampleIntegrationTest { @Test public void test() { // } } 3、SpringBootTest SpringBootTest 是 SpringRunner 的替代品,可与 JUnit5 配合使用。它还用于运行集成测试和加载 Spring TestContext。 它的功能非常丰富,可通过注解参数提供多种配置。它支持各种 Web 环境模式,如 MOCK、RANDOM_PORT、DEFINED_PORT 和 NONE。我们可以通过注解传递 application properties,在测试运行之前将其注入到 Spring Environment 中。 @SpringBootTest( properties = {"user.name=test_user"}, webEnvironment = MOCK) public class SampleIntegrationTest { @Test public void test() { // } } 要运行集成测试,必须在类级别添加注解 @SpringBootTest。

使用 Spring Boot 构建 GraphQL API

GraphQL 是一种用于 API 的查询语言和运行时,它允许 API 消费者精确获取所需的信息,而不是服务器完全控制响应内容。某些 REST API 实现需要从多个 URL 加载资源的引用,而 GraphQL API 可以在单个响应中跟踪相关对象之间的引用并返回它们。 本教程逐步演示了如何使用 Spring Boot 和 Spring for GraphQL 构建一个 GraphQL API,用于查询 Neo4j 数据库中相关公司、人员和属性的示例数据集。它还演示了如何使用 Next.js 和 MUI Datagrid 构建一个 React 客户端来消费该 API。客户端和服务器都使用 Auth0 进行认证、授权,服务器使用 Okta Spring Boot Starter,客户端使用 Auth0 React SDK。 如果你想跳过所有步骤,直接运行程序,那么你可以以按照 GitHub Repository 中的 README 说明进行操作。 本文所使用的工具、服务如下: Node.js v18.16.1 npm 9.5.1 Java OpenJDK 17 Docker 24.0.2 Auth0 account Auth0 CLI 1.0.0 HTTPie 3.2.2 Next.js 13.4.19 使用 Spring for GraphQL 构建 GraphQL API 资源服务器(Resource Server)是一个 Spring Boot Web 应用,使用 Spring for GraphQL 暴露了一个 GraphQL API。该 API 允许使用 Spring Data Neo4j 查询 Neo4j 数据库,其中包含公司及其相关所有者和属性的信息。数据来自 Neo4j 用例示例。