要塞攻击免广告版
210.79MB · 2025-10-31
由JEP444 发起,在JDK 21中正式发布。相比平台线程更加轻量级,在某些场景下可以显著提供程序的吞吐量,API使用也十分的简单,不需要太多的学习成本。
常见用法
public static void main(String[] args) {
    Thread.ofVirtual().start(() -> {
        System.out.println(Thread.currentThread());
    }).join();
    Thread.startVirtualThread(() -> {
        System.out.println(Thread.currentThread());
    }); //.setName("virtual thread");
    ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
    executorService.submit(() -> {});
}  
VirtualThread类从Thread继承而来,Thread 支持的功能虚拟线程同样支持,可以无缝切换Thread 到虚拟线程
虚拟线程执行流程:
/**
 * --add-exports java.base/jdk.internal.vm=ALL-UNNAMED
 * @date 2025/9/3
 */
public class ContinuationTest2 {
    public static void main(String[] args) {
        ContinuationScope scope = new ContinuationScope("scope");
        Continuation continuation = new Continuation(scope, () -> {
            System.out.println("continuation start execute...");
            Continuation.yield(scope);    // 暂停执行continuation
            System.out.println("continuation end execute...");
        });
        continuation.run();  // 开始执行continuation
        System.out.println(continuation.isDone());    // false: 由于continuation内部执行了yield
        continuation.run();  // 继续执行continuation
    }
}
输出如下:
continuation start execute...
false
continuation end execute...
前面说了虚拟现在在执行continuation的时候,遇到阻塞的时候会执行yield,完成后Carrier线程会继续去处理其他任务。 但是在某些情况下执行yield 并不能立即结束当前的任务,导致该Carrier线程一直阻塞,这种情况就叫Pinning
下面情况会发生Pinning:
当虚拟线程等待synchronized 块时,不会发生pinning
pinning 检查参数: -Djdk.tracePinnedThreads=short/full. 如果发生pin,将会输出: <== monitors 由于这个参数内部实现和jvmti交互有bug, 因此在debug场景下会卡死。
// -Djdk.tracePinnedThreads=short
public class Main {
    public static void main(String[] args)  {
        Thread.startVirtualThread(() -> {
            synchronized (Main.class) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("finish");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }); //.setName("virtual thread");
        LockSupport.park();
    }
}
输出:
Thread[#31,ForkJoinPool-1-worker-1,5,CarrierThreads]
    org.example.Main.lambda$main$0(Main.java:11) <== monitors:1
finish
JEP491: 解决Synchronize 情况下发生pinning, 参数tracePinnedThreads将无用。 由JDK24 中发布,并没有完全解决pinning.
在执行过程可以使用下面命令可以输出线程信息: jcmd Thread.dump_to_file -format=json 等价于下面代码: new HotSpotDiagnostic().dumpThreads("C:projectfeature23test.json", HotSpotDiagnosticMXBean.ThreadDumpFormat.JSON);
输出如下:每个container下面输出对应的线程信息
内部主要由ThreadContainer 对象用跟踪线程信息,有虚拟线程数量统计、线程跟踪等。在结构化并发中比较有用。 ThreadContainer继承关系:
开启虚拟线程前会设置默认container 为rootContainers --> ThreadContainers.root(): 每个Thread对象都有一个container属性, 即使new Thread 没有设置container,默认也为:
根据jdk.trackAllThreads 属性决定是用TrackingRootContainer(default),还是CountingRootContainer。 name 都叫
除了上面的Container外,还有SharedThreadContainer(不继承RootContainer), 用于非结构化使用,例如线程池
手动创建虚拟线程container构造过程: start(ThreadContainers.root()): 虚拟线程执行过程
ScopeValue:jep481 线程间不可变,比threadlocal更加高效,且内存更低。 主要用于虚拟线程、结构化编程 jep480 threadLocal: 数据可变、易内存泄露、当子线程过多,继承父线程的inheritableThreadLocals可能导致内存占用较大
StructuredTaskScope: 子线程可以继承父线程的Scope
Snapshot: 快照,相当于threadlocal 相关记录位置: Thread#scopedValueBindings: 默认Thread.class, Snapshot (包含previous指针) 对象 Thread#headStackableScopes: 指向线程栈顶元素。 previous 记录上一个StackableScope。 貌似StructuredTaskScope才使用这个东西 Thread#scopedValueCache: native 方法
ScopeValue 存放数据的实际上是Snapshot对象,Snapshot 持有bindings 值 Snapshot#bitmask: Carrier#bitmask:
ThreadContainerImpl 才是传的false,非共享,有owner, 会继承scoprevalue
ScopeValue 在线程间共享必须不可变。
ShutdownOnSuccess: 其中一个任务完成后,即终止其他任务 ShutdownOnFailure: 有一个失败后,终止其他任务
Thread flocks: 线程群, 管理线程start, close、 会使用一个tree 结构
StructuredTaskScope# ThreadFlock
#scopedValueBindings: 记录当前线程的scopedValue #container: ThreadContainerImpl (StackableScope子类)实例对象, 会存放当前线程作为owner, StackableScope#previous 记录当前线程的栈顶元素headStackableScopes
scope.fork:
scope.join: 等待任务完成。
var scopedValue = ScopedValue.newInstance();
ScopedValue.runWhere(scopedValue, "duke", () -> {
            try (var scope = new StructuredTaskScope<String>()) {
                scope.fork(() -> childTask());
                scope.fork(() -> childTask2());
                scope.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
参考: juejin.cn/post/696423…
demo 演示:
Server:
 public static void main(String[] args) throws Throwable {
        HttpServer httpServer = HttpServer.create(new InetSocketAddress(8080), 0);
        httpServer.createContext("/test", t -> {
            try {
                TimeUnit.MINUTES.sleep(5);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            Headers responseHeaders = t.getResponseHeaders();
            responseHeaders.add("content-type", "text/plain");
            String response = "This is the response";
            t.sendResponseHeaders(200, response.length());
            OutputStream os = t.getResponseBody();
            os.write(response.getBytes());
            os.close();
        });
        httpServer.start();
        LockSupport.park();
    }
    static record URLData (URL url, byte[] response) { }
    static List<URLData> retrieveURLs(URL... urls) throws Exception {
        //创建虚拟线程线程池
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            //生成读取对每个 url 执行 getURL 方法的任务
            var tasks = Arrays.stream(urls)
                    .map(url -> (Callable<URLData>)() -> getURL(url))
                    .toList();
            //提交任务,等待并返回所有结果
            return executor.invokeAll(tasks).stream()
                    .filter(Future::isDone)
                    .map(f -> {
                        try {
                            return f.get();
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        } catch (ExecutionException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .toList();
        }
    }
    //读取url的内容
    static URLData getURL(URL url) throws IOException {
        try (InputStream in = url.openStream()) {
            return new URLData(url, in.readAllBytes());
        }
    }
    public static void main(String[] args) throws Exception {
        long pid = ProcessHandle.current().pid();
        System.out.println("进程 ID (PID): " + pid);
        List<URLData> urlData = retrieveURLs(new URL("http://localh*os*t*:8080/test"));
        System.out.println(urlData);
        for (URLData urlDatum : urlData) {
            System.out.println(new String(urlDatum.response));
        }
    }
在阻塞过程中可以通过JCMD命令dump 出堆栈信息。 IDEA中自带的dump 工具无法dump出虚拟线程的堆栈信息。
默认会在进程目录生成文件
jcmd 4632 Thread.dump_to_file threads.txt -overwrite
可以看到虚拟线程阻塞的堆栈信息:
#30 "" virtual
      java.base/java.lang.VirtualThread.park(VirtualThread.java:582)
      java.base/java.lang.System$2.parkVirtualThread(System.java:2643)
      java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
      java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:369)
      java.base/sun.nio.ch.Poller.pollIndirect(Poller.java:139)
      java.base/sun.nio.ch.Poller.poll(Poller.java:102)
      java.base/sun.nio.ch.Poller.poll(Poller.java:87)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:175)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:201)
      java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
      java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:346)
      java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:796)
      java.base/java.net.Socket$SocketInputStream.read(Socket.java:1099)
Poller: 一个Poller相当于一个selector。不同平台实现不同。win为WEPoll,linux 则epoll, mac 则 kqueue。 下面都是win的分析:
map: <fd, thread>. 处理park唤醒线程
queue:需要异步跟踪的请求对象 (Request:fd、waiter、done。 可以有一个等待完成的线程)
static 中会初始化readPoller,writePoller。 每个Poller 启动的时候会相应的启动一个Updater(默认情况)
jdk.readPollers 属性可以定义poller 个数,默认1. 请求采用fd % len 的方法来选择poller
readPoller: 监听EPOLLIN 事件, writePoller:监听 EPOLLOUT
Updater 从 queue 中获取对象, 将fd 注册到selector, 监听 EPOLLONESHOT(触发方式: report event(s) only once)
连接请求
java.net.Socket#connect(java.net.SocketAddress, int)
读取响应数据: sun.nio.ch.NioSocketImpl#implRead --> timedRead --> park
tryRead的时候返回-2 (IOStatus: Nothing available (non-blocking)), 就会执行park 逻辑:
sun.nio.ch.NioSocketImpl#park(java.io.FileDescriptor, int, long): 虚拟线程执行Poller#poll。 非虚拟线程执行默认的Net#poll,由底层平台实现。
 
                    