1、概览 本文将带你了解如何在 Spring Boot 应用中动态地启动和停止 Kafka Listener。
2、依赖 首先,添加 spring-kafka 依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.1.2</version> </dependency> 3、配置 Kafka 消费者 生产者是向 Kafka Topic 发布(写入)事件的应用。
在本教程中,我们使用单元测试来模拟生产者向 Kafka Topic 发送事件。订阅 Topic 并处理事件流的消费者由应用中的 Listener(监听器)表示。该 Listener 被配置为处理来自 Kafka 的传入消息。
通过 KafkaConsumerConfig 类来配置 Kafka 消费者,其中包括 Kafka broker 的地址、消费者组 ID 以及 Key 和 Value 的反序列化器:
@Bean public DefaultKafkaConsumerFactory<String, UserEvent> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(UserEvent.
本文将带你了解什么是虚拟线程、虚拟线程是如何提高应用吞吐量的,以及如何为 Spring Boot 应用启用虚拟线程。
并发编程的演化 线程 总所周知,线程(Thread)是计算机中的最小执行单元,由操作系统直接进行调度,每个线程都有自己的执行路径和执行状态,可以独立地运行和并发执行多个任务。
线程是一种重量级的资源,线程的创建、销毁以及在多个线程之间切换都需要耗费 CPU 时间,一个系统可以同时创建、调度的线程数量有限。所以,现在应用基本上都会使用 线程池 来解决这个问题,通过池化线程,可以减少线程频繁 创建 和 销毁 的成本。
例如,Servlet 容器(Tomcat、Undertow、Jetty)的并发模型就是通过线程池,为每一个请求分配一个线程池中的线程进行处理。但是,一旦涉及到阻塞操作(IO、网络请求),当前线程就会被挂起进入等待状态,这个线程就不能去执行其他任务。这就导致了,使用传统线程池并发模型的服务器能同时处理的请求有限。
而,当代 Web 应用基本上都是 IO 密集形应用,请求中执行的业务往往涉及到与数据库进行交互、调用远程服务(Socket IO),本地磁盘文件读写等等,因此使用阻塞式线程是非常低效的。
异步非阻塞编程 为了解决传统线程在执行 IO 操作时由于阻塞导致的低效,于是,开始有了一种 响应式异步非阻塞 的编程模型。
在 Java 界,这类优秀的框架很多,如 Netty、WebFlux、Vert.x 等等。它们都提倡一个东西,那就是:异步非阻塞,只要是涉及到 IO、阻塞的地方,当前线程不会等待操作执行完毕,会立即返回去执行其他可执行的任务。这时候,就需要通过监听器(Listener),或者回调(Callback)来获得操作最终的执行结果。
以 Netty 为例,伪代码如下:
Channel channel = ...; // 异步写入数据到 Socket channel.write("Hello").addListener(future -> { if(future.isSuccess()) { // 写入成功 } else { // 写入失败 } }); 其中 channel.write("Hello") 用于向 Socket 写入数据,这就是典型的阻塞操作,在传统阻塞式编程中,该方法就会阻塞,直到数据被完全写入到 Socket 中。但是,在 Netty 中,在 write 方法执行后线程就会立即返回一个 ChannelFuture 对象,不会等待写入完成,因此这个线程仍然可以继续执行其他可执行的任务。
我使用 Java 很多年了,我非常喜欢 Java 及其生态系统。在 Java 生态系统中,Spring Boot 是我构建 Java 应用的首选框架。
前不久,我在一个项目中使用了 Golang,起初我对它的感觉褒贬不一。但用得越多,就越喜欢它。
每当我尝试学习一种新的语言或框架时,我都会尝试将新框架/语言的概念映射到我已经熟悉的框架/语言中。这有助于我更快地理解新框架/语言的生态系统。
学习任何新知识的最好方法就是用它来构建一些东西。因此,在本文中,我将带你了解如何使用 Go 构建一个 REST API,包括配置管理、日志记录、数据库访问等各个方面。
本文并不会涉及到 Golang 的基础知识,如如何声明变量、循环、函数等。
使用的库 Go 没有类似 Spring Boot 的框架。通常,Go 开发人员喜欢使用标准库,只添加必要的库来构建应用。
本文将会使用到以下库来在 Go 中构建一个 REST API:
Gin Web Framework - Web 框架 Viper - 配置库 zap - 日志库 pgx - Go 的 PostgreSQL 驱动程序和工具包 golang-migrate - 数据迁移 安装 Go 和工具 你可以从 https://go.dev/doc/install 下载并安装 Go。安装完成后,将 Go bin 目录添加到 PATH 环境变量中。
export GOPATH=$HOME/go export PATH="$PATH:$GOPATH/bin" 你可以使用 VS Code、IntelliJ IDEA Ultimate(使用 Go 插件)、GoLand 或任何其他 IDE 进行 Go 开发。
本文将带你了解如何使用 Spring Boot 和 Spring Kafka 管理 Kafka 消费者偏移量(Offset)。
在之的一篇文章中,主要说明了应用处理 Kafka 消息的方式可能会影响系统的整体性能,并没有考虑消费者端的消息重复或消息丢失等问题。本文将会介绍这些话题。
1、源码 本文中的源码托管在 GitHub,你可以克隆它到本地,然后按照说明中的步骤操作即可。
2、简介 在开始之前,首先要说明一些与使用 Spring Kafka 提交偏移量有关的重要事项。首先,默认情况下,Spring Kafka 会将消费者的 enable.auto.commit 属性设置为 false。这意味着提交偏移量的责任在于框架,而非 Kafka。当然,我们可以通过将该属性设置为 true 来改变默认行为。顺便说一句,这也是 Spring Kafka 2.3 之前的默认做法。
禁用了 Kafka 自动提交(Auto Commit)后,我们就可以利用 Spring Kafka 提供的 7 种不同的提交策略。本文不会分析所有策略,只分析最重要的几种。默认策略是 BATCH。为了设置不同的策略,需要覆盖 AckMode,例如在 Spring Boot application.properties 中设置 spring.kafka.listener.ack-mode 属性的值。
首先来看看 BATCH 模式。
3、Spring Boot Kafka 应用示例 为了测试使用 Spring Kafka 进行的偏移提交,我们将创建两个简单的应用:producer (生产者)和 consumer(消费者)。生产者向 Topic 发送规定数量的消息,而消费者接收并处理这些消息。下面是生产者 @RestController 的实现。它允许我们按需向 transactions Topic 发送指定数量的消息:
@RestController public class TransactionsController { private static final Logger LOG = LoggerFactory .
本文将带你了解如何为在 Kubernetes 上运行的 Spring Boot 应用配置 SSL 证书的热重载。我们将使用 Spring Boot 3.1 和 3.2 版本中引入的两个功能。第一个功能允许我们利用 SSL Bundle 在服务器端和客户端配置和使用自定义 SSL 配置。第二个功能使得在 Spring Boot 应用的嵌入式 Web 服务器中轻松进行 SSL 证书和密钥的热重载。
为了在 Kubernetes 上生成 SSL 证书,我们将使用 cert-manager。“cert-manager” 可以在指定期限后轮换证书,并将其保存为 Kubernetes Secret。之前的文章中介绍了如何在 Secret 更新时自动重启 Pod 的类似方案。我们使用 Stakater Reloader 工具在新版本的 Secret 上自动重启 pod。不过,这次我们使用 Spring Boot 新特性来避免重新启动应用(Pod)。
源码 你也可以克隆我的源代码,亲自尝试一下。首先克隆我的 GitHub 仓库。然后切换到 ssl 目录。你会发现两个 Spring Boot 应用:secure-callme-bundle 和 secure-caller-bundle。之后,你只需按照说明操作即可。
工作原理 在介绍技术细节之前,首先来了解我们的应用架构和面临的挑战。我们需要设计一个解决方案,在 Kubernetes 上运行的服务之间实现 SSL/TLS 通信。这个解决方案必须考虑到证书重载的情况。此外,服务器和客户端必须同时进行重载,以避免通信中出现错误。在服务器端,使用嵌入式 Tomcat 服务器。在客户端应用中,使用 Spring RestTemplate 对象。
⚠️ 注意 此版本升级到 Hibernate 6.4.4.Final。虽然它包含了许多有价值的错误修复,但在原生镜像(Native Image)中无法正常工作。如果你正在使用 GraalVM,则应在 pom.xml 中使用 hibernate.version 属性将 Hibernate 暂时降级到 6.4.2.Final。
🐞 Bug 修复 如果路径中包含空格,无法解析嵌套的 JAR URL #39675 当使用较长的镜像名称且标记包含非法字符时,镜像构建会运行很长时间 #39638 Banner 打印不遵守设置的字符集 #39621 “micrometer.observations.” 配置属性应为 “management.observations.” #39600 配置类解析过程中的元数据读取使用默认资源加载器,而不是应用的资源加载器 #39598 当将一些 Gson 属性(包括 spring.gson.disable-html-escaping)设置为 false 时,它们的行为不正确 #39524 当配置属性绑定使用转换器从属性值创建一个 Map 时,属性占位符不会被解析 #39515 Gradle 插件允许使用 Gradle 7.4,但文档和测试的最低版本是 7.5 #39513 WebFlux 自动配置应仅在启用虚拟线程时配置 blocking executor #39469 TestcontainersPropertySource 断言有错别字 #39449 缺少参数时,Webflux actuator 端点的响应为 500 #39444 使用 non-shaded Pulsar 客户端和配置身份验证参数时出现 NoSuchMethod 错误 #39389 Jetty GracefulShutdown 会写入 System.
在应用中把 Redis 当成消息队列来使用已经屡见不鲜了。我想主要原因是当代应用十有八九都会用到 Redis,因此不用再引入其他消息队列系统。而且 Redis 提供了好几种实现消息队列的方法,用起来也简单。
使用 Redis 实现消息队列的几种方式 Redis 提供了多种方式来实现消息队列。
Pub/Sub 订阅发布模式,发布者把消息发布到某个 Channel,该 Channel 的所有订阅者都会收到消息。但是这种方式最大的问题是 发布出去的消息,如果没有被监听消费,或者消费过程中宕机,那么消息就会永远丢失。适合用于临时通知之类的场景,对于需要保证数据不丢失的场景不能使用这种方式。
List List 是 Redis 提供的一种数据类型,底层是链表,可以用来实现队列、栈。
Stream Stream 是一个由 Redis 5 引入的,功能完善的消息队列。想必也是 Redis 官方团队看到太多人拿 Redis 当消息队列使,于是干脆就在 Redis 上设计出一个类似于 Kafka 的消息队列。
Steam 支持消费组消费,一条消息只能被消费组中的其中一个消费者消费。支持 消息确认、支持 回溯消费 还支持把未 ACK(确认)的消息转移给其他消费者进行重新消费,在进行转移的时候还会累计消息的转移次数,当次数达到一定阈值还没消费成功,就可以放入死信队列。
这也是 Redis 种最复杂的一种数据类型。如果你真的到了需要使用 Redis Steam 作为消息队列的地步,那不如直接使用 RabbitMQ 等更加成熟且稳定的消息队列系统。
使用 List 实现可靠的消息队列 目前来说,这是用得最多的一种方式,适用于大多数简单的消息队列应用场景。List 类型有很多指令,但是作为消息队列来说用到的只有几个个:
LPUSH key element [element ...]
把元素插入到 List 的首部,如果 List 不存在,会自动创建。
BRPOPLPUSH source destination timeout
移除并且返回 List (source)尾部的最后一个元素,并且同时会把这个元素插入到另一个 List (destination)的首部。
Spring Boot 3.2.2 版本的详细更新内容如下:
🐞 Bug 修复 SSLBundle 实现未提供有用的 toString() 结果 #39167 JarEntry.getComment() 从 NestedJarFile 实例返回不正确的结果 #39166 在 server.ssl 属性中混合 PEM 和 JKS 类型的证书不起作用 #39158 仅仅在 classpath 上引入 AspectJ 和 Micrometer 并不足以启用对 Micrometer observation 注解的支持 #39128 当映射到 / 时,无法访问未使用选择器(Selector)操作的 Actuator 端点。#39122 由于缺少 Authentication Manager,使用 WebFlux、Security 和 Actuator 的 Spring Boot 3.2 应用可能无法启动 #39096 management.observations.http.server.requests.name 不再生效 #39083 spring.rabbitmq.listener.stream.auto-startup 属性不起作用 #39078 PatternParseException 的日志信息中的错误标记位置不对 #39075 server.jetty.max-connections 配置无效 #39052 依赖于初始的 CharSequence 到 String 转换的 @ConfigurationPropertiesBinding 转换器不再起作用 #39051 在新的加载器实现中无法解析 Manifest attributes #38996 来自日志系统初始化的 Throwable 可能导致应用在启动时默默地失败 #38963 使用 Jetty 时,IO 操作和延迟调度的空闲超时不能设置为小于 30000ms #38960 当 jar 放在 WSL 网络驱动器上时,spring-boot-maven-plugin repackage uber jar 执行失败 #38956 Oracle OJDBC BOM 版本被标记为不适合生产使用 #38943 使用 jOOQ 且未设置 spring.
1、概览 Spring 提供了自动配置功能,可以用它来绑定组件、配置 Bean 以及从属性源中设置值。
当我们不想硬编码值,而希望使用 properties 文件或系统环境提供值时,@Value 注解就非常有用。
本文将带你了解如何通过 Spring 自动配置将属性值映射到 Enum 实例,不区分大小写。
2、Converter<F,T> Spring 使用 Converter 将 @Value 注解中的 String 值映射到所需的类型。一个专用的 BeanPostProcessor 会遍历所有组件,并检查它们是否需要额外的配置或注入。然后,找到一个合适的 Converter,并将源 Converter 中的数据绑定目标。Spring 提供了一个内置的 String 到 Enum 类型的 Converter,让我们来详细了解一下。
2.1、LenientToEnumConverter 顾名思义,该 Converter 在转换过程中可以自由解释数据。最初,它假定提供的数值是正确的:
@Override public E convert(T source) { String value = source.toString().trim(); if (value.isEmpty()) { return null; } try { return (E) Enum.valueOf(this.enumType, value); } catch (Exception ex) { return findEnum(value); } } 不过,如果无法将源映射到 Enum,它就会尝试另一种方法。它会获取 Enum 和 value 的规范名称:
本文将带你了解如何在 Spring Boot 应用中利用 Java Record 来提高其效率和可读性。
Java Record 是什么? Java Record 是一种专为保存不可变数据而设计的类。它们自动提供 equals()、hashCode() 和 toString() 等方法的实现,大大减少了模板代码。因此,它们非常适合在 Spring Boot 应用中创建数据传输对象(DTO)、实体和其他模型类。
Java Record 示例 举一个简单的例子,用 Java Record 来表示一个 Person。
public record Person(String name, int age) {} 在本例中,Person 是一个 Record,包含两个字段:name 和 age。Java 自动为该 Record 提供了以下内容:
一个 public 构造函数:Person(String name, int age) 属性的 public Getter 方法:name() 和 age() 实现了 equals()、hashCode() 和 toString() 方法 使用 Getter 方法 Record 的主要特点之一是提供了用于访问字段的隐式 Getter 方法。这些方法以字段本身命名。
创建 Person 实例并使用其 Getter 方法: