把 Future 转换为 Completablefuture

1、简介

本文将带你了解如何将 Future 转换为 CompletableFuture。通过这种转换,我们可以利用 CompletableFuture 的高级功能,如非阻塞操作、链式任务和更好的错误处理,同时仍可使用返回 Future 的 API 或库。

2、为什么要把 Future 转换为 CompletableFuture?

Java 中的 Future 接口表示异步计算的结果。它提供了检查计算是否完成、等待计算完成和检索结果的方法。

不过,Future 也有其局限性,比如阻塞调用需要使用 get() 来获取结果。此外,它还不支持链式调用多个异步任务或处理回调。

而,Java 8 中引入的 CompletableFuture 解决了这些缺陷。它通过用于任务链和回调的 thenApply()thenAccept() 等方法支持非阻塞操作,并使用 exceptionally() 进行错误处理。

通过将 Future 转换为 CompletableFuture,我们可以在使用返回 Future 的 API 或库时利用这些功能。

3、逐步转换

来看看如何将 Future 转换为 CompletableFuture

3.1、使用 ExecutorService 模拟 Future

要了解 Future 如何工作,我们首先要使用 ExecutorService 模拟异步计算。ExecutorService 是一个用于管理和调度独立线程中任务的框架。这将有助于我们理解 Future 的阻塞特性:

@Test
void givenFuture_whenGet_thenBlockingCall() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<String> future = executor.submit(() -> {
        Thread.sleep(1000);
        return "Hello from Future!";
    });

    String result = future.get();
    executor.shutdown();

    assertEquals("Hello from Future!", result);
}

如上,我们使用 executor.submit() 来模拟一个返回 Future 对象的长期运行任务。future.get() 调用会阻塞主线程,直到计算完成,然后打印结果。

这种阻塞行为凸显了 Future 的一个局限性,我们希望通过 CompletableFuture 来解决这个问题。

3.2、封装 Future 为 CompletableFuture

要将 Future 转换为 CompletableFuture,我们需要在 Future 的阻塞特性与 CompletableFuture 的非阻塞、回调驱动设计之间架起一座桥梁。

为此,我们创建了一个名为 toCompletableFuture() 的方法,该方法将一个 Future 和一个 ExecutorService 作为输入,并返回一个 CompletableFuture

static <T> CompletableFuture<T> toCompletableFuture(Future<T> future, ExecutorService executor) {
    CompletableFuture<T> completableFuture = new CompletableFuture<>();
    executor.submit(() -> {
        try {
            completableFuture.complete(future.get());
        } catch (Exception e) {
            completableFuture.completeExceptionally(e);
        }
    });
    return completableFuture;
}

如上,toCompletableFuture() 方法首先创建一个新的 CompletableFuture。然后,缓存线程池中的一个独立线程会监控该 Future

Future 完成时,会使用阻塞 get() 方法获取其结果,然后使用 complete() 方法将其传递给 CompletableFuture。如果 Future 抛出异常,CompletableFuture 会异常完成(completed exceptionally),以确保错误得到传播。

这种经过封装的 CompletableFuture 允许我们异步处理结果,并使用 thenAccept() 等回调方法。

toCompletableFuture() 方法的用法如下:

@Test
void givenFuture_whenWrappedInCompletableFuture_thenNonBlockingCall() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<String> future = executor.submit(() -> {
        Thread.sleep(1000);
        return "Hello from Future!";
    });

    CompletableFuture<String> completableFuture = toCompletableFuture(future, executor);

    completableFuture.thenAccept(result -> assertEquals("Hello from Future!", result));

    executor.shutdown();
}

future.get() 不同,这种方法避免了阻塞主线程,使代码更加灵活。我们还可以将多个阶段串联起来,对结果进行更复杂的处理。

例如,我们可以转换 Future 的结果,然后执行其他操作:

@Test
void givenFuture_whenTransformedAndChained_thenCorrectResult() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<String> future = executor.submit(() -> {
        Thread.sleep(1000);
        return "Hello from Future!";
    });

    CompletableFuture<String> completableFuture = toCompletableFuture(future, executor);

    completableFuture
      .thenApply(result -> result.toUpperCase()) // 转换结果
      .thenAccept(transformedResult -> assertEquals("HELLO FROM FUTURE!", transformedResult));

    executor.shutdown();
}

在上例中,将结果转换为大写后,打印了转换后的结果。这展示了使用 CompletableFuture 进行链式操作的强大功能。

3.3、使用 CompletableFuture 的 supplyAsync() 方法

另一种方法是利用 CompletableFuture 的方法 supplyAsync(),它可以异步执行任务,并将结果作为 CompletableFuture 返回。

来看看如何在 supplyAsync() 方法中封装阻塞的 Future 调用,以实现转换:

@Test
void givenFuture_whenWrappedUsingSupplyAsync_thenNonBlockingCall() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<String> future = executor.submit(() -> {
        Thread.sleep(1000);
        return "Hello from Future!";
    });

    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            return future.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });

    completableFuture.thenAccept(result -> assertEquals("Hello from Future!", result));

    executor.shutdown();
}

如上,我们使用 CompletableFuture.supplyAsync() 异步执行任务。该任务将阻塞调用 future.get() 封装在一个 lambda 表达式中。这样,Future 的结果就能以非阻塞方式获取,从而使我们能够使用 CompletableFuture 方法进行回调和链式处理。

这种方法更简单,因为它避免了手动管理单独的线程。CompletableFuture 会为我们处理异步执行。

4、将多个 Future 合并为一个 CompletableFuture

在某些情况下,我们可能需要将多个 Future 对象合并为一个 CompletableFuture。在汇总不同任务的结果或等待所有任务完成后再继续处理时,这种情况很常见。使用 CompletableFuture,我们可以有效地组合多个 Future 对象,并以非阻塞方式处理它们。

要组合多个 Future 对象,首先要将它们转换为 CompletableFuture 实例。然后,使用 CompletableFuture.allOf() 等待所有任务完成。

示例如下:

static CompletableFuture<Void> allOfFutures(List<Future<String>> futures, ExecutorService executor) {
    // 将所有 Future 对象转换为 CompletableFuture 实例
    List<CompletableFuture<String>> completableFutures = futures.stream()
      .map(future -> FutureToCompletableFuture.toCompletableFuture(future, executor))
      .toList();

    return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
}

一旦所有任务完成,CompletableFuture.allOf() 方法会发出完成信号。为了演示这一点,让我们考虑一个场景,其中多个任务返回 Future 对象,其结果为字符串。我们将汇总结果并确保所有任务都成功完成:

@Test
void givenMultipleFutures_whenCombinedWithAllOf_thenAllResultsAggregated() throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(3);

    List<Future<String>> futures = List.of(
        executor.submit(() -> {
            return "Task 1 Result";
        }),
        executor.submit(() -> {
            return "Task 2 Result";
        }),
        executor.submit(() -> {
            return "Task 3 Result";
        })
    );

    CompletableFuture<Void> allOf = allOfFutures(futures, executor);

    allOf.thenRun(() -> {
        try {
            List<String> results = futures.stream()
              .map(future -> {
                  try {
                      return future.get();
                  } catch (Exception e) {
                      throw new RuntimeException(e);
                  }
              })
              .toList();
            assertEquals(3, results.size());
            assertTrue(results.contains("Task 1 Result"));
            assertTrue(results.contains("Task 2 Result"));
            assertTrue(results.contains("Task 3 Result"));
        } catch (Exception e) {
            fail("Unexpected exception: " + e.getMessage());
        }
    }).join();

    executor.shutdown();
}

在这个例子中,我们通过 ExecutorService 模拟三个任务,每个任务返回一个结果。接下来,每个任务被提交并返回一个 Future 对象。我们将 Future 对象的列表传递给 allOfFutures() 方法,该方法将它们转换为 CompletableFuture,然后使用 CompletableFuture.allOf() 方法将它们组合起来。

当所有任务完成后,我们使用 thenRun() 方法汇总结果并断言其正确性。这种方法在像并行处理独立任务并需要聚合结果的情景中非常有用。

5、总结

本文介绍了如何在 Java 中将 Future 转换为 CompletableFuture。通过利用 CompletableFuture,我们可以充分利用非阻塞操作、链式任务调用和健壮的异常处理。当我们希望增强异步编程模型的能力时,这种转换尤其有用。


Ref:https://www.baeldung.com/java-transform-future-completablefuture