ExecutorService 与 CompletableFuture 指南

1、简介

本文将带你了解 Java 中用于处理并发任务的两个重要的类:ExecutorServiceCompletableFuture

主要介绍它们的功能、如何有效地使用它们、以及它们之间的区别。

2、ExecutorService 概览

ExecutorService 是 Java java.util.concurrent 包中的一个功能强大的接口,可简化对需要并发运行的任务的管理。它抽象掉了线程创建、管理和调度的复杂性,让我们可以专注于需要完成的实际工作。

ExecutorService 提供了 submit()execute() 等方法,用于提交我们希望并发运行的任务。然后,这些任务会进入队列并分配给线程池中的可用线程。如果任务返回结果,我们可以使用 Future 对象来检索结果。不过,使用 Future 上的 get() 等方法检索结果会阻塞调用线程,直到任务完成。

3、CompletableFuture 概览

CompletableFuture 是在 Java 8 中引入的。它专注于以更声明式的方式组合异步操作并处理它们的最终结果。CompletableFuture 充当一个容器,保存异步操作的最终结果。它可能不会立即返回结果,但提供了方法来定义在结果可用时要执行的操作。

ExecutorService 在检索结果时会阻塞线程,而 CompletableFuture 则以非阻塞方式运行。

4、关注点和职责

虽然 ExecutorServiceCompletableFuture 都能解决 Java 中的异步编程问题,但它们的各自的关注点和职责却截然不同。

4.1、ExecutorService

ExecutorService 专注于管理线程池和并发执行任务。它提供了创建具有不同配置(如固定大小、缓存和定时调度)的线程池的方法。

来看一个使用 ExecutorService 创建并维护三个线程的示例,如下:

ExecutorService executor = Executors.newFixedThreadPool(3);
Future<Integer> future = executor.submit(() -> {
    // 任务执行逻辑
    return 42;
});

调用 newFixedThreadPool(3) 方法会创建一个包含三个线程的线程池,可以确保同时执行的任务不会超过三个。然后使用 submit() 方法提交任务供线程池执行,并返回一个代表计算结果的 Future 对象。

4.2、CompletableFuture

相比之下,CompletableFuture 为异步操作的组合提供了更高层次的抽象。它侧重于定义工作流和处理异步任务的最终结果。

下面是一个使用 supplyAsync() 启动异步任务并返回 Integer 的示例:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 42;
});

如上,supplyAsync() 启动了一个异步任务,返回结果 42

5、串联异步任务

ExecutorServiceCompletableFuture 都提供了串联异步任务的机制,但它们采用的方法不同。

5.1、ExecutorService

ExecutorService 中,我们通常提交任务进行执行,然后使用这些任务返回的 Future 对象来处理依赖并串联后续任务。然而,这涉及到阻塞和等待每个任务完成后再进行下一个任务,这会导致处理异步工作流的效率低下。

假设我们向 ExecutorService 提交了两个任务,然后使用 Future 对象将它们串联起来:

ExecutorService executor = Executors.newFixedThreadPool(2);

// 提交第一个任务
Future<Integer> firstTask = executor.submit(() -> {
    return 42;
});

// 提交第二个任务
Future<String> secondTask = executor.submit(() -> {
    try {
        // 等待第一个任务执行完毕
        Integer result = firstTask.get();
        return "Result based on Task 1: " + result;
    } catch (InterruptedException | ExecutionException e) {
        // 异常处理
    }
    return null;
});

executor.shutdown();

如上例,第二个任务依赖于第一个任务的结果。但是,ExecutorService 并不提供内置链式功能,因此我们需要显式地管理这种依赖关系,即在提交第二个任务之前,使用 get() 等待第一个任务完成(阻塞线程)。

5.2、CompletableFuture

CompletableFuture 则为异步任务链提供了一种更精简、更具表现力的方式。它通过 thenApply() 等内置方法简化了任务链。通过这些方法,我们可以定义一系列异步任务,其中一个任务的输出即为下一个任务的输入。

使用 CompletableFuture 的等效示例如下:

CompletableFuture<Integer> firstTask = CompletableFuture.supplyAsync(() -> {
    // 返回结果
    return 42;
});

CompletableFuture<String> secondTask = firstTask.thenApply(result -> {
    // result 即为上一个任务执行完毕返回的结果
    return "Result based on Task 1: " + result;
});

如上,thenApply() 方法用于定义第二个任务,它取决于第一个任务的结果。当我们使用 thenApply() 将任务串联到 CompletableFuture 时,主线程不会等待第一个任务完成后再继续。它会继续执行代码的其他部分。

6、异常处理

接下来看看 ExecutorServiceCompletableFuture 如何处理错误和异常情况。

6.1、ExecutorService

使用 ExecutorService 时,错误有两种表现形式:

  • 已提交任务中抛出的异常:当我们尝试使用返回的 Future 对象上的 get() 等方法检索结果时,这些异常会传播回主线程。如果处理不当,可能会导致意想不到的行为。
  • 线程池管理过程中未捕获的异常:如果在创建或关闭线程池期间出现未捕获异常,通常会从 ExecutorService 方法本身抛出。我们需要在代码中捕获并处理这些异常。

来看一个例子,突出说明潜在的问题:

ExecutorService executor = Executors.newFixedThreadPool(2);

Future<String> future = executor.submit(() -> {
    if (someCondition) {
        // 如果满足特定条件,就抛出异常
        throw new RuntimeException("Something went wrong!");
    }
    return "Success";
});

try {
    String result = future.get();
    System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
    // 异常处理
} finally {
    executor.shutdown();
}

在上例中,如果满足特定条件,提交的任务就会抛出异常。但是,我们需要在 future.get() 周围使用 try-catch 块来捕获任务抛出的异常或在使用 get() 获取结果时抛出异常。这种方法在管理多个任务的错误时可能会变得繁琐。

6.2、CompletableFuture

相比之下,CompletableFuture 提供了更强大的错误处理方法,如 exceptionally() 和在链式方法本身中处理异常。通过这些方法,我们可以定义如何在异步工作流的不同阶段处理错误,而无需明确的 try-catch 块。

下面是一个使用 exceptionally() 处理异常的 CompletableFuture 的等效示例:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (someCondition) {
        throw new RuntimeException("Something went wrong!");
    }
    return "Success";
})
.exceptionally(ex -> {
    System.err.println("Error in task: " + ex.getMessage());
    return "Error occurred"; // 可以选择返回一个默认值
});

future.thenAccept(result -> System.out.println("Result: " + result));

在上例中,异步任务会抛出异常,异常 exceptionally() 回调会捕获并处理该错误。如果出现异常,它会提供一个默认值("Error occurred")。

7、超时管理

超时管理在异步编程中至关重要,它可以确保任务在指定时间内完成。接下来看看 ExecutorServiceCompletableFuture 如何以不同的方式处理超时。

7.1、ExecutorService

ExecutorService 并不提供内置的超时功能。要实现超时功能,我们需要处理 Future 对象,并有可能中断超过截止时间的任务。这种方法需要手动协调:

ExecutorService executor = Executors.newFixedThreadPool(2);

Future<String> future = executor.submit(() -> {
    try {
        // 暂停 5s 模拟耗时
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        System.err.println("Error occured: " + e.getMessage());
    }
    return "Task completed";
});

try {
    // 阻塞,获取结果,设置超时时间为 2 秒
    String result = future.get(2, TimeUnit.SECONDS);
    System.out.println("Result: " + result);
} catch (TimeoutException e) {
    // 发生了超时
    System.err.println("Task execution timed out!");
    future.cancel(true); // 手动中断任务
} catch (Exception e) {
    // 异常处理
} finally {
    executor.shutdown();
}

如上,我们向 ExecutorService 提交了一项任务,并在使用 get() 方法获取结果时指定了两秒的超时时间。如果任务完成的时间超过了指定的超时时间,就会抛出一个 TimeoutException 异常。这种方法容易出错,需要小心处理。

主要注意的是,虽然超时机制中断了任务结果的等待,但任务本身仍将在后台继续运行,直到完成或被中断。要中断在 ExecutorService 中运行的任务,需要使用 Future.cancel(true) 方法

7.2、CompletableFuture

在 Java 9 中,CompletableFuture 通过 completeOnTimeout() 等方法提供了一种更精简的超时方法。如果原始任务在指定的超时持续时间内未完成,completableFuturecompleteOnTimeout() 方法将使用指定值完成该任务。

来看一个使用 completeOnTimeout() 的例子:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        // 暂停 5秒 模拟耗时操作
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // 异常处理
    }
    return "Task completed";
});

// 2秒超时,返回 "Timed out!"
CompletableFuture<String> timeoutFuture = future.completeOnTimeout("Timed out!", 2, TimeUnit.SECONDS);

String result = timeoutFuture.join();
System.out.println("Result: " + result);

在上例中,supplyAsync() 方法启动了一个异步任务,模拟长期运行的操作,需要 5 秒钟才能完成。然后,使用 completeOnTimeout() 指定了两秒的超时时间。如果任务没有在两秒内完成,CompletableFuture 将自动完成,并返回 "Timed out!" 值。

8、对比

下面的对比表总结了 ExecutorServiceCompletableFuture 之间的主要区别:

特性 ExecutorService CompletableFuture
关注点 线程池管理和任务执行 组合异步操作和处理最终结果
任务串联 使用 Future 对象手动控制 内置方法,如 thenApply()
异常处理 使用 try-catch 手动包裹 Future.get() 进行处理 使用内置的 exceptionally()whenComplete() 链式方法进行处理
超时控制 手动调用 Future.get(timeout),可能会中断任务 内置方法,如 completeOnTimeout()
阻塞与非阻塞 阻塞(通常是通过 Future.get() 方法获取结果) 非阻塞(链式任务不会阻塞主线程)

9、总结

本文介绍了 Java 中处理异步任务的两个基本类:ExecutorServiceCompletableFutureExecutorService 简化了线程池和并发任务执行的管理,而 CompletableFuture 则为异步操作的组合和结果处理提供了更高层次的抽象。

本文还介绍了它们的功能、差异以及异常处理机制、超时管理和异步任务的串联方式。


Ref:https://www.baeldung.com/java-executorservice-vs-completablefuture