ExecutorService 与 CompletableFuture 指南
1、简介
本文将带你了解 Java 中用于处理并发任务的两个重要的类:ExecutorService
和 CompletableFuture
。
主要介绍它们的功能、如何有效地使用它们、以及它们之间的区别。
2、ExecutorService 概览
ExecutorService
是 Java java.util.concurrent
包中的一个功能强大的接口,可简化对需要并发运行的任务的管理。它抽象掉了线程创建、管理和调度的复杂性,让我们可以专注于需要完成的实际工作。
ExecutorService
提供了 submit()
和 execute()
等方法,用于提交我们希望并发运行的任务。然后,这些任务会进入队列并分配给线程池中的可用线程。如果任务返回结果,我们可以使用 Future
对象来检索结果。不过,使用 Future
上的 get()
等方法检索结果会阻塞调用线程,直到任务完成。
3、CompletableFuture 概览
CompletableFuture
是在 Java 8 中引入的。它专注于以更声明式的方式组合异步操作并处理它们的最终结果。CompletableFuture
充当一个容器,保存异步操作的最终结果。它可能不会立即返回结果,但提供了方法来定义在结果可用时要执行的操作。
ExecutorService
在检索结果时会阻塞线程,而 CompletableFuture
则以非阻塞方式运行。
4、关注点和职责
虽然 ExecutorService
和 CompletableFuture
都能解决 Java 中的异步编程问题,但它们的各自的关注点和职责却截然不同。
4.1、ExecutorService
ExecutorService
专注于管理线程池和并发执行任务。它提供了创建具有不同配置(如固定大小、缓存和定时调度)的线程池的方法。
来看一个使用 ExecutorService
创建并维护三个线程的示例,如下:
ExecutorService executor = Executors.newFixedThreadPool(3);
Future<Integer> future = executor.submit(() -> {
// 任务执行逻辑
return 42;
});
调用 newFixedThreadPool(3)
方法会创建一个包含三个线程的线程池,可以确保同时执行的任务不会超过三个。然后使用 submit()
方法提交任务供线程池执行,并返回一个代表计算结果的 Future
对象。
4.2、CompletableFuture
相比之下,CompletableFuture
为异步操作的组合提供了更高层次的抽象。它侧重于定义工作流和处理异步任务的最终结果。
下面是一个使用 supplyAsync()
启动异步任务并返回 Integer
的示例:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 42;
});
如上,supplyAsync()
启动了一个异步任务,返回结果 42。
5、串联异步任务
ExecutorService
和 CompletableFuture
都提供了串联异步任务的机制,但它们采用的方法不同。
5.1、ExecutorService
在 ExecutorService
中,我们通常提交任务进行执行,然后使用这些任务返回的 Future
对象来处理依赖并串联后续任务。然而,这涉及到阻塞和等待每个任务完成后再进行下一个任务,这会导致处理异步工作流的效率低下。
假设我们向 ExecutorService
提交了两个任务,然后使用 Future
对象将它们串联起来:
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交第一个任务
Future<Integer> firstTask = executor.submit(() -> {
return 42;
});
// 提交第二个任务
Future<String> secondTask = executor.submit(() -> {
try {
// 等待第一个任务执行完毕
Integer result = firstTask.get();
return "Result based on Task 1: " + result;
} catch (InterruptedException | ExecutionException e) {
// 异常处理
}
return null;
});
executor.shutdown();
如上例,第二个任务依赖于第一个任务的结果。但是,ExecutorService
并不提供内置链式功能,因此我们需要显式地管理这种依赖关系,即在提交第二个任务之前,使用 get()
等待第一个任务完成(阻塞线程)。
5.2、CompletableFuture
而 CompletableFuture
则为异步任务链提供了一种更精简、更具表现力的方式。它通过 thenApply()
等内置方法简化了任务链。通过这些方法,我们可以定义一系列异步任务,其中一个任务的输出即为下一个任务的输入。
使用 CompletableFuture
的等效示例如下:
CompletableFuture<Integer> firstTask = CompletableFuture.supplyAsync(() -> {
// 返回结果
return 42;
});
CompletableFuture<String> secondTask = firstTask.thenApply(result -> {
// result 即为上一个任务执行完毕返回的结果
return "Result based on Task 1: " + result;
});
如上,thenApply()
方法用于定义第二个任务,它取决于第一个任务的结果。当我们使用 thenApply()
将任务串联到 CompletableFuture
时,主线程不会等待第一个任务完成后再继续。它会继续执行代码的其他部分。
6、异常处理
接下来看看 ExecutorService
和 CompletableFuture
如何处理错误和异常情况。
6.1、ExecutorService
使用 ExecutorService
时,错误有两种表现形式:
- 已提交任务中抛出的异常:当我们尝试使用返回的
Future
对象上的get()
等方法检索结果时,这些异常会传播回主线程。如果处理不当,可能会导致意想不到的行为。 - 线程池管理过程中未捕获的异常:如果在创建或关闭线程池期间出现未捕获异常,通常会从
ExecutorService
方法本身抛出。我们需要在代码中捕获并处理这些异常。
来看一个例子,突出说明潜在的问题:
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<String> future = executor.submit(() -> {
if (someCondition) {
// 如果满足特定条件,就抛出异常
throw new RuntimeException("Something went wrong!");
}
return "Success";
});
try {
String result = future.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
// 异常处理
} finally {
executor.shutdown();
}
在上例中,如果满足特定条件,提交的任务就会抛出异常。但是,我们需要在 future.get()
周围使用 try-catch
块来捕获任务抛出的异常或在使用 get()
获取结果时抛出异常。这种方法在管理多个任务的错误时可能会变得繁琐。
6.2、CompletableFuture
相比之下,CompletableFuture
提供了更强大的错误处理方法,如 exceptionally()
和在链式方法本身中处理异常。通过这些方法,我们可以定义如何在异步工作流的不同阶段处理错误,而无需明确的 try-catch
块。
下面是一个使用 exceptionally()
处理异常的 CompletableFuture
的等效示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (someCondition) {
throw new RuntimeException("Something went wrong!");
}
return "Success";
})
.exceptionally(ex -> {
System.err.println("Error in task: " + ex.getMessage());
return "Error occurred"; // 可以选择返回一个默认值
});
future.thenAccept(result -> System.out.println("Result: " + result));
在上例中,异步任务会抛出异常,异常 exceptionally()
回调会捕获并处理该错误。如果出现异常,它会提供一个默认值("Error occurred"
)。
7、超时管理
超时管理在异步编程中至关重要,它可以确保任务在指定时间内完成。接下来看看 ExecutorService
和 CompletableFuture
如何以不同的方式处理超时。
7.1、ExecutorService
ExecutorService
并不提供内置的超时功能。要实现超时功能,我们需要处理 Future
对象,并有可能中断超过截止时间的任务。这种方法需要手动协调:
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<String> future = executor.submit(() -> {
try {
// 暂停 5s 模拟耗时
Thread.sleep(5000);
} catch (InterruptedException e) {
System.err.println("Error occured: " + e.getMessage());
}
return "Task completed";
});
try {
// 阻塞,获取结果,设置超时时间为 2 秒
String result = future.get(2, TimeUnit.SECONDS);
System.out.println("Result: " + result);
} catch (TimeoutException e) {
// 发生了超时
System.err.println("Task execution timed out!");
future.cancel(true); // 手动中断任务
} catch (Exception e) {
// 异常处理
} finally {
executor.shutdown();
}
如上,我们向 ExecutorService
提交了一项任务,并在使用 get()
方法获取结果时指定了两秒的超时时间。如果任务完成的时间超过了指定的超时时间,就会抛出一个 TimeoutException
异常。这种方法容易出错,需要小心处理。
主要注意的是,虽然超时机制中断了任务结果的等待,但任务本身仍将在后台继续运行,直到完成或被中断。要中断在 ExecutorService
中运行的任务,需要使用 Future.cancel(true)
方法。
7.2、CompletableFuture
在 Java 9 中,CompletableFuture
通过 completeOnTimeout()
等方法提供了一种更精简的超时方法。如果原始任务在指定的超时持续时间内未完成,completableFuture
的 completeOnTimeout()
方法将使用指定值完成该任务。
来看一个使用 completeOnTimeout()
的例子:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 暂停 5秒 模拟耗时操作
Thread.sleep(5000);
} catch (InterruptedException e) {
// 异常处理
}
return "Task completed";
});
// 2秒超时,返回 "Timed out!"
CompletableFuture<String> timeoutFuture = future.completeOnTimeout("Timed out!", 2, TimeUnit.SECONDS);
String result = timeoutFuture.join();
System.out.println("Result: " + result);
在上例中,supplyAsync()
方法启动了一个异步任务,模拟长期运行的操作,需要 5 秒钟才能完成。然后,使用 completeOnTimeout()
指定了两秒的超时时间。如果任务没有在两秒内完成,CompletableFuture
将自动完成,并返回 "Timed out!"
值。
8、对比
下面的对比表总结了 ExecutorService
和 CompletableFuture
之间的主要区别:
特性 | ExecutorService | CompletableFuture |
---|---|---|
关注点 | 线程池管理和任务执行 | 组合异步操作和处理最终结果 |
任务串联 | 使用 Future 对象手动控制 |
内置方法,如 thenApply() |
异常处理 | 使用 try-catch 手动包裹 Future.get() 进行处理 |
使用内置的 exceptionally() 、whenComplete() 链式方法进行处理 |
超时控制 | 手动调用 Future.get(timeout) ,可能会中断任务 |
内置方法,如 completeOnTimeout() |
阻塞与非阻塞 | 阻塞(通常是通过 Future.get() 方法获取结果) |
非阻塞(链式任务不会阻塞主线程) |
9、总结
本文介绍了 Java 中处理异步任务的两个基本类:ExecutorService
和 CompletableFuture
。ExecutorService
简化了线程池和并发任务执行的管理,而 CompletableFuture
则为异步操作的组合和结果处理提供了更高层次的抽象。
本文还介绍了它们的功能、差异以及异常处理机制、超时管理和异步任务的串联方式。
Ref:https://www.baeldung.com/java-executorservice-vs-completablefuture