Java 和 Guava 中的线程池
1、概览
本文将带你了解 Java 中的线程池。首先介绍 Java 中的标准库,然后介绍 Google 的 Guava 库。
2、线程池
在 Java 中,线程被映射到系统级线程,而系统级线程是操作系统的资源。如果不加控制地创建线程,这些资源可能很快就会耗尽。
操作系统也会在线程之间进行上下文切换,以模拟并行处理。一个简单的观点是,创建的线程越多,每个线程实际工作的时间就越少。
线程池模式有助于在多线程应用中节省资源,并将并行性控制在某些预定义的范围内。
使用线程池时,我们将并发代码编写为并行任务的形式,并将它们提交给线程池实例执行。这个实例控制着多个可重复使用的线程来执行这些任务。
该模式允许我们控制应用创建的线程数量及其生命周期。还能调度任务的执行,并将接收到的任务保存在队列中。
3、Java 中的线程池
3.1、Executors、Executor 和 ExecutorService
Executors
工具类包含多个用于创建预配置线程池实例的方法。这些类是一个很好的起点。如果不需要进行任何自定义微调,就可以使用它们。
在 Java 中,我们使用 Executor
和 ExecutorService
接口来处理不同的线程池实现。通常,应该使代码与线程池的实际实现解耦,并在整个应用中使用这些接口。
3.1.1、Executor
Executor
接口有一个 execute
方法,用于提交实现了 Runnable
接口的实例以供执行。
来看一个快速示例,演示如何使用 Executors
API 获取由单线程池和无界队列支持的 Executor
实例,以便按顺序执行任务。
如下,运行一个任务,只需在屏幕上打印 “Hello World” 即可。我们以 lambda(Java 8 的一个特性)的形式提交任务,它被推断为 Runnable
:
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));
3.1.2、ExecutorService
ExecutorService
接口包含大量用于控制任务进度和管理服务终止的方法。使用该接口,我们可以提交任务以供执行,还可以使用返回的 Future
实例控制任务的执行。
现在,创建一个 ExecutorService
,提交一项任务,然后使用返回的 Future
的 get
方法等待提交的任务完成并返回结果值:
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> "Hello World");
// 一些其他的操作
String result = future.get();
当然,在实际应用中,我们通常不会立即调用 future.get()
,而是推迟到真正需要计算值时再调用。
这里,我们重载了 submit
方法,它接受 Runnable
或 Callable
。这两个都是函数式接口,可以将它们作为 lambda 传递。
Runnable
接口不会抛出异常,也不会返回值。Callable
接口可能更方便,因为它允许抛出异常并返回值。
要让编译器 推断 lambda 为 Callable
类型,只需从 lambda 返回一个值即可。
3.2、ThreadPoolExecutor
ThreadPoolExecutor
是一个可扩展的线程池实现,具有许多参数和配置,可以用于进行精细调整。
这里介绍的主要配置参数是 corePoolSize
、maximumPoolSize
和 keepAliveTime
。
线程池包含一定数量的核心线程,这些线程始终保留在内部。它还包含一些额外的线程,这些线程可能会被创建,当它们不再需要时会被终止。
corePoolSize
参数是将被实例化并保留在池中的核心线程数量。当一个新任务到来时,如果所有核心线程都忙碌且内部队列已满,池可以增长到最大线程数(maximumPoolSize
)。
keepAliveTime
参数是允许非核心线程(超过 corePoolSize
的实例化线程)处于空闲状态的超时时间。默认情况下,ThreadPoolExecutor
会回收超时了的空闲的非核心线程。要对核心线程应用相同的回收策略,可以使用 allowCoreThreadTimeOut(true)
方法。
这些参数涵盖了广泛的用例,但大多数典型配置都在 Executors
静态方法中预定义了。
3.2.1、newFixedThreadPool
newFixedThreadPool
方法创建了一个 ThreadPoolExecutor
(线程池执行器),其 corePoolSize
和 maximumPoolSize
参数值相等,keepAliveTime
为零。这意味着该线程池中的线程数量始终保持不变:
ThreadPoolExecutor executor =
(ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());
如上,我们实例化了一个线程数固定为 2 的 ThreadPoolExecutor
。 这意味着,如果同时运行的任务数总是小于或等于 2,它们就会立即被执行。否则,其中一些任务可能会被放入队列等待消费执行。
我们创建了三个 Callable
任务,通过 Sleep 1000 毫秒来模仿耗时的任务。前两个任务将立即运行,第三个任务会进入队列中等待。我们可以在提交任务后立即调用 getPoolSize()
和 getQueue().size()
方法来验证。
3.2.2、Executors.newCachedThreadPool()
可以使用 Executors.newCachedThreadPool()
方法创建另一个预配置的ThreadPoolExecutor
。该方法不接收线程数量。其 corePoolSize
设置为0,maximumPoolSize
设置为 Integer.MAX_VALUE
。最后,keepAliveTime
为 60 秒:
ThreadPoolExecutor executor =
(ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());
这些参数值意味着缓存线程池可以无限增长,以容纳任意数量的提交任务。但当线程不再需要时,在空闲 60 秒后它们将被销毁。一个典型的场景是应用中有大量、短暂的临时性任务。
队列大小始终为零,因为内部使用的是 SynchronousQueue
实例。在 SynchronousQueue
中,插入和移除操作总是同时进行。因此,队列实际上从未包含任何任务。
3.2.3、Executors.newSingleThreadExecutor()
Executors.newSingleThreadExecutor()
API 可创建另一种包含单线程的典型 ThreadPoolExecutor
。单线程 Executor
是创建事件循环的理想选择。corePoolSize
和 maximumPoolSize
参数均为 1,keepAliveTime
为 0。
下例中的任务将按顺序运行,因此任务完成后 counter
值将为 2:
AtomicInteger counter = new AtomicInteger();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
counter.set(1);
});
executor.submit(() -> {
counter.compareAndSet(1, 2);
});
此外,ThreadPoolExecutor
被装饰为不可变的包装器,因此创建后无法重新配置。注意,这也是无法将其转换为 ThreadPoolExecutor
的原因。
3.3、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
继承了 ThreadPoolExecutor
类,还实现了 ScheduledExecutorService
接口,并增加了几个方法:
schedule
方法允许我们在指定延迟后运行一次任务。scheduleAtFixedRate
方法允许我们在指定的初始延迟后运行任务,然后以一定的间隔重复运行它。period
参数是测量任务开始时间之间的时间,因此执行速率是固定的。scheduleWithFixedDelay
方法与scheduleAtFixedRate
类似,都是重复运行给定的任务,但指定的延迟时间是从上一个任务结束到下一个任务开始之间的时间。执行率可能会根据运行任何给定任务所需的时间而变化。
通常,我们使用 Executors.newScheduledThreadPool()
方法来创建一个具有指定 corePoolSize
、无限的 maximumPoolSize
和零 0 keepAliveTime
的 ScheduledThreadPoolExecutor
。
如下,调度任务在 500 毫秒后执行:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);
下面的示例展示了如何在延迟 500 毫秒后运行任务,然后每 100 毫秒重复一次。调度任务后,我们使用 CountDownLatch
锁等待任务触发三次。最后使用 Future.cancel()
方法取消任务:
CountDownLatch lock = new CountDownLatch(3);
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
System.out.println("Hello World");
lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);
lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);
3.4、ForkJoinPool
ForkJoinPool
是 Java 7 引入的 fork/join 框架的核心部分。它解决了在递归算法中生成多个任务的常见问题。使用简单的 ThreadPoolExecutor
,会很快就会耗尽线程,因为每个任务或子任务都需要自己的线程来运行。
在 fork/join 框架中,任何任务都可以产生(fork)若干子任务,并使用 join 方法等待它们完成。fork/join 框架的好处在于,它不会为每个任务或子任务创建一个新线程,而是执行 工作窃取取算法(指某个线程从其他队列里窃取任务来执行)。
来看一个使用 ForkJoinPool
遍历节点树并计算所有叶子值之和的简单示例。下面是由一个节点、一个 int 值和一组子节点组成的树的简单实现:
static class TreeNode {
int value;
Set<TreeNode> children;
TreeNode(int value, TreeNode... children) {
this.value = value;
this.children = Sets.newHashSet(children);
}
}
现在,如果我们想并行求和树中的所有值,就需要实现 RecursiveTask<Integer>
任务接口。每个任务接收自己的节点,并将其值与其子节点的值之和相加。为了计算子节点值的总和,任务实现需要执行以下操作:
- 通过
Stream
迭代children
集合。 - 映射该
Stream
,为每个元素创建一个新的CountingTask
。 - 通过fork方式运行每个子任务。
- 通过在每个 fork 任务上调用
join
方法来收集结果。 - 使用
Collectors.summingInt
Collector
对结果求和。
public static class CountingTask extends RecursiveTask<Integer> {
private final TreeNode node;
public CountingTask(TreeNode node) {
this.node = node;
}
@Override
protected Integer compute() {
return node.value + node.children.stream()
.map(childNode -> new CountingTask(childNode).fork())
.collect(Collectors.summingInt(ForkJoinTask::join));
}
}
运行计算的代码非常简单:
TreeNode tree = new TreeNode(5,
new TreeNode(3), new TreeNode(2,
new TreeNode(2), new TreeNode(8)));
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));
4、Guava 中的线程池实现
Guava 是一个流行的 Google 工具库。它具有许多有用的并发类,包括几个 ExecutorService
的实现。这些实现类不可直接实例化或子类化,创建它们的实例的唯一入口点是 MoreExecutors
工具类。
4.1、添加 Guava 依赖
在 maven pom.xml
文件中添加以下 Guava 依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>
其最新版本可以在 Maven 中央仓库 中找到。
4.2、DirectExecutor 和 DirectExecutorService
有时,我们希望根据某些条件在 当前线程或线程池 中运行任务。我们倾向于使用 Executor
接口,只需切换实现即可。
要实现一个在当前线程中运行任务的 Executor
或 ExecutorService
的并不难,但这仍然需要编写一些模板代码,方便的是,Guava 为我们提供了预定义的实例。
下面是一个在同一线程中执行任务的示例。提供的任务会 Slepp 500 毫秒,但它会阻塞当前线程,执行调用完成后,结果立即可用:
Executor executor = MoreExecutors.directExecutor();
AtomicBoolean executed = new AtomicBoolean();
executor.execute(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
executed.set(true);
});
assertTrue(executed.get());
directExecutor()
方法返回的实例实际上是一个静态单例,因此使用该方法创建对象时不会产生任何开销。
还有一个类似的方法是 MoreExecutors.newDirectExecutorService()
,但不推荐使用,因为该 API 会在每次调用时创建一个完整的 ExecutorService
实现。
4.3、ExitingExecutorService
另一个常见问题是在线程池仍在运行任务时关闭虚拟机(JVM)。即使有取消机制,也不能保证任务会乖乖地在执行器服务关闭时停止工作。这可能会导致 JVM 无限期挂起,而任务仍在继续工作。
为了解决这个问题,Guava 引入了一系列 Exiting Executor Service。它们基于守护线程,随着 JVM 一起终止。
这些 Service 还会使用 Runtime.getRuntime().addShutdownHook()
方法添加关闭钩子,并在 JVM 关闭前等待一段时间,直到任务超时。
如下例,我们提交了包含死限循环的任务,但我们使用的是 Exiting Executor Service,配置 JVM 终止时等待任务超时的时间为 100 毫秒:
ThreadPoolExecutor executor =
(ThreadPoolExecutor) Executors.newFixedThreadPool(5);
ExecutorService executorService =
MoreExecutors.getExitingExecutorService(executor,
100, TimeUnit.MILLISECONDS);
executorService.submit(() -> {
while (true) {
}
});
如果没有 exitingExecutorService
,这个任务将导致 JVM 无限期挂起。
4.4、监听器模式
监听器模式允许我们封装 ExecutorService
,并在任务提交时接收 ListenableFuture
实例,而不是简单的 Future
实例。ListenableFuture
接口继承了 Future
,并多了一个 addListener
方法。该方法允许添加一个 Listener
,在 Future
完成时调用该 Listener
。
我们很少会直接使用 ListenableFuture.addListener()
方法。但它对 Futures
工具类中的大多数工具方法都至关重要。
例如,通过 Futures.allAsList()
方法,可以将多个 ListenableFuture
实例合并为一个 ListenableFuture
,并在所有的 Future
成功完成时完成:
ExecutorService executorService = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService =
MoreExecutors.listeningDecorator(executorService);
ListenableFuture<String> future1 =
listeningExecutorService.submit(() -> "Hello");
ListenableFuture<String> future2 =
listeningExecutorService.submit(() -> "World");
String greeting = Futures.allAsList(future1, future2).get()
.stream()
.collect(Collectors.joining(" "));
assertEquals("Hello World", greeting);
5、总结
本文介绍了线程池模式及其在标准 Java 库和 Google Guava 库中的实现。
Ref:https://www.baeldung.com/thread-pool-java-and-guava