0

0

Java虚拟线程与RecursiveAction/Task:兼容性与替代方案

花韻仙語

花韻仙語

发布时间:2025-10-11 11:10:18

|

198人浏览过

|

来源于php中文网

原创

Java虚拟线程与RecursiveAction/Task:兼容性与替代方案

本文深入探讨了Java中RecursiveAction和RecursiveTask与虚拟线程的兼容性问题。由于它们与ForkJoinPool的固有绑定,无法直接使用虚拟线程。文章继而提出了基于CompletableFuture和StructuredTaskScope(孵化中)的替代方案,演示了如何利用虚拟线程的轻量级特性,高效实现递归任务的并行处理,并提供优化策略。

1. RecursiveAction/Task 与虚拟线程的兼容性分析

java中,recursiveaction和recursivetask是为forkjoinpool设计的抽象基类,用于支持分治算法的并行执行。它们是forkjointask的子类,顾名思义,其设计目标就是运行在forkjoinpool内部。

ForkJoinPool在创建工作线程时,会使用一个特殊的ForkJoinPool.ForkJoinWorkerThreadFactory来生成ForkJoinWorkerThread实例。这些工作线程是Thread类的子类,而非虚拟线程。虚拟线程(Virtual Threads)需要通过Thread.Builder.OfVirtual或Executors.newVirtualThreadPerTaskExecutor()等方式创建。由于ForkJoinWorkerThreadFactory无法创建虚拟线程,因此RecursiveAction和RecursiveTask无法直接与虚拟线程结合使用。

简而言之,RecursiveAction和RecursiveTask的设计理念和实现机制与ForkJoinPool紧密耦合,而ForkJoinPool目前并不支持将虚拟线程作为其工作线程。

2. 重新思考:虚拟线程下的递归任务设计

当考虑将任务分解为子任务并在虚拟线程上执行时,我们应该重新审视RecursiveAction/RecursiveTask所提供的价值。这些类主要为ForkJoinPool提供工作窃取(work-stealing)等机制,以在有限的平台线程上高效平衡工作负载。然而,当每个子任务都可以运行在自己的轻量级虚拟线程上时,这些复杂的负载均衡机制的需求就大大降低了。虚拟线程的优势在于其数量庞大且创建成本极低,这使得“为每个子任务分配一个虚拟线程”成为一种可行的策略。

因此,即使不能直接使用RecursiveAction/RecursiveTask,我们仍然可以轻松地使用其他并发工具在虚拟线程上实现递归任务。

立即学习Java免费学习笔记(深入)”;

3. 基于 CompletableFuture 的自定义递归任务实现

CompletableFuture是Java中处理异步操作的强大工具,它与虚拟线程结合可以非常优雅地实现递归任务。以下是一个示例,展示了如何创建一个类似RecursiveTask的结构,但在虚拟线程上执行:

Pictory
Pictory

AI视频制作工具,可以通过长内容中制作简短视频

下载
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

public class VirtualThreadRecursiveTask {

    // 示例1:基本递归任务,每个子任务启动一个虚拟线程
    record PseudoTask(int from, int to) {
        public static CompletableFuture run(int from, int to) {
            // 使用CompletableFuture.runAsync在虚拟线程上执行任务
            return CompletableFuture.runAsync(
                new PseudoTask(from, to)::compute, Thread::startVirtualThread);
        }

        protected void compute() {
            int mid = (from + to) >>> 1; // 计算中间点
            if (mid == from) {
                // 模拟实际处理,可能包含阻塞操作
                System.out.println(Thread.currentThread().getName() + ": Processing range [" + from + ", " + to + "]");
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50)); // 模拟耗时操作
            } else {
                // 递归地创建并运行子任务
                CompletableFuture sub1 = run(from, mid);
                CompletableFuture sub2 = run(mid, to);
                sub1.join(); // 等待子任务完成
                sub2.join(); // 等待子任务完成
            }
        }
    }

    // 示例2:优化后的递归任务,减少虚拟线程创建数量
    record OptimizedPseudoTask(int from, int to) {
        public static CompletableFuture run(int from, int to) {
            return CompletableFuture.runAsync(
                new OptimizedPseudoTask(from, to)::compute, Thread::startVirtualThread);
        }

        protected void compute() {
            CompletableFuture pendingFutures = null;
            // 循环处理一部分任务,另一部分提交给新线程
            for (int currentFrom = this.from; ; ) {
                int mid = (currentFrom + to) >>> 1;
                if (mid == currentFrom) {
                    // 模拟实际处理
                    System.out.println(Thread.currentThread().getName() + ": Processing range [" + currentFrom + ", " + to + "]");
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
                    break;
                } else {
                    // 提交一个子任务到新的虚拟线程
                    CompletableFuture sub = run(currentFrom, mid);
                    if (pendingFutures == null) {
                        pendingFutures = sub;
                    } else {
                        pendingFutures = CompletableFuture.allOf(pendingFutures, sub);
                    }
                    currentFrom = mid; // 当前线程处理另一半
                }
            }
            if (pendingFutures != null) {
                pendingFutures.join(); // 等待所有提交的子任务完成
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("--- Running Basic PseudoTask ---");
        long startTime = System.currentTimeMillis();
        PseudoTask.run(0, 1_000).join(); // 执行任务并等待完成
        long endTime = System.currentTimeMillis();
        System.out.println("Basic PseudoTask completed in " + (endTime - startTime) + " ms");

        System.out.println("\n--- Running Optimized PseudoTask ---");
        startTime = System.currentTimeMillis();
        OptimizedPseudoTask.run(0, 1_000_000).join(); // 执行优化后的任务
        endTime = System.currentTimeMillis();
        System.out.println("Optimized PseudoTask completed in " + (endTime - startTime) + " ms");
    }
}

代码解析与注意事项:

  1. CompletableFuture.runAsync(task, Thread::startVirtualThread): 这是核心。runAsync方法接受一个Runnable和一个Executor。通过提供Thread::startVirtualThread作为Executor,我们指示CompletableFuture在每次创建新任务时都启动一个新的虚拟线程来执行它。
  2. join()方法的阻塞性: 在上述示例中,sub1.join()和sub2.join()是阻塞调用。在平台线程中,这种阻塞会导致线程池饥饿。但在虚拟线程中,join()的阻塞并不会阻塞底层平台线程,因为虚拟线程在等待时会被卸载,允许平台线程执行其他虚拟线程。因此,即使存在阻塞join(),性能影响也远小于传统线程。
  3. 虚拟线程数量: 第一个PseudoTask示例会为每个子任务都创建一个新的虚拟线程。对于run(0, 1_000)这样的范围,可能会创建接近2000个虚拟线程。虽然虚拟线程非常轻量,但创建过多仍然会带来一些开销。
  4. 优化策略: 第二个OptimizedPseudoTask示例展示了一种优化方法:当前线程处理一部分任务(通过循环迭代),而只将另一部分任务提交到新的虚拟线程。这种策略可以显著减少虚拟线程的创建数量。例如,对于run(0, 1_000_000),它可能只会创建大约100万个虚拟线程,而不是200万个,这在某些场景下可以带来性能提升。这种优化与ForkJoinPool中工作窃取机制的某些思想有异曲同工之处,但这里是为了管理虚拟线程的创建数量,而非平台线程的负载。

4. 未来的选择:StructuredTaskScope

Java的孵化模块中引入了StructuredTaskScope,它提供了一种更结构化的并发编程模型,尤其适用于虚拟线程。StructuredTaskScope允许在一个明确的父子关系中管理并发任务,确保所有子任务在父任务完成前结束,并能更好地处理错误和取消。

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class StructuredVirtualThreadTask {

    // 示例:使用StructuredTaskScope实现递归任务
    record PseudoTask(int from, int to) {
        public static void run(int from, int to) {
            // 使用ShutdownOnFailure策略,任何子任务失败都会关闭整个作用域
            try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
                new PseudoTask(from, to).compute(scope);
                scope.join(); // 等待所有子任务完成
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Task interrupted", e);
            }
        }

        protected Void compute(StructuredTaskScope scope) {
            for (int currentFrom = this.from; ; ) {
                int mid = (currentFrom + to) >>> 1;
                if (mid == currentFrom) {
                    // 模拟实际处理
                    System.out.println(Thread.currentThread().getName() + ": Processing range [" + currentFrom + ", " + to + "]");
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
                    break;
                } else {
                    // 使用scope.fork()在虚拟线程中启动子任务
                    var sub = new PseudoTask(currentFrom, mid);
                    scope.fork(() -> sub.compute(scope));
                    currentFrom = mid; // 当前线程处理另一半
                }
            }
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("--- Running StructuredTaskScope PseudoTask ---");
        long startTime = System.currentTimeMillis();
        PseudoTask.run(0, 1_000_000); // 执行任务
        long endTime = System.currentTimeMillis();
        System.out.println("StructuredTaskScope PseudoTask completed in " + (endTime - startTime) + " ms");
    }
}

代码解析与注意事项:

  1. StructuredTaskScope: 这是一个孵化中的API,使用时需要特定的JVM参数(如--enable-preview和--add-modules jdk.incubator.concurrent)。
  2. ShutdownOnFailure: 这是一个StructuredTaskScope的实现,表示如果任何一个子任务失败,整个作用域都会被关闭,所有其他正在运行的子任务都会被取消。
  3. scope.fork(() -> sub.compute(scope)): fork方法在作用域内启动一个新的虚拟线程来执行提供的任务。
  4. scope.join(): 父任务通过join()方法等待作用域内所有子任务完成。与CompletableFuture.join()不同,这里是在作用域级别进行等待,而不是单个子任务。
  5. 结构化并发: StructuredTaskScope提供了更强的结构保证,例如,子任务的生命周期被限定在父任务的生命周期内,这有助于避免资源泄露和更好地管理错误。
  6. 孵化状态: 由于StructuredTaskScope目前仍处于孵化阶段,其API和行为可能会在未来的Java版本中发生变化,不建议在生产环境中使用。然而,它代表了Java并发模型的一个重要发展方向。

总结

尽管RecursiveAction和RecursiveTask无法直接与虚拟线程配合使用,但这并不意味着我们无法在虚拟线程上实现高效的递归任务。通过利用CompletableFuture的异步特性,并结合Thread::startVirtualThread来创建虚拟线程,我们可以轻松构建自定义的递归任务处理机制。此外,StructuredTaskScope作为一项新兴的结构化并发特性,为未来在虚拟线程上管理复杂任务提供了更优雅、更健壮的解决方案。在设计虚拟线程应用程序时,理解这些替代方案及其优势,将有助于我们充分利用虚拟线程的强大功能。

相关专题

更多
java
java

Java是一个通用术语,用于表示Java软件及其组件,包括“Java运行时环境 (JRE)”、“Java虚拟机 (JVM)”以及“插件”。php中文网还为大家带了Java相关下载资源、相关课程以及相关文章等内容,供大家免费下载使用。

827

2023.06.15

java正则表达式语法
java正则表达式语法

java正则表达式语法是一种模式匹配工具,它非常有用,可以在处理文本和字符串时快速地查找、替换、验证和提取特定的模式和数据。本专题提供java正则表达式语法的相关文章、下载和专题,供大家免费下载体验。

731

2023.07.05

java自学难吗
java自学难吗

Java自学并不难。Java语言相对于其他一些编程语言而言,有着较为简洁和易读的语法,本专题为大家提供java自学难吗相关的文章,大家可以免费体验。

732

2023.07.31

java配置jdk环境变量
java配置jdk环境变量

Java是一种广泛使用的高级编程语言,用于开发各种类型的应用程序。为了能够在计算机上正确运行和编译Java代码,需要正确配置Java Development Kit(JDK)环境变量。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

396

2023.08.01

java保留两位小数
java保留两位小数

Java是一种广泛应用于编程领域的高级编程语言。在Java中,保留两位小数是指在进行数值计算或输出时,限制小数部分只有两位有效数字,并将多余的位数进行四舍五入或截取。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

398

2023.08.02

java基本数据类型
java基本数据类型

java基本数据类型有:1、byte;2、short;3、int;4、long;5、float;6、double;7、char;8、boolean。本专题为大家提供java基本数据类型的相关的文章、下载、课程内容,供大家免费下载体验。

446

2023.08.02

java有什么用
java有什么用

java可以开发应用程序、移动应用、Web应用、企业级应用、嵌入式系统等方面。本专题为大家提供java有什么用的相关的文章、下载、课程内容,供大家免费下载体验。

430

2023.08.02

java在线网站
java在线网站

Java在线网站是指提供Java编程学习、实践和交流平台的网络服务。近年来,随着Java语言在软件开发领域的广泛应用,越来越多的人对Java编程感兴趣,并希望能够通过在线网站来学习和提高自己的Java编程技能。php中文网给大家带来了相关的视频、教程以及文章,欢迎大家前来学习阅读和下载。

16904

2023.08.03

漫蛙2入口地址合集
漫蛙2入口地址合集

本专题整合了漫蛙2入口汇总,阅读专题下面的文章了解更多详细内容。

13

2026.01.06

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.3万人学习

C# 教程
C# 教程

共94课时 | 6.1万人学习

Java 教程
Java 教程

共578课时 | 42.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号