当前位置: 首页 > article >正文

Java 21 虚拟线程详解

Java 21 引入了虚拟线程这一重磅特性——长期以来开发者们一直期待的“Java 协程”。在高并发和 I/O 密集型应用中,虚拟线程不仅大幅降低内存占用,还能实现高效的上下文切换,从而显著提升系统性能和吞吐量。


1. 什么是虚拟线程

虚拟线程是 JDK 21 推出的一种轻量级线程,其核心优势在于:

  • 内存占用低:无需为每个虚拟线程分配一个独立的操作系统线程,降低了系统资源消耗。
  • 高效上下文切换:由 JVM 内部调度管理,不涉及昂贵的 OS 级别线程切换,能更好地应对高并发场景。

简而言之,虚拟线程可看作是“任务(Task)”,它们运行在传统的操作系统线程之上,但在代码层面与常规线程使用方式完全一致。


2. 虚拟线程的工作原理

当应用程序启动一个虚拟线程时,JVM 会将其交由底层的线程池(由传统线程构成)执行。其核心工作机制包括:

  • 任务调度:一个传统线程可以轮流执行多个虚拟线程。举例来说,假设系统创建了 1000 个虚拟线程,而底层线程池只有 10 个传统线程,那么:

    • 初始时,V1 到 V10 分别调度到 T1 到 T10 上执行。
    • 当虚拟线程(例如 V3)因 I/O 操作而阻塞时,T3 立即释放出来去执行等待中的虚拟线程(如 V11)。
    • 如果所有虚拟线程均处于非阻塞状态,JVM 会按照时间片(例如每 100ns)轮转调度,将部分虚拟线程挂起以让新任务得以执行。
    • 如果以上条件均不满足,新任务将挂起等待空闲传统线程。
  • 阻塞处理:当虚拟线程遇到阻塞(如 I/O 操作)时,JVM 会立刻挂起该虚拟线程,并通过操作系统事件(如 epoll)通知 I/O 完成,从而在合适时机重新唤醒该虚拟线程。

这套机制确保了即使在大量 I/O 阻塞场景下,系统也不会因为传统线程资源不足而性能急剧下降。


3. 虚拟线程的调度

  • 自动调度:JDK 21 默认启用虚拟线程,且调度由 JVM 管理。默认情况下,JVM 会利用 ForkJoinPool 来执行虚拟线程,并根据实际任务数动态调整底层线程数。
  • 自定义调度:虽然大部分场景下无需手动干预,但若有特殊需求(例如控制并发量、定制线程池参数等),可自定义线程池并将虚拟线程交给该线程池执行。

4. 虚拟线程与传统线程的区别

两者的主要差异包括:

  • 线程创建方式

    • 虚拟线程:不直接创建操作系统线程,运行时由传统线程池调度。
    • 传统线程:每创建一个线程,JVM 都会启动一个独立的操作系统线程。
  • 资源消耗

    • 虚拟线程:内存开销极小,可轻松创建百万级虚拟线程。
    • 传统线程:资源开销较大,一般只能支持几千个线程。
  • 上下文切换

    • 虚拟线程:上下文切换由 JVM 管理,开销低。
    • 传统线程:依赖 OS 级调度,切换开销较高。
  • 调度与执行

    • 虚拟线程:任务调度完全由 JVM 控制,遇到阻塞时只挂起任务,不会占用底层线程。
    • 传统线程:阻塞操作会直接占用线程,影响线程池整体吞吐。

5. 虚拟线程与协程的对比

虽然虚拟线程与 Python 等语言中的协程在处理 I/O 阻塞时有相似的“让步”机制,但二者存在显著区别:

对比维度虚拟线程协程
并发/并行可在多个 CPU 上并行运行,支持真正的并行执行只有单个主线程调度,同一时刻只处理一个任务
资源争夺存在资源竞争和状态同步问题,需要合理设计并发控制单线程执行,无并发资源争夺问题
框架支持JDK 21 原生支持,无需额外框架依赖专用异步框架,编写及调试相对复杂

6. 如何使用虚拟线程

在 JDK 21 中,使用虚拟线程主要有两种方式:

6.1 直接创建并启动虚拟线程

public class VirtualThreadExample {
    public static void main(String[] args) {
        Thread virtualThread = Thread.ofVirtual().start(() -> {
            System.out.println("Hello virtual thread");
        });
        try {
            virtualThread.join(); // 等待虚拟线程完成
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

6.2 通过线程池执行虚拟线程

import java.util.concurrent.*;

public class VirtualThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个虚拟线程池
        ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
        // 提交多个任务到线程池
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " running in " + Thread.currentThread());
            });
        }
        // 关闭线程池
        executor.shutdown();
    }
}

注意:虚拟线程池不支持设置核心线程数、最大线程数或任务队列等参数。如果需要对并发量进行严格控制,可以自定义线程池。


7. 自定义虚拟线程池示例

为了避免因无限制并发导致 OOM 或对下游系统产生巨大压力,可借助信号量(Semaphore)和阻塞队列实现自定义虚拟线程池。下面是一个示例:

package com.zengbiaobiao.demo.vitrualthreaddemo;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class VirtualThreadExecutorService extends AbstractExecutorService {
    private volatile boolean shouldStop = false;
    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
    private final Semaphore semaphore;
    private final BlockingQueue<Runnable> taskQueue;

    /**
     * 构造函数
     * @param taskQueueSize 任务队列大小(队列满时 execute 会阻塞)
     * @param concurrencySize 最大并发任务数,防止过高并发或资源不足
     */
    public VirtualThreadExecutorService(int taskQueueSize, int concurrencySize) {
        this.semaphore = new Semaphore(concurrencySize);
        taskQueue = new LinkedBlockingQueue<>(taskQueueSize);
        this.loopEvent();
    }

    private void loopEvent() {
        Thread.ofVirtual().name("VirtualThreadExecutor").start(() -> {
            while (!shouldStop) {
                try {
                    Runnable task = taskQueue.take();
                    semaphore.acquire();
                    executor.execute(() -> {
                        try {
                            try {
                                task.run();
                            } finally {
                                semaphore.release();
                            }
                        } catch (Exception e) {
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                    });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    if (shouldStop) break;
                }
            }
        });
    }

    @Override
    public void shutdown() {
        shouldStop = true;
        executor.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        shouldStop = true;
        List<Runnable> remainingTasks = new ArrayList<>(taskQueue);
        taskQueue.clear();
        executor.shutdownNow();
        return remainingTasks;
    }

    @Override
    public boolean isShutdown() {
        return shouldStop;
    }

    @Override
    public boolean isTerminated() {
        return shouldStop && executor.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return executor.awaitTermination(timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        try {
            taskQueue.put(command); // 阻塞直到队列有空间
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Task submission interrupted.", e);
        }
    }
}

测试代码示例:

package com.zengbiaobiao.demo.vitrualthreaddemo;

public class VirtualThreadExecutorServiceDemo {
    public static void main(String[] args) throws InterruptedException {
        VirtualThreadExecutorService executorService = new VirtualThreadExecutorService(10, 2);
        for (int i = 0; i < 100000; i++) {
            final String threadName = "thread-" + i;
            System.out.println(Thread.currentThread() + ": try to create task " + threadName);
            executorService.submit(() -> {
                System.out.println(Thread.currentThread() + ": " + threadName + " created!");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread() + ": " + threadName + " finished!");
            });
        }
        Thread.sleep(5000000);
    }
}

8. 虚拟线程的应用场景

适用场景

  • 短时任务:如数据库写入、HTTP 请求、远程 REST API 调用、消息队列消费等,对资源竞争要求不高的场景。
  • 长时任务(有顺序要求):例如电梯监控系统中,每台电梯可分配专属虚拟线程以保证数据顺序处理。
  • API 网关数据聚合:并发查询多个上游 API 并进行数据组装,虚拟线程相比传统线程更具优势。
  • 事件驱动架构:如 Spring Boot 异步事件处理,改用虚拟线程池可大幅提升并发处理能力。

不适用场景

  • CPU 密集型应用:如大数据计算、图像处理、矩阵运算等。
  • 高资源竞争或状态同步密集型场景:此时无论使用传统线程还是虚拟线程,都需要针对性优化并发模型。

9. Spring Boot 中的实际应用示例

在 Spring Boot 项目中,异步事件处理可能成为性能瓶颈。借助虚拟线程,可以实现更高的并发处理能力。示例配置如下:

package com.zengbiaobiao.demo.vitrualthreaddemo;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Configuration
@EnableAsync
public class AsyncConfig {
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        // 最大并行任务数控制
        Semaphore semaphore = new Semaphore(100);
        ExecutorService virtualThreadPool = Executors.newVirtualThreadPerTaskExecutor();
        return runnable -> {
            try {
                semaphore.acquire();
                virtualThreadPool.submit(() -> {
                    try {
                        runnable.run();
                    } finally {
                        semaphore.release();
                    }
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Task submission interrupted", e);
            }
        };
    }
}

事件发送和处理示例:

package com.zengbiaobiao.demo.vitrualthreaddemo;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/home")
public class HomeController {
    private final ApplicationEventPublisher eventPublisher;

    public HomeController(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    @GetMapping("/index")
    public String index() {
        for (int i = 0; i < 1000; i++) {
            eventPublisher.publishEvent("event " + i);
        }
        return "success";
    }

    @EventListener
    @Async
    public void handleEvent(String event) {
        System.out.println(Thread.currentThread() + ": " + event);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

输出示例:

VirtualThread[#2031]/runnable@ForkJoinPool-1-worker-4: event 976  
VirtualThread[#2039]/runnable@ForkJoinPool-1-worker-1: event 980  
VirtualThread[#1064]/runnable@ForkJoinPool-1-worker-1: event 983  
...

上述示例中,当虚拟线程池中同时运行任务达到设定的 100 个时,后续任务提交将阻塞,待当前任务完成后再继续执行,从而实现对并发量的有效控制。


10. 使用虚拟线程的注意事项

  • 任务性质:首先明确任务是 I/O 密集型还是 CPU 密集型。
  • 与传统线程配合:在混合使用场景下,注意两者的切换与资源竞争。
  • 性能监控与限流:虚拟线程本身无法通过线程池参数控制并发,需借助 Semaphore、CountDownLatch 等工具防止 OOM 或对下游服务造成冲击。
  • 隐藏的传统线程:例如 HttpClient 的异步调用可能会额外创建传统线程,需关注整体线程消耗。
  • 同步机制:使用 synchronized 可能会导致线程“绑定”(pinned)到某个 OS 线程,建议对比使用 ReentrantLock 等更灵活的同步工具。
  • 软件设计:高并发场景下,合理的系统设计和并发模型优化仍然是性能保障的关键。

总结:虚拟线程简化了异步编程,使开发者能够以同步风格编写高并发代码,但在滥用高并发时依然可能导致资源耗尽或性能瓶颈,因此务必在实际应用前做好充分的测试和验证。


11. 已知问题与参考资料

在实际使用过程中,虚拟线程仍可能遇到一些早期的“陷阱”(gotchas),例如线程绑定(pinning)等问题。以下文章提供了一些实际案例和优化建议:

  • Java Virtual Threads — some early gotchas to look out for
  • Two Pitfalls by moving to Java Virtual Threads
  • Java 21 Virtual Threads - Dude, Where’s My Lock?
  • Pitfalls to avoid when switching to Virtual threads
  • Do Java 21 virtual threads address the main reason to switch to reactive single-thread frameworks?
  • Pinning: A pitfall to avoid when using virtual threads in Java
  • Taming the Virtual Threads: Embracing Concurrency With Pitfall Avoidance
  • Pitfalls you encounter with virtual threads

http://www.kler.cn/a/536314.html

相关文章:

  • Docker安装pypiserver私服
  • Maven 依赖管理全面解析
  • C32.【C++ Cont】静态实现双向链表及STL库的list
  • 二叉树实现(学习记录)
  • 组合(力扣77)
  • 神经网络常见激活函数 1-sigmoid函数
  • 【C#】一维、二维、三维数组的使用
  • 测试中的第一性原理:回归本质的质量思维革命
  • 数据结构之顺序表和链表
  • s1:简单测试-时间规模化
  • Kotlin 使用虚拟线程并在低版本中自动切换到协程
  • zabbix v7.2.3容器运行Proxy代理服务器启用IPV6访问
  • 代码随想录算法【Day38】
  • SQL Server查询计划操作符(7.3)——查询计划相关操作符(6)
  • 第4节课:控制结构 - 条件语句、循环语句
  • 本地私有化部署 DeepSeek Dify ,告别“服务器繁忙,请稍后再试”
  • 小米官博宣布:首款AI眼镜即将发布
  • Java实现网络安全编程数字信封 网络安全 java
  • 深入解析:如何利用 Python 爬虫获取商品 SKU 详细信息
  • 深入理解 YUV Planar 和色度二次采样 —— 视频处理的核心技术
  • 第30节课:前端架构与设计模式—构建高效可维护的Web应用
  • 《金字塔原理》笔记
  • 【JS】element-ui 中 table的select事件
  • source 与 shell 之详解(Detailed Explanation of Source and Shell)
  • 集合类不安全问题
  • tqdm用法教程