把 Future 转换为 Completablefuture
1、简介
本文将带你了解如何将 Future
转换为 CompletableFuture
。通过这种转换,我们可以利用 CompletableFuture
的高级功能,如非阻塞操作、链式任务和更好的错误处理,同时仍可使用返回 Future
的 API 或库。
2、为什么要把 Future 转换为 CompletableFuture?
Java 中的 Future
接口表示异步计算的结果。它提供了检查计算是否完成、等待计算完成和检索结果的方法。
不过,Future
也有其局限性,比如阻塞调用需要使用 get()
来获取结果。此外,它还不支持链式调用多个异步任务或处理回调。
而,Java 8 中引入的 CompletableFuture
解决了这些缺陷。它通过用于任务链和回调的 thenApply()
和 thenAccept()
等方法支持非阻塞操作,并使用 exceptionally()
进行错误处理。
通过将 Future
转换为 CompletableFuture
,我们可以在使用返回 Future
的 API 或库时利用这些功能。
3、逐步转换
来看看如何将 Future
转换为 CompletableFuture
。
3.1、使用 ExecutorService 模拟 Future
要了解 Future
如何工作,我们首先要使用 ExecutorService
模拟异步计算。ExecutorService
是一个用于管理和调度独立线程中任务的框架。这将有助于我们理解 Future
的阻塞特性:
@Test
void givenFuture_whenGet_thenBlockingCall() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Hello from Future!";
});
String result = future.get();
executor.shutdown();
assertEquals("Hello from Future!", result);
}
如上,我们使用 executor.submit()
来模拟一个返回 Future
对象的长期运行任务。future.get()
调用会阻塞主线程,直到计算完成,然后打印结果。
这种阻塞行为凸显了 Future
的一个局限性,我们希望通过 CompletableFuture
来解决这个问题。
3.2、封装 Future 为 CompletableFuture
要将 Future
转换为 CompletableFuture
,我们需要在 Future
的阻塞特性与 CompletableFuture
的非阻塞、回调驱动设计之间架起一座桥梁。
为此,我们创建了一个名为 toCompletableFuture()
的方法,该方法将一个 Future
和一个 ExecutorService
作为输入,并返回一个 CompletableFuture
:
static <T> CompletableFuture<T> toCompletableFuture(Future<T> future, ExecutorService executor) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
executor.submit(() -> {
try {
completableFuture.complete(future.get());
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
});
return completableFuture;
}
如上,toCompletableFuture()
方法首先创建一个新的 CompletableFuture
。然后,缓存线程池中的一个独立线程会监控该 Future
。
当 Future
完成时,会使用阻塞 get()
方法获取其结果,然后使用 complete()
方法将其传递给 CompletableFuture
。如果 Future
抛出异常,CompletableFuture
会异常完成(completed exceptionally),以确保错误得到传播。
这种经过封装的 CompletableFuture
允许我们异步处理结果,并使用 thenAccept()
等回调方法。
toCompletableFuture()
方法的用法如下:
@Test
void givenFuture_whenWrappedInCompletableFuture_thenNonBlockingCall() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Hello from Future!";
});
CompletableFuture<String> completableFuture = toCompletableFuture(future, executor);
completableFuture.thenAccept(result -> assertEquals("Hello from Future!", result));
executor.shutdown();
}
与 future.get()
不同,这种方法避免了阻塞主线程,使代码更加灵活。我们还可以将多个阶段串联起来,对结果进行更复杂的处理。
例如,我们可以转换 Future
的结果,然后执行其他操作:
@Test
void givenFuture_whenTransformedAndChained_thenCorrectResult() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Hello from Future!";
});
CompletableFuture<String> completableFuture = toCompletableFuture(future, executor);
completableFuture
.thenApply(result -> result.toUpperCase()) // 转换结果
.thenAccept(transformedResult -> assertEquals("HELLO FROM FUTURE!", transformedResult));
executor.shutdown();
}
在上例中,将结果转换为大写后,打印了转换后的结果。这展示了使用 CompletableFuture
进行链式操作的强大功能。
3.3、使用 CompletableFuture 的 supplyAsync() 方法
另一种方法是利用 CompletableFuture
的方法 supplyAsync()
,它可以异步执行任务,并将结果作为 CompletableFuture
返回。
来看看如何在 supplyAsync()
方法中封装阻塞的 Future
调用,以实现转换:
@Test
void givenFuture_whenWrappedUsingSupplyAsync_thenNonBlockingCall() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Hello from Future!";
});
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
completableFuture.thenAccept(result -> assertEquals("Hello from Future!", result));
executor.shutdown();
}
如上,我们使用 CompletableFuture.supplyAsync()
异步执行任务。该任务将阻塞调用 future.get()
封装在一个 lambda 表达式中。这样,Future
的结果就能以非阻塞方式获取,从而使我们能够使用 CompletableFuture
方法进行回调和链式处理。
这种方法更简单,因为它避免了手动管理单独的线程。CompletableFuture
会为我们处理异步执行。
4、将多个 Future 合并为一个 CompletableFuture
在某些情况下,我们可能需要将多个 Future
对象合并为一个 CompletableFuture
。在汇总不同任务的结果或等待所有任务完成后再继续处理时,这种情况很常见。使用 CompletableFuture
,我们可以有效地组合多个 Future
对象,并以非阻塞方式处理它们。
要组合多个 Future
对象,首先要将它们转换为 CompletableFuture
实例。然后,使用 CompletableFuture.allOf()
等待所有任务完成。
示例如下:
static CompletableFuture<Void> allOfFutures(List<Future<String>> futures, ExecutorService executor) {
// 将所有 Future 对象转换为 CompletableFuture 实例
List<CompletableFuture<String>> completableFutures = futures.stream()
.map(future -> FutureToCompletableFuture.toCompletableFuture(future, executor))
.toList();
return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
}
一旦所有任务完成,CompletableFuture.allOf()
方法会发出完成信号。为了演示这一点,让我们考虑一个场景,其中多个任务返回 Future
对象,其结果为字符串。我们将汇总结果并确保所有任务都成功完成:
@Test
void givenMultipleFutures_whenCombinedWithAllOf_thenAllResultsAggregated() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Future<String>> futures = List.of(
executor.submit(() -> {
return "Task 1 Result";
}),
executor.submit(() -> {
return "Task 2 Result";
}),
executor.submit(() -> {
return "Task 3 Result";
})
);
CompletableFuture<Void> allOf = allOfFutures(futures, executor);
allOf.thenRun(() -> {
try {
List<String> results = futures.stream()
.map(future -> {
try {
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.toList();
assertEquals(3, results.size());
assertTrue(results.contains("Task 1 Result"));
assertTrue(results.contains("Task 2 Result"));
assertTrue(results.contains("Task 3 Result"));
} catch (Exception e) {
fail("Unexpected exception: " + e.getMessage());
}
}).join();
executor.shutdown();
}
在这个例子中,我们通过 ExecutorService
模拟三个任务,每个任务返回一个结果。接下来,每个任务被提交并返回一个 Future
对象。我们将 Future
对象的列表传递给 allOfFutures()
方法,该方法将它们转换为 CompletableFuture
,然后使用 CompletableFuture.allOf()
方法将它们组合起来。
当所有任务完成后,我们使用 thenRun()
方法汇总结果并断言其正确性。这种方法在像并行处理独立任务并需要聚合结果的情景中非常有用。
5、总结
本文介绍了如何在 Java 中将 Future
转换为 CompletableFuture
。通过利用 CompletableFuture
,我们可以充分利用非阻塞操作、链式任务调用和健壮的异常处理。当我们希望增强异步编程模型的能力时,这种转换尤其有用。
Ref:https://www.baeldung.com/java-transform-future-completablefuture