在 Spring 中实现 Bulk 和 Batch API

1、概览

尽管标准的 REST API 可以满足大多数常见需求,但在处理批量(Bulk)或批处理(Batch)操作时,基于 REST 的架构风格存在一些限制。

本文将带你了解如何在微服务中应用 Bulk 和 Batch 操作,以及如何实现一些自定义的面向 “写” 的 Bulk 和 Batch API。

2、 Bulk 和 Batch API 简介

Bulk 和 Batch 操作这两个术语经常被互换使用。不过,两者之间还是有硬性区别的。

通常,Bulk(批量)操作是指对同一类型的多个条目执行相同的操作。一种简单的方法是为每个请求调用相同的 API 来执行批量操作。这种方法可能太慢,而且会浪费资源。相反,我们可以在一次往返中处理多个条目。

我们可以通过在一次调用中对同一类型的多个条目执行相同的操作来实现 Bulk(批量)操作。这种对条目集合进行操作的方式可减少整体延迟并提高应用性能。要实现 Bulk(批量)操作,我们可以重用用于于单个条目的现有端点,或者为 Bulk(批量)方法创建一个单独的路由。

Batch(批处理)操作通常是指对多个资源类型执行不同的操作。Batch API 是在一次调用中对多个资源执行多个操作的集合。这些资源操作可能没有任何连贯性。每个请求路由可以是独立的,互相之间没有依赖关系。

简而言之,“Batch” 一词是指批量处理不同的请求。

目前,对于实现 Bulk 或 Batch 操作来说,缺乏明确定义的标准或规范。此外,许多流行的框架,如 Spring,没有内置的 Bulk(批量)操作支持。

尽管如此,在本文中,我们将使用常见的 REST 架构来实现自定义 Bulk 和 Batch 操作的方法。

3、Spring 示例应用

假设我们需要构建一个简单的微服务,同时支持 Bulk 和 Batch 操作。

3.1、Maven 依赖

首先,添加 spring-boot-starter-webspring-boot-starter-validation 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>3.1.5</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-validation</artifactId>
    <version>3.1.5</version>
</dependency>

spring-boot-starter-validation 依赖用于验证 Bulk 和 Batch 请求的大小。

3.2、实现第一个 Spring 服务

实现一个在 Repository 中创建、更新和删除数据的服务。

首先,创建 Customer Model:

public class Customer implements Serializable {
    private String id;
    private String name;
    private String email;
    private String address;
    // getter/setter 方法省略
}

接下来,创建 CustomerService 类,并在其中实现 createCustomers() 方法。该方法在内存 Repository 中存储多个 Customer 对象:

@Service
public class CustomerService {
    private final Map<String, Customer> customerRepoMap = new HashMap<>();

    public List<Customer> createCustomers(List<Customers> customers) {
        return customers.stream()
          .map(this::createCustomer)
          .filter(Optional::isPresent)
          .map(Optional::get)
          .collect(toList());
    }
}

createCustomer(Customer customer) 方法实现如下,用来创建一个 Customer 对象:

public Optional<Customer> createCustomer(Customer customer) {
    if (!customerRepoMap.containsKey(customer.getEmail()) && customer.getId() == 0) {
        Customer customerToCreate = new Customer(customerRepoMap.size() + 1, 
          customer.getName(), customer.getEmail());
        customerToCreate.setAddress(customer.getAddress());
        customerRepoMap.put(customerToCreate.getEmail(), customerToCreate);  
        return Optional.of(customerToCreate);
    }

    return Optional.empty();
}

在上述方法中,只在 customer 不存在于 Repository 中的情况下创建 Customer,否则将返回一个空的 Optional 对象。

同样,再实现一个方法来更新现有 Customer 的详细信息:

private Optional<Customer> updateCustomer(Customer customer) {
    Customer customerToUpdate = customerRepoMap.get(customer.getEmail());
    if (customerToUpdate != null && customerToUpdate.getId() == customer.getId()) {
        customerToUpdate.setName(customer.getName());
        customerToUpdate.setAddress(customer.getAddress());
    }
    return Optional.ofNullable(customerToUpdate);
}

最后,实现一个 deleteCustomer() 方法,从 Repository 中删除现有的 Customer

public Optional<Customer> deleteCustomer(Customer customer) {
    Customer customerToDelete = customerRepoMap.get(customer.getEmail());
    if (customerToDelete != null && customerToDelete.getId() == customer.getId()) {
        customerRepoMap.remove(customer.getEmail());
    }

   return Optional.ofNullable(customerToDelete);
}

3.3、实现第二个 Spring 服务

再实现一个服务,在 Repository 中获取并创建 Address 数据。

首先,定义 Address 类:

public class Address implements Serializable {
    private int id;
    private String street;
    private String city;
    // Getter/Setter 方法省略
}

然后,创建 AddressService 类,实现 createAddress(Address address) 方法:

public Address createAddress(Address address) {
    Address createdAddress = null;
    String addressUniqueKey = address.getStreet().concat(address.getCity());
    if (!addressRepoMap.containsKey(addressUniqueKey)) {
        createdAddress = new Address(addressRepoMap.size() + 1, 
          address.getStreet(), address.getCity());
        addressRepoMap.put(addressUniqueKey, createdAddress);
    }
    return createdAddress;
}

4、使用现有端点实现 Bulk API

现在,创建一个 API 来支持 Bulk(批量)和单项创建操作。

4.1、实现 Bulk Controller

实现一个 BulkController 类,其中包含一个端点,可以在单个调用中创建单个或多个 Customer

Bulk(批量)请求的 JSON 格式如下:

[
    {
        "name": "<name>",
        "email": "<email>",
        "address": "<address>"
    }
]

此时,我们可以使用自定义 HTTP Header(X-ActionType)来区分是 Bulk(批量)操作还是单项操作。

接着,在 BulkController 类中实现 bulkCreateCustomers() 方法,并使用上文中 CustomerService 的方法:

@PostMapping(path = "/customers/bulk")
public ResponseEntity<List<Customer>> bulkCreateCustomers(
  @RequestHeader(value="X-ActionType") String actionType, 
  @RequestBody @Valid @Size(min = 1, max = 20) List<Customer> customers) {
    List<Customer> customerList = actionType.equals("bulk") ? 
      customerService.createCustomers(customers) :
      singletonList(customerService.createCustomer(customers.get(0)).orElse(null));

    return new ResponseEntity<>(customerList, HttpStatus.CREATED);
}

如上,使用 X-ActionType Header 接受 Bulk(批量)请求。此外,还使用 @Size 注解添加了请求大小验证。代码会决定是将整个 List 传递给 createCustomers() 还是只将元素 0 传递给 createCustomer()

不同的 create 方法会返回 List 或单个 Optional,为了保持 HTTP 响应的一致,因此要将后者转换为 List

4.2、验证 Bulk API

运行应用,并通过 cURL 来验证 Bulk(批量)API 端点:

$ curl -i --request POST 'http://localhost:8080/api/customers/bulk' \
--header 'X-ActionType: bulk' \
--header 'Content-Type: application/json' \
--data-raw '[
    {
        "name": "test1",
        "email": "test1@email.com",
        "address": "address1"
    },
    {
        "name": "test2",
        "email": "test2@email.com",
        "address": "address2"
    }
]'

响应如下,成功地创建了数据:

HTTP/1.1 201 
[{"id":1,"name":"test1","email":"test1@email.com","address":"address1"},
{"id":1,"name":"test2","email":"test2@email.com","address":"address2"},
...

接下来,我们采用另一种方法实现 Bulk(批量)操作。

5、使用不同的端点实现 Bulk API

在 Bulk API 中,对同一资源执行不同操作的情况并不常见。不过,让我们看看最灵活的方法是如何实现的。

我们可以实现一个原子 Bulk(批量)操作,其中整个请求在单个事务中要么都成功,要么都失败。或者,可以允许成功的更新独立于失败的更新,并通过响应来说明是完全成功还是部分成功。我们将实现第二种方法。

5.1、定义请求和响应 Model

考虑一个用例,即在一次调用中创建、更新和删除多个 Customer

Bulk 请求的 JSON 格式如下:

[
    {
        "bulkActionType": "<CREATE OR UPDATE OR DELETE>",
        "customers": [
            {
                "name": "<name>",
                "email": "<email>",
                "address": "<address>"
            }
        ]
    }
]

首先,定义 CustomerBulkRequest 类,为上述 JSON 格式建模 :

public class CustomerBulkRequest {
    private BulkActionType bulkActionType;
    private List<Customer> customers;
    // getter/setter 方法省略
}

BulkActionType 枚举定义如下:

public enum BulkActionType {
    CREATE, UPDATE, DELETE
}

然后,定义 HTTP 响应对象, CustomerBulkResponse 类:

public class CustomerBulkResponse {
    private BulkActionType bulkActionType;
    private List<Customer> customers;
    private BulkStatus status;
    // getter/setter 方法省略
}

最后,定义 BulkStatus 枚举来表示每个操作的返回状态:

public enum BulkStatus {
    PROCESSED, PARTIALLY_PROCESSED, NOT_PROCESSED
}

5.2、实现 Bulk Controller

实现一个 Bulk API,根据 bulkActionType 枚举接收 Bulk(请求)并进行处理,然后一起返回 Bulk 状态和 Customer 数据。

首先,在 BulkController 类中创建一个 EnumMap,并将 BulkActionType 枚举映射到其对应的 CustomerServiceFunction

@RestController
@RequestMapping("/api")
@Validated
public class BulkController {
    private final CustomerService customerService;
    private final EnumMap<BulkActionType, Function<Customer, Optional<Customer>>> bulkActionFuncMap = 
      new EnumMap<>(BulkActionType.class);

    public BulkController(CustomerService customerService) {
        this.customerService = customerService;
        bulkActionFuncMap.put(BulkActionType.CREATE, customerService::createCustomer);
        bulkActionFuncMap.put(BulkActionType.UPDATE, customerService::updateCustomer);
        bulkActionFuncMap.put(BulkActionType.DELETE, customerService::deleteCustomer);
    }
}

EnumMap 定义了请求类型与 CustomerService 方法之间的绑定(策略模式)。它可以减少冗长的 switchif 语句。

我们可以把 EnumMap 返回的 Function 根据操作类型传递给 Customer Stream 上的 map() 方法。

List<Customer> customers = customerBulkRequest.getCustomers().stream()
   .map(bulkActionFuncMap.get(customerBulkRequest.getBulkActionType()))
   ...

由于我们所有的 Function 对象都是从 Customer 映射到 Optional<Customer>,因此这基本上是使用 Stream 中的 map() 操作来执行 Bulk 请求,并将结果 Customer 保留在 Stream 中(如果可用)。

完整的 Controller 方法实现如下:

@PostMapping(path = "/customers/bulk")
public ResponseEntity<List<CustomerBulkResponse>> bulkProcessCustomers(
  @RequestBody @Valid @Size(min = 1, max = 20) 
  List<CustomerBulkRequest> customerBulkRequests) {
    List<CustomerBulkResponse> customerBulkResponseList = new ArrayList<>();

    customerBulkRequests.forEach(customerBulkRequest -> {
        List<Customer> customers = customerBulkRequest.getCustomers().stream()
          .map(bulkActionFuncMap.get(customerBulkRequest.getBulkActionType()))
          .filter(Optional::isPresent)
          .map(Optional::get)
          .collect(toList());
        
        BulkStatus bulkStatus = getBulkStatus(customerBulkRequest.getCustomers(), 
          customers);     
        customerBulkResponseList.add(CustomerBulkResponse.getCustomerBulkResponse(customers, 
          customerbulkRequest.getBulkActionType(), bulkStatus));
    });

    return new ResponseEntity<>(customerBulkResponseList, HttpStatus.Multi_Status);
}

此外,还要实现 getBulkStatus 方法,根据创建的 Customer 数量返回特定的 bulkStatus 枚举:

private BulkStatus getBulkStatus(List<Customer> customersInRequest, 
  List<Customer> customersProcessed) {
    if (!customersProcessed.isEmpty()) {
        return customersProcessed.size() == customersInRequest.size() ?
          BulkStatus.PROCESSED : 
          BulkStatus.PARTIALLY_PROCESSED;
    }

    return BulkStatus.NOT_PROCESSED;
}

注意,还可以考虑在每个操作之间添加输入验证,以处理可能存在的冲突。

5.3、验证 Bulk API

运行应用,并使用 cURL 调用上述端点,即 /customers/bulk

$ curl -i --request POST 'http://localhost:8080/api/customers/bulk' \
--header 'Content-Type: application/json' \
--data-raw '[
    {
        "bulkActionType": "CREATE",
        "customers": [
            {
                "name": "test4",
                "email": "test4@email.com",
                ...
            }
        ]
    },
    {
        "bulkActionType": "UPDATE",
        "customers": [
            ...
        ]
    },
    {
        "bulkActionType": "DELETE",
        "customers": [
            ...
        ]
    }
]'

响应如下,一切 Ok:

HTTP/1.1 207 
[{"customers":[{"id":4,"name":"test4","email":"test4@email.com","address":"address4"}],"status":"PROCESSED","bulkType":"CREATE"},
...

接下来,实现一个 Batch API,在一次 Batch(批处理)调用中同时处理 CustomerAddress

6、实现 Batch API

通常情况下,Batch API 请求是一系列子请求的集合,每个子请求都有自己的方法、资源 URL 和有 Payload。

我们要实现一个 Batch API,用于创建和更新两种资源类型。当然,还可以包含其他操作,如删除操作。但为了简单起见,本例只考虑 POSTPATCH 方法(即,创建和更新)。

6.1、定义 Batch 请求 Model

首先,定义混合数据请求的 JSON 格式:

[
    {
        "method": "POST",
        "relativeUrl": "/address",
        "data": {
            "street": "<street>",
            "city": "<city>"
        }
    },
    {
        "method": "PATCH",
        "relativeUrl": "/customer",
        "data": {
            "id": "<id>",
            "name": "<name>",
            "email": "<email>",
            "address": "<address>"
        }
    }
]

创建 JSON 格式对应的 Model 类,BatchRequest:

public class BatchRequest {
    private HttpMethod method;
    private String relativeUrl;
    private JsonNode data;
    // 标准的 getter/setter 方法省略
}

6.2、实现 Batch Controller

实现一个 Batch API,在一次请求中创建 Address 并更新 CustomerAddress。为了简单起见,这里在同一个微服务中编写此 API。在其他架构模式中,可能会选择在不同的微服务中实现它,并行调用各个端点。

使用上述 BatchRequest 类时,可能会遇到将 JsonNode 反序列化为特定类型类的问题。我们可以使用 ObjectMapperconvertValue 方法将 JsonNode 转换为指定类型对象,从而轻松解决这个问题。

对于 Batch API,我们根据请求的 HttpMethodBatchRequest 类中的 relativeUrl 参数,来调用 AddressServiceCustomerService 的方法。

BatchController 类中实现 Batch 端点,如下:

@PostMapping(path = "/batch")
public String batchUpdateCustomerWithAddress(
  @RequestBody @Valid @Size(min = 1, max = 20) List<BatchRequest> batchRequests) {
    batchRequests.forEach(batchRequest -> {
        if (batchRequest.getMethod().equals(HttpMethod.POST) && 
          batchRequest.getRelativeUrl().equals("/address")) {
            addressService.createAddress(objectMapper.convertValue(batchRequest.getData(), 
              Address.class));
        } else if (batchRequest.getMethod().equals(HttpMethod.PATCH) && 
            batchRequest.getRelativeUrl().equals("/customer")) {
            customerService.updateCustomer(objectMapper.convertValue(batchRequest.getData(), 
              Customer.class));
        }
    });

    return "Batch update is processed";
}

6.3、验证 Batch API

启动应用,使用 cURL 请求上述 /batch 端点:

$ curl -i --request POST 'http://localhost:8080/api/batch' \
--header 'Content-Type: application/json' \
--data-raw '[
    {
        "method": "POST",
        "relativeUrl": "/address",
        "data": {
            "street": "test1",
            "city": "test"
        }
    },
    {
        "method": "PATCH",
        "relativeUrl": "/customer",
        "data": {
            "id": "1",
            "name": "test1",
            "email": "test1@email.com",
            "address": "address2"
        }
    }
]'

响应如下,符合预期:

HTTP/1.1 200
Batch update is processed

7、总结

本文介绍了 Bulk(批量)和 Batch(批处理)操作的区别,以及如何在 Spring 应用中实现 Bulk(批量)和 Batch(批处理)API 端点。


Ref:https://www.baeldung.com/spring-bulk-batch-api-implementation