WebClient 将 Flux 读取到 InputStream 中

1、概览

本文将会带你了解在 Java 响应式编程中如何将 Flux<DataBuffer> 读取到 InputStream

2、请求设置

首先,使用 Spring Reactive WebClient 发起 GET 请求。使用由 gorest.co.in 托管的公共 API 端点来进行测试:

String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";

接下来,定义 getWebClient() 方法,用于获取 WebClient 类的新实例:

static WebClient getWebClient() {
    WebClient.Builder webClientBuilder = WebClient.builder();
    return webClientBuilder.build();
}

至此,我们就可以向 /public/v2/users 端点发出 GET 请求了。注意,必须以 Flux<DataBuffer> 对象的形式获取响应体。

3、BodyExtractorsDataBufferUtils

我们可以使用 spring-webfluxBodyExtractors 类的 toDataBuffers() 方法将响应体提取到 Flux<DataBuffer> 中。

body 构建为 Flux<DataBuffer> 类型的实例:

Flux<DataBuffer> body = client
  .get(
  .uri(REQUEST_ENDPOINT)
    .exchangeToFlux( clientResponse -> {
        return clientResponse.body(BodyExtractors.toDataBuffers());
    });

接下来,需要将这些 DataBuffer Stream 收集到一个 InputStream 中,实现这一目标的好方法是使用 PipedInputStreamPipedOutputStream

我们打算向管道输出流(PipedOutputStream)写入内容,并最终从管道输入流(PipedInputStream)读取内容。

创建这两个相连的流:

PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);

注意,默认大小为 1024 字节。但是,预计从 Flux<DataBuffer> 收集到的结果可能会超过默认值。因此,需要明确指定一个更大的值,在本例中就是 1024 * 10*。

最后,使用 DataBufferUtils 类中的 write() 方法,将 body 作为 publisher 写入 outputStream

DataBufferUtils.write(body, outputStream).subscribe();

注意,在声明时,已将 inputStream 连接到 outputStream。因此,现在已经可以从 inputStream 读取数据了。

4、从管道输入流(PipedInputStream)读取数据

首先,定义一个方法 readContent(),以 String 的形式读取 InputStream

String readContent(InputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    byte[] tmp = new byte[stream.available()];
    int byteCount = stream.read(tmp, 0, tmp.length);
    contentStringBuffer.append(new String(tmp));
    return String.valueOf(contentStringBuffer);
}

接下来,典型的做法 是在不同的线程中读取 PipedInputStream,所以创建 readContentFromPipedInputStream() 方法,在内部创建一个新的线程,通过调用 readContent() 方法将 PipedInputStream 中的内容读取为 String

String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    try {
        Thread pipeReader = new Thread(() -> {
            try {
                contentStringBuffer.append(readContent(stream));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        pipeReader.start();
        pipeReader.join();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        stream.close();
    }

    return String.valueOf(contentStringBuffer);
}

现在,看看它的运行效果。

WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));

由于系统是异步的,所以在从从数据流中读取数据之前通过 Thread.sleep 把当前线程暂停 3 秒,目的是为了能够看到完整的响应。另外,在输出日志的时候,添加了一个换行符,把长的日志输出分成了多行,

最后,执行代码,验证输出的结果:

20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content: 
[{"id":2642,"name":"Bhupen Trivedi","email":"bhupen_trivedi@renner-pagac.name","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"patel_preity@abshire.info","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"brijesh_shah@morar.co","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"mishra_mohini@hamill-ledner.info","gender":"female","status":"inactive"}
]

一切OK。

5、总结

本文介绍了如何通过管道流以及 BodyExtractorsDataBufferUtils 类中的工具方法,将 Flux<DataBuffer> 读取到 InputStream


参考:https://www.baeldung.com/spring-reactive-read-flux-into-inputstream