Spring Cloud AWS 3.0 简介 - SQS 整合

1、概览

Spring Cloud AWS 是一个旨在简化与 AWS 服务交互的项目。SQS(Simple Queue Service)是 AWS 的一种解决方案,用于以可扩展的方式发送和接收异步消息。

本文将带你了解针对于 Spring Cloud AWS 3.0 完全重写的 Spring Cloud AWS SQS Integration

该框架为处理 SQS 队列提供了熟悉的 Spring 抽象,如 SqsTemplate@SqsListener 注解。

本文以一个事件驱动的场景为例,介绍了如何发送和接收消息。并展示使用 Testcontainers(一种管理一次性 Docker 容器的工具)和 LocalStack(可在本地模拟类似 AWS 的环境,用于测试)设置集成测试的策略。

2、依赖

Spring Cloud AWS BOM 可确保项目之间的版本兼容。它声明了包括 Spring Boot 在内的许多依赖项的版本。

添加 Spring Cloud AWS BOM 到 pom.xml

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>3.0.4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

我们需要的主要依赖关系是 SQS Starter,它包含项目中所有与 SQS 相关的类。SQS Integration 不依赖于 Spring Boot,可在任何标准 Java 应用中独立使用:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>

对于 Spring Boot 应用(如本教程中构建的应用),我们应该添加项目的 Core Starter,因为它允许我们利用 Spring Boot 的 SQS 自动配置以及 AWS 配置(如 credentialsregion):

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter</artifactId>
</dependency>

3、设置本地测试环境

在本节中,我们将使用 Testcontainers 设置 LocalStack 环境,以便在本地环境中测试代码。注意,本教程中的示例也可以直接针对 AWS 执行。

3.1、依赖

要使用 JUnit 5 运行 LocalStackTestContainers,还需要两个额外的依赖项:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>

还需要 awaitility 库,以断言异步消息消费:

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>

3.2、配置

现在,创建一个具有管理容器逻辑的类,测试套件可以继承该类。我们将其命名为 BaseSqsIntegrationTests。对于继承该类的每个测试套件,Testcontainers 都将创建并启动一个新容器,这对于将每个套件的数据相互隔离至关重要。

@SpringBootTest 注解是初始化 Spring Context 所必需的,而 @Testcontainers 注解则将我们的 Testcontainers 注解与 JUnit 的运行时关联起来,这样容器就能在测试套件运行时启动,并在测试完成后停止:

@SpringBootTest
@Testcontainers
public class BaseSqsIntegrationTest {
   // 在此处添加测试配置
}

现在声明 LocalStackContainer。为了让框架自动管理容器的生命周期,@Container 注解是必须的:

private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";

@Container
static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));

最后,把 Spring Cloud AWS 框架用于自动配置的属性与 LocalStack 绑定。我们将在运行时获取容器端口和主机,因为 Testcontainers 会为我们提供随机端口,这非常适合并行测试。为此,可以使用 @DynamicPropertySource 注解:

@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
    registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
    registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
    registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
      .toString());
    // ...可在此处添加其他 AWS 服务端点
}

这就是我们使用 LocalStackTestcontainers 和 Spring Cloud AWS 实现 Spring Boot 测试所需的全部内容。在运行测试之前,还需要确保在本地环境中安装且运行了 Docker 引擎。

4、设置队列名称

可以用 Spring Boot 的 application.yml 来设置队列名称。

在本教程中,创建三个队列:

events:
  queues:
    user-created-by-name-queue: user_created_by_name_queue
    user-created-record-queue: user_created_record_queue
    user-created-event-type-queue: user_created_event_type_queue

创建一个 POJO 来表示这些属性:

@ConfigurationProperties(prefix = "events.queues")
public class EventQueuesProperties {

    private String userCreatedByNameQueue;
    private String userCreatedRecordQueue;
    private String userCreatedEventTypeQueue;

    // Getter / Setter 
}

最后,需要在 @Configuration 注解的类或 Spring Application main 类中使用 @EnableConfigurationProperties 注解,让 Spring Boot 知道要在其中注入 application.yml 属性:

@SpringBootApplication
@EnableConfigurationProperties(EventQueuesProperties.class)
public class SpringCloudAwsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudAwsApplication.class, args);
    }
}

现在,可以在需要队列名称时注入值本身或 POJO。

默认情况下,Spring Cloud AWS SQS 会在未找到队列时创建队列,这有助于我们快速建立开发环境。在生产环境中,应用不应拥有创建队列的权限,因此如果找不到队列,它将无法启动。也可以将框架配置为在找不到队列时显式启动失败。

5、发送和接收消息

使用 Spring Cloud AWS 向 SQS 收发消息有多种方法。在此,本文介绍最常见的几种方法,使用 SqsTemplate 发送消息和使用 @SqsListener 注解接收消息。

5.1、场景

在我们场景,我们将模拟一个事件驱动的应用,它通过在本地 Repository 中保存相关信息来响应 UserCreatedEvent

创建 User Entity:

public record User(String id, String name, String email) {
}

创建一个简单的基于内存的 UserRepository

@Repository
public class UserRepository {

    private final Map<String, User> persistedUsers = new ConcurrentHashMap<>();

    public void save(User userToSave) {
        persistedUsers.put(userToSave.id(), userToSave);
    }

    public Optional<User> findById(String userId) {
        return Optional.ofNullable(persistedUsers.get(userId));
    }

    public Optional<User> findByName(String name) {
        return persistedUsers.values().stream()
          .filter(user -> user.name().equals(name))
          .findFirst();
    }
}

最后,创建一个 UserCreatedEvent Java Record 类:

public record UserCreatedEvent(String id, String username, String email) {
}

5.2、设置

为了测试我们的场景,需要创建一个 SpringCloudAwsSQSLiveTest 类,该类将继承我们之前创建的 BaseSqsIntegrationTest 。我们将自动注入三个依赖:框架自动配置的 SqsTemplateUserRepository(以便断言消息处理是否成功)以及包含队列名称的 EventQueuesProperties POJO:

public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest {

    private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class);

    @Autowired
    private SqsTemplate sqsTemplate;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private EventQueuesProperties eventQueuesProperties;

   // ...
}

创建一个包含监听器的 UserEventListeners 类,并将其声明为 Spring @Component

@Component
public class UserEventListeners {

    private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class);

    public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType";

    private final UserRepository userRepository;

    public UserEventListeners(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    // 在此添加监听器
}

5.3、String 消息

第一个示例,我们将发送一条 String 消息,在监听器中接收该消息,并将其持久化到 Repository 中。然后,检索 Repository,以确保应用正确地持久化数据。

首先,在测试类中创建一个发送消息的测试:

@Test
void givenAStringPayload_whenSend_shouldReceive() {
    // given
    var userName = "Albert";

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue())
      .payload(userName));
    logger.info("Message sent with payload {}", userName);

    // then
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findByName(userName)
        .isPresent());
}

执行测试,我们应该可以在控制台看到类似如下的日志:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload Albert

注意,测试失败的原因是我们还没有该队列的监听器。

让我们在监听器类中设置监听器,以便从队列中接收消息,并使测试通过:

@SqsListener("${events.queues.user-created-by-name-queue}")
public void receiveStringMessage(String username) {
    logger.info("Received message: {}", username);
    userRepository.save(new User(UUID.randomUUID()
      .toString(), username, null));
}

现在,运行测试,应该能在日志中看到结果:

INFO [ntContainer#0-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: Albert

测试通过。

注意,我们使用了 Spring 的属性解析功能从之前创建的 application.yml 中获取队列名称。

5.4、POJO 和 Record 消息

我们已经发送和接收了 String,现在让我们用 Java Record(之前创建的 UserCreatedEvent)来设置一个场景。

首先,编写失败测试:

@Test
void givenARecordPayload_whenSend_shouldReceive() {
    // given
    String userId = UUID.randomUUID()
      .toString();
    var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com");

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue())
      .payload(payload));

    // then
    logger.info("Message sent with payload: {}", payload);
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findById(userId)
        .isPresent());
}

在测试失败之前,你应该能看到类似如下的日志:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload: UserCreatedEvent[id=67f52cf6-c750-4200-9a02-345bda0516f8, username=John, email=john@baeldung.com]

现在,创建相应的监听器,使测试通过:

@SqsListener("${events.queues.user-created-record-queue}")
public void receiveRecordMessage(UserCreatedEvent event) {
    logger.info("Received message: {}", event);
    userRepository.save(new User(event.id(), event.username(), event.email()));
}

输出结果如下,说明消息已收到,测试通过:

INFO [ntContainer#1-1] c.b.s.cloud.aws.sqs.UserEventListeners   : Received message: UserCreatedEvent[id=2d66df3d-2dbd-4aed-8fc0-ddd08416ed12, username=John, email=john@baeldung.com]

框架会自动配置 Spring Context 中可用的任何 ObjectMapper Bean,以处理消息的序列化和反序列化。我们可以配置自己的 ObjectMapper,并以多种方式自定义序列化,但这超出了本教程的范围,这里不多解释。

5.5、Spring Message 和 Header

最后一种场景,发送带有自定义 Header 的 Record,并以 Spring Message 实例的形式接收消息,同时在方法签名中添加自定义 Header 和标准 SQS Header。框架会自动将所有 SQS 消息属性(message attributes) 转换为消息 Header,包括用户提供的任何属性。

首先,创建一个失败的测试:

@Test
void givenCustomHeaders_whenSend_shouldReceive() {
    // given
    String userId = UUID.randomUUID()
      .toString();
    var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com");
    var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent");

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue())
      .payload(payload)
      .headers(headers));

    // then
    logger.info("Sent message with payload {} and custom headers: {}", payload, headers);
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findById(userId)
        .isPresent());
}

测试失败,生成的日志和上文类似:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest  : Sent message with payload UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, email=john@baeldung.com] and custom headers: {eventType=UserCreatedEvent}

现在,添加相应的监听器,使测试通过:

@SqsListener("${events.queues.user-created-event-type-queue}")
public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType,
    @Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) {
    logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive);
    UserCreatedEvent payload = message.getPayload();
    userRepository.save(new User(payload.id(), payload.username(), payload.email()));
}

重新运行测试,输出日志如下,表明测试成功:

INFO [ntContainer#2-1] c.b.s.cloud.aws.sqs.UserEventListeners   : Received message GenericMessage [payload=UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, email=john@baeldung.com], headers=...

在这个示例中,我们接收到一条消息,其 Payload 是反序列化后的 UserCreatedEvent Record 和两个 Header。为确保整个项目的一致性,应使用 SqsHeader 类常量来检索 SQS 标准 Header。

6、总结

本文使用了一个事件驱动场景,通过不同的示例介绍了如何使用 Spring Cloud AWS SQS 3.0 发送和接收消息。


Ref:https://www.baeldung.com/java-spring-cloud-aws-v3-intro