Spring Cloud AWS SQS v3 中的消息确认

1、概览

消息确认是 MQ 系统中的一种标准机制,它向 Message Broker 发出信号,表明消息已被消费,不应再次传递。在亚马逊的 SQS(Simple Queue Servic)中,确认是通过删除队列中的信息来执行的。

本文将带你了解 Spring Cloud AWS SQS v3 中开箱即提供的三种确认模式: ON_SUCCESSMANUALALWAYS

本文将利用 Spring Cloud AWS SQS V3 入门文章 中的环境和测试设置,使用事件驱动场景来说明用例。

2、依赖

首先,导入 Spring Cloud AWS BOM,以确保 pom.xml 中的所有依赖项相互兼容:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>3.1.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

还要添加 CoreSQS starter 依赖:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter</artifactId>
</dependency>

最后,添加测试所需的依赖,即带有 JUnit 5LocalStackTestContainers、用于验证异步消息消费的 awaitility 库,以及用于处理断言的 AssertJ

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <scope>test</scope>
</dependency>

3、设置本地测试环境

首先,使用 Testcontainers 配置一个本地测试的 LocalStack 环境:

@Testcontainers
public class BaseSqsLiveTest {

    private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";

    @Container
    static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));

    @DynamicPropertySource
    static void overrideProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
        registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
        registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
        registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
          .toString());
    }
}

这种设置使测试变得简单且可重复,本教程中的代码也可直接用于 AWS。

4、设置队列名称

默认情况下,Spring Cloud AWS SQS 会自动创建任何 @SqsListener 注解方法中指定的队列。首先,在 application.yaml 中定义队列名称:

events:
  queues:
    order-processing-retry-queue: order_processing_retry_queue
    order-processing-async-queue: order_processing_async_queue
    order-processing-no-retries-queue: order_processing_no_retries_queue
  acknowledgment:
    order-processing-no-retries-queue: ALWAYS

其中一个监听器定义了 acknowledgment 属性: ALWAYS

还可以添加一些 product.id,以便在整个示例中使用:

product:
  id:
    smartphone: 123e4567-e89b-12d3-a456-426614174000
    wireless-headphones: 123e4567-e89b-12d3-a456-426614174001
    laptop: 123e4567-e89b-12d3-a456-426614174002
    tablet: 123e4567-e89b-12d3-a456-426614174004

为了在应用中以 POJO 的形式获取这些属性,需要创建两个 @ConfigurationProperties 类,其中一个用于队列:

@ConfigurationProperties(prefix = "events.queues")
public class EventsQueuesProperties {

    private String orderProcessingRetryQueue;

    private String orderProcessingAsyncQueue;

    private String orderProcessingNoRetriesQueue;

    // getter 和  setter
}

另一个用于 product:

@ConfigurationProperties("product.id")
public class ProductIdProperties {

    private UUID smartphone;

    private UUID wirelessHeadphones;

    private UUID laptop;

    // getter 和  setter
}

最后,使用 @EnableConfigurationProperties@Configuration 类中启用配置属性:

@EnableConfigurationProperties({ EventsQueuesProperties.class, ProductIdProperties.class})
@Configuration
public class OrderProcessingConfiguration {
}

5、处理成功后确认消息

@SqsListener 的默认确认模式是 ON_SUCCESS。在这种模式下,如果监听器方法执行完毕而没有抛出异常,消息就会被确认。

为了说明这种行为,创建一个简单的监听器,它将接收 OrderCreatedEvent(订单创建事件),检查 InventoryService(库存服务),如果请求的项目和数量有库存,则将订单状态更改为 PROCESSED

5.1、创建服务

首先,创建 OrderService,它负责更新订单状态:

@Service
public class OrderService {

    Map<UUID, OrderStatus> ORDER_STATUS_STORAGE = new ConcurrentHashMap<>();

    public void updateOrderStatus(UUID orderId, OrderStatus status) {
        ORDER_STATUS_STORAGE.put(orderId, status);
    }

    public OrderStatus getOrderStatus(UUID orderId) {
        return ORDER_STATUS_STORAGE.getOrDefault(orderId, OrderStatus.UNKNOWN);
    }
}

然后,创建 InventoryService。使用 Map 来模拟数据库,通过 ProductIdProperties 来填充,ProductIDProperties 会自动注入 application.yaml 文件中的值:

@Service
public class InventoryService implements InitializingBean {

    private ProductIdProperties productIdProperties;

    private Map<UUID, Integer> inventory;

    public InventoryService(ProductIdProperties productIdProperties) {
        this.productIdProperties = productIdProperties;
    }

    @Override
    public void afterPropertiesSet() {
        this.inventory = new ConcurrentHashMap<>(Map.of(productIdProperties.getSmartphone(), 10,
          productIdProperties.getWirelessHeadphones(), 15,
          productIdProperties.getLaptop(), 5);
    }
}

InitializingBean 接口提供了 afterPropertiesSet 方法,这是 Spring 在解析了 Bean 的所有依赖(在本例中是 ProductIdProperties Bean)后调用的生命周期方法。

添加一个 checkInventory 方法,用于验证库存中是否有所需数量的产品。如果产品不存在,它将抛出 ProductNotFoundException 异常;如果产品存在但数量不足,它将抛出 OutOfStockException 异常。在第二种情况下,我们还模拟随机补货,这样经过几次重试后,处理最终会成功:

public void checkInventory(UUID productId, int quantity) {
    Integer stock = inventory.get(productId);
    if (stock < quantity) {
        inventory.put(productId, stock + (int) (Math.random() * 5));
        throw new OutOfStockException(
          "Product with id %s is out of stock. Quantity requested: %s ".formatted(productId, quantity));
    };
    inventory.put(productId, stock - quantity);
}

5.2、创建监听器

创建第一个监听器,使用 @Component 注解,并通过 Spring 的构造函数依赖注入机制注入服务:

@Component
public class OrderProcessingListeners {

    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingListeners.class);

    private InventoryService inventoryService;

    private OrderService orderService;

    public OrderProcessingListeners(InventoryService inventoryService, OrderService orderService) {
        this.inventoryService = inventoryService;
        this.orderService = orderService;
    }
}

接下来,编写监听器方法:

@SqsListener(value = "${events.queues.order-processing-retry-queue}", id = "retry-order-processing-container", messageVisibilitySeconds = "1")
public void stockCheckRetry(OrderCreatedEvent orderCreatedEvent) {
    logger.info("Message received: {}", orderCreatedEvent);
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
    inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());

    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED);
    logger.info("Message processed successfully: {}", orderCreatedEvent);
}

value 属性是通过 application.yaml 自动注入的队列名称。由于 ON_SUCCESS 是默认的确认模式,因此不需要在注解中指定它。

5.3、设置测试类

为了确保逻辑按预期运行,创建一个测试类:

@SpringBootTest
class OrderProcessingApplicationLiveTest extends BaseSqsLiveTest {

    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingApplicationLiveTest.class);

    @Autowired
    private EventsQueuesProperties eventsQueuesProperties;

    @Autowired
    private ProductIdProperties productIdProperties;

    @Autowired
    private SqsTemplate sqsTemplate;

    @Autowired
    private OrderService orderService;

    @Autowired
    private MessageListenerContainerRegistry registry;
}

还添加一个名为 assertQueueIsEmpty 的方法。在该方法中,使用自动注入的 MessageListenerContainerRegistry 来获取容器,然后停止容器以确保它没有消费任何消息。Registry 包含由 @SqsListener 注解创建的所有容器:

private void assertQueueIsEmpty(String queueName, String containerId) {
    logger.info("Stopping container {}", containerId);
    var container = Objects
      .requireNonNull(registry.getContainerById(containerId), () -> "could not find container " + containerId);
    container.stop();
    // ...
}

容器停止后,使用 SqsTemplate 在队列中查找消息。如果确认成功,则不应返回任何消息。还把 pollTimeout 设置为大于可见性(message visibility)超时的值,这样如果消息尚未被删除,就会在指定的时间间隔内再次发送。

下面是 assertQueueIsEmpty 方法的另一部分:

// ...
logger.info("Checking for messages in queue {}", queueName);
var message = sqsTemplate.receive(from -> from.queue(queueName)
  .pollTimeout(Duration.ofSeconds(5)));
assertThat(message).isEmpty();
logger.info("No messages found in queue {}", queueName);

5.4、测试

在第一个测试中,向队列发送一个 OrderCreatedEvent(订单创建事件),其中包含一个数量大于库存量的产品订单。当异常通过监听器方法时,它会向框架发出消息处理失败的信号,消息应在消息可见性(message visibility)时间窗口结束后再次发送。

为了加快测试速度,我们在注解中将消息可见性秒数(messageVisibilitySeconds)设为 1,但通常情况下,这一配置是在队列本身中完成的,默认值为 30 秒。

使用 Spring Cloud AWS 提供的自动配置 SqsTemplate 创建事件并发送。然后,使用 Awaitility 等待订单状态变为 PROCESSED,最后,断言队列为空,这意味着确认成功:

@Test
public void givenOnSuccessAcknowledgementMode_whenProcessingThrows_shouldRetry() {
    var orderId = UUID.randomUUID();
    var queueName = eventsQueuesProperties.getOrderProcessingRetryQueue();
    sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getLaptop(), 10));
    Awaitility.await()
      .atMost(Duration.ofMinutes(1))
      .until(() -> orderService.getOrderStatus(orderId)
        .equals(OrderStatus.PROCESSED));
    assertQueueIsEmpty(queueName, "retry-order-processing-container");
}

注意,我们将 @SqsListener 注解中指定的 containerId 传递给了 assertQueueIsEmpty 方法。

现在运行测试。首先,要确保 Docker 正在运行,然后执行测试。在容器初始化日志之后,我们应该能看到应用的日志信息:

Message received: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]

然后,由于库存不足,应该会出现一次或多次异常:

Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174002 is out of stock. Quantity requested: 10 

而且,由于我们添加了补充逻辑,我们最终应该看到消息处理成功了:

Message processed successfully: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]

最后,确保确认成功:

INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container retry-order-processing-container
INFO 2699 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container retry-order-processing-container stopped
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_retry_queue
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_retry_queue

注意,测试完成后可能会抛出 “connection refused” 错误,这是因为 Docker 容器在框架停止轮询消息之前就已停止,可以放心地忽略这些错误。

6、手动确认

框架支持手动确认信息,这对于需要更好地控制确认过程的情况下非常有用。

6.1、创建监听器

为了说明这一点,我们创建一个异步场景,在这个场景中,InventoryService 的连接速度很慢,我们希望在它完成之前释放监听器线程:

@SqsListener(value = "${events.queues.order-processing-async-queue}", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL, id = "async-order-processing-container", messageVisibilitySeconds = "3")
public void slowStockCheckAsynchronous(OrderCreatedEvent orderCreatedEvent, Acknowledgement acknowledgement) {
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
    CompletableFuture.runAsync(() -> inventoryService.slowCheckInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity()))
      .thenRun(() -> orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED))
      .thenCompose(voidFuture -> acknowledgement.acknowledgeAsync())
      .thenRun(() -> logger.info("Message for order {} acknowledged", orderCreatedEvent.id()));
    logger.info("Releasing processing thread.");
}

在此逻辑中,我们使用 Java 的 CompletableFuture 异步运行库存检查。将 Acknowledge 对象添加到监听器方法中,并将 SqsListenerAcknowledgementMode.MANUAL 添加到注解的 acknowledgementMode 属性中。该属性是字符串,可接受属性占位符和 SpEL。只有当我们将 AcknowledgementMode 设置为 MANUAL 时,Acknowledgement 对象才可用。

注意,示例中利用了 Spring Boot 自动配置(它提供了合理的默认值)和 @SqsListener 注解属性来更改确认模式。另一种方法是声明一个 SqsMessageListenerContainerFactory Bean,它允许设置更复杂的配置。

6.2、模拟慢连接

现在,在 InventoryService 类中添加 slowCheckInventory 方法,使用 Thread.sleep 模拟慢连接:

public void slowCheckInventory(UUID productId, int quantity) {
    simulateBusyConnection();
    checkInventory(productId, quantity);
}

private void simulateBusyConnection() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}

6.3、测试

接下来,编写测试:

@Test
public void givenManualAcknowledgementMode_whenManuallyAcknowledge_shouldAcknowledge() {
    var orderId = UUID.randomUUID();
    var queueName = eventsQueuesProperties.getOrderProcessingAsyncQueue();
    sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getSmartphone(), 1));
    Awaitility.await()
      .atMost(Duration.ofMinutes(1))
      .until(() -> orderService.getOrderStatus(orderId)
        .equals(OrderStatus.PROCESSED));
    assertQueueIsEmpty(queueName, "async-order-processing-container");
}

这一次,我们请求的是库存中的可用数量,所以应该不会出现错误。

运行测试,我们会看到一条日志信息,显示已收到消息:

INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message received: OrderCreatedEvent[id=013740a3-0a45-478a-b085-fbd634fbe66d, productId=123e4567-e89b-12d3-a456-426614174000, quantity=1]

然后,我们会看到线程的释放信息:

INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Releasing processing thread.

这是因为我们在异步处理和确认消息。大约两秒钟后,就会看到日志显示消息已确认:

INFO 2786 --- [onPool-worker-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message for order 013740a3-0a45-478a-b085-fbd634fbe66d acknowledged

最后,可以看到停止容器和断言队列为空的日志:

INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container async-order-processing-container
INFO 2786 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container async-order-processing-container stopped
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_async_queue
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_async_queue

7、不论成功和失败都确认消息

最后一种确认模式是 ALWAYS,无论监听器方法是否抛出错误,框架都会确认消息。

7.1、创建监听器

模拟一个销售事件,在这个事件中,我们的库存是有限的,而且我们不想重新处理任何信息,无论是否有任何失败。

使用之前在 application.yml 中定义的属性将确认模式设置为 ALWAYS

@SqsListener(value = "${events.queues.order-processing-no-retries-queue}", acknowledgementMode = ${events.acknowledgment.order-processing-no-retries-queue}, id = "no-retries-order-processing-container", messageVisibilitySeconds = "3")
public void stockCheckNoRetries(OrderCreatedEvent orderCreatedEvent) {
    logger.info("Message received: {}", orderCreatedEvent);
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.RECEIVED);
    inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());

    logger.info("Message processed: {}", orderCreatedEvent);
}

在测试中,创建一个数量大于库存的订单:

7.2、测试

@Test
public void givenAlwaysAcknowledgementMode_whenProcessThrows_shouldAcknowledge() {
    var orderId = UUID.randomUUID();
    var queueName = eventsQueuesProperties.getOrderProcessingNoRetriesQueue();
    sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getWirelessHeadphones(), 20));
    Awaitility.await()
      .atMost(Duration.ofMinutes(1))
      .until(() -> orderService.getOrderStatus(orderId)
        .equals(OrderStatus.RECEIVED));
    assertQueueIsEmpty(queueName, "no-retries-order-processing-container");
}

现在,即使抛出 OutOfStockException,消息也会被确认,并且不会对消息进行重试:

Message received: OrderCreatedEvent[id=7587f1a2-328f-4791-8559-ee8e85b25259, productId=123e4567-e89b-12d3-a456-426614174001, quantity=20]
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174001 is out of stock. Quantity requested: 20
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container no-retries-order-processing-container
INFO 2835 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container no-retries-order-processing-container stopped
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_no_retries_queue
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_no_retries_queue

8、总结

本文通过一个事件驱动场景来介绍了 Spring Cloud AWS v3 SQS 提供的三种消息确认模式: ON_SUCCESS(默认)、MANUALALWAYS


Ref:https://www.baeldung.com/java-spring-cloud-aws-v3-message-acknowledgement