ForkJoinPool 的关闭

不积跬步,无以至千里。不积小流,无以成江海。

同 ThreadPoolExecutor 一样,ForkJoinPool 的关闭也不可能是”瞬时的”,而是需要一个平滑的过渡过程。

关键的 terminate 变量

对于一个 Worker 线程来说,它会在一个 while 循环里面不断轮询队列中的任务,如果有任务,那么执行,处在活跃状态;如果没有任务,则进入空闲等待状态。

(int)(c=ctl)<0,即低32位的最高位为1(参考前面 ctl 变量的解析),说明线程池已经进入了关闭状态。但线程池进入关闭状态,不代表所有的线程都会立马关闭。

为此,在 ForkJoinWorkerThread 里还有一个 terminate 变量,初始为 false。当线程池要关闭的时候,会把相关线程的 terminate 变量置为 true。这样,这些线程就会退出上面的 while 循环,也就会自动退出。

shutdown() 与 shutdownNow()

1
2
3
4
    public void shutdown() {
        checkPermission();
        tryTerminate(false, true);
    }
1
2
3
4
5
    public List<Runnable> shutdownNow() {
        checkPermission();
        tryTerminate(true, true);
        return Collections.emptyList();
    }

调用 tryTerminate(boolean) 函数,其中一个传入的是 false,另一个传入的是 true。tryTerminate 意为试图关闭 ForkJoinPool,并不保证一定可以关闭成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (this == common) {
            awaitQuiescence(timeout, unit);
            return false;
        }
        long nanos = unit.toNanos(timeout);
        if (isTerminated())
            return true;
        if (nanos <= 0L)
            return false;
        long deadline = System.nanoTime() + nanos;
        synchronized (this) {
            for (;;) {
                if (isTerminated())
                    return true;
                if (nanos <= 0L)
                    return false;
                long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
                wait(millis > 0L ? millis : 1L);
                nanos = deadline - System.nanoTime();
            }
        }
    }

同 ThreadPoolExecutor 一样,ForkJoinPool 中也有 awaitTermination() 函数,代码几乎相同,上面函数的最后一段,就是在整个线程池都已关闭,即没有任何线程存活的情况下,通知阻塞在 awaitTermination() 的外部线程。

在 startTerminating() 中,把全局队列、每个线程的局部队列中的任务都取消了,同时把所有线程的 terminate 置为了 true,唤醒了阻塞栈中所有等待的空闲线程(这些线程的 terminate 置为了 true,会自动退出)。

如果 now 为 false,tryTerminate() 会返回 false。只是在最外面的函数 shutdown() 里面,把 shutdown 置为了 true。这样,新的任务提交会被拒绝,但现有的任务都会正常执行完成。