将虚拟线程与 ScheduledExecutorService 结合使用

1、简介

虚拟线程是 JDK 21 中正式引入的一项杀手锏级别的功能,是提高应用程序性能、吞吐量的一种解决方案。

但是,JDK 没有内置的使用虚拟线程的任务调度器。因此,我们得自己编写使用虚拟线程运行的任务调度器

本文将带你了解如何使用 Thread.sleep() 方法和 ScheduledExecutorService 类为虚拟线程创建自定义调度器。

2、虚拟线程是什么?

JEP-444 中引入了虚拟线程,作为线程类的轻量级版本,提高了应用程序的并发性和吞吐量。

虚拟线程占用的空间比通常的操作系统线程(或平台线程)要少得多。因此,我们可以在应用程序中同时产生比平台线程更多的虚拟线程。毫无疑问,这增加了并发单元的最大数量,也提高了应用程序的吞吐量。

关键的一点是,虚拟线程并不比平台线程更快(任务的耗时不会有改变)。在我们的应用中,虚拟线程的数量只比平台线程多,这样它们就能执行更多的并行工作。

虚拟线程的成本很低,因此我们不需要使用资源池等技术来为数量有限的线程安排任务。相反,我们可以在现代计算机中几乎无限地生成虚拟线程,而不会出现内存问题。

最后,虚拟线程是动态的,而平台线程的大小是固定的。因此,虚拟线程比平台线程更适合小型任务,如简单的 HTTP 或数据库调用。

3、虚拟线程调度

如上所述,虚拟线程的一大优势是体积小、成本低。我们可以在一个简单的机器中有效地生成数百万个虚拟线程,而不会出现内存不足的错误。因此,像使用平台线程、网络或数据库连接等更昂贵的资源那样池化虚拟线程并没有太大意义。

如果使用线程池,就会产生另一种开销,即为池中可用的线程调度任务,这将更加复杂,速度也可能更慢。此外,Java 中的大多数线程池都受到平台线程数的限制,而平台线程数总是小于程序中可能存在的虚拟线程数。

因此,我们必须避免使用带有线程池 API(如 ForkJoinPoolThreadPoolExecutor)的虚拟线程。相反,我们应该始终为每个任务创建一个新的虚拟线程。

目前,Java 并没有提供使用虚拟线程进行调度的标准 API,就像我们使用其他并发 API(如 ScheduledExecutorServiceschedule() 方法)一样。因此,为了有效地让虚拟线程运行计划任务,我们需要编写自己的 Scheduler(调度器)。

3.1、使用 Thread.sleep() 调度虚拟线程

创建自定义 Scheduler 的最直接方法是使用 Thread.sleep() 方法,让程序在当前线程执行时等待:

static Future<?> schedule(Runnable task, int delay, TemporalUnit unit, ExecutorService executorService) {
    return executorService.submit(() -> {
        try {
            Thread.sleep(Duration.of(delay, unit));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        task.run();
    });
}

schedule() 方法接收要调度的任务、延迟和 ExecutorService。然后,我们使用 ExecutorServicesubmit() 启动任务。在 try 代码块中,我们通过调用 Thread.sleep(),让执行任务的线程等待所需的延迟时间。因此,线程在等待时可能会被中断,所以我们通过中断当前线程的执行来处理 InterruptedException

最后,在等待之后,调用 run() 执行任务。

要使用自定义 schedule() 方法调度虚拟线程,需要为虚拟线程传递一个 ExecutorService

ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();

try (virtualThreadExecutor) {
    var taskResult = schedule(() -> 
      System.out.println("Running on a scheduled virtual thread!"), 5, ChronoUnit.SECONDS,
      virtualThreadExecutor);

    try {
        Thread.sleep(10 * 1000); // 睡眠 10 秒,等待任务结果
    } catch (InterruptedException e) {
        Thread.currentThread()
          .interrupt();
    }

    System.out.println(taskResult.get());
}

首先,实例化一个 ExecutorService,它会为我们提交的每个任务生成一个新的虚拟线程。然后,将 virtualThreadExecutor 变量封装在一个 try-with-resources 语句中,使 ExecutorService 保持打开状态,直到使用完毕。或者,在使用 ExecutorService 后,我们可以使用 shutdown() 适当地结束它。

我们调用 schedule() 在 5 秒后运行任务,然后等待 10 秒再尝试获取任务执行结果。

3.2、使用 SingleThreadExecutor 调度虚拟线程

也可以在虚拟线程执行器中为每个提交的任务实例化一个新的单线程调度器:

static Future<?> schedule(Runnable task, int delay, TimeUnit unit, ExecutorService executorService) {
    return executorService.submit(() -> {
        ScheduledExecutorService singleThreadScheduler = Executors.newSingleThreadScheduledExecutor();

        try (singleThreadScheduler) {
            singleThreadScheduler.schedule(task, delay, unit);
        }
    });
}

上述代码还使用作为参数传递的虚拟线程 ExecutorService 来提交任务。但现在,我们使用 newSingleThreadScheduledExecutor() 方法为每个任务实例化了一个单线程的 ScheduledExecutorService

然后,在 try-with-resources 代码块中,我们使用单线程执行器 schedule() 方法调度任务。该方法接受任务和延迟时间作为参数,并且不会像 sleep() 那样抛出检查过的 InterruptedException

最后,我们可以使用 schedule() 为虚拟线程执行器调度任务:

ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();

try (virtualThreadExecutor) {
    var taskResult = schedule(() -> 
      System.out.println("Running on a scheduled virtual thread!"), 5, TimeUnit.SECONDS,
      virtualThreadExecutor);

    try {
        Thread.sleep(10 * 1000); // 睡眠 10 秒,等待任务结果
    } catch (InterruptedException e) {
        Thread.currentThread()
          .interrupt();
    }

    System.out.println(taskResult.get());
}

这与第 3.1 节中 schedule() 方法的用法类似,但这里我们传递的是 TimeUnit 而不是 ChronoUnit

3.3、使用 sleep() 调度任务与调度单线程执行器

sleep() 调度方法中,我们只是在有效运行任务之前调用一个方法来等待。因此,我们可以直接了解代码在做什么,也更容易调试。另一方面,使用每个任务的调度执行器服务依赖于库的调度程序代码,因此可能更难调试或排除故障。

此外,如果我们选择使用 sleep(),就只能调度任务在固定延迟后运行。相比之下,使用 ScheduledExecutorService,我们可以使用三种调度方法:schedule()scheduleAtFixedRate()scheduleWithFixedDelay()

ScheduledExecutorServiceschedule() 方法会添加延迟,就像 sleep() 方法一样。scheduleAtFixedRate()scheduleWithFixedDelay() 方法为调度添加了周期性,因此我们可以在固定大小的周期内重复执行任务。因此,我们在使用 ScheduledExecutorService 内置 Java 库调度任务时拥有了更大的灵活性。

4、总结

本文介绍了使用虚拟线程而非传统平台线程的一些优势,以及如何使用 Thread.sleep()ScheduledExecutorService 来调度任务在虚拟线程中运行。


Ref:https://www.baeldung.com/java-scheduledexecutorservice-virtual-threads