Fork Join 简介

不积跬步,无以至千里。不积小流,无以成江海。

简介

fork/join 框架是 ExecutorService 接口的一种实现,它专为可以递归分解为更小部分的工作而设计。目标是使用所有可用的处理能力来增强应用程序的性能。

与任何 ExecutorService 实现一样,fork/join 框架将任务分配给线程池中的工作线程。fork/join 框架与众不同,因为它使用了工作窃取算法。无事可做的工作线程可以从仍然忙碌的其他线程中窃取任务。

fork/join 框架的中心是 ForkJoinPool 类,是 AbstractExecutorService 类的扩展。ForkJoinPool 实现了工作偷取算法,并可以执行 ForkJoinTask 任务。

基本使用方法

首先需要编写执行一部分的工作代码,大致结构如下:

1
2
3
4
5
6
if (当前这个任务工作量足够小){
    直接完成这个任务
} else {
    将这个任务或这部分工作分解成两个部分
    分别触发(invoke)这两个子任务的执行,并等待结果  
}

示例

下面是官方文档中的一个示例,假设你想要模糊一张图片。原始的 source 图片由一个整数的数组表示,每个整数表示一个像素点的颜色数值。与 source 图片相同,模糊之后的 destination 图片也由一个整数数组表示。 对图片的模糊操作是通过对 source 数组中的每一个像素点进行处理完成的。处理的过程是这样的:将每个像素点的色值取出,与周围像素的色值(红、黄、蓝三个组成部分)放在一起取平均值,得到的结果被放入 destination 数组。因为一张图片会由一个很大的数组来表示,这个流程会花费一段较长的时间。如果使用 fork/join 框架来实现这个模糊算法,你就能够借助多处理器系统的并行处理能力。下面是上述算法结合 fork/join 框架的一种简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ForkBlur extends RecursiveAction {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;

// Processing window size; should be odd.
private int mBlurWidth = 15;

public ForkBlur(int[] src, int start, int length, int[] dst) {
    mSource = src;
    mStart = start;
    mLength = length;
    mDestination = dst;
}

protected void computeDirectly() {
    int sidePixels = (mBlurWidth - 1) / 2;
    for (int index = mStart; index < mStart + mLength; index++) {
        // Calculate average.
        float rt = 0, gt = 0, bt = 0;
        for (int mi = -sidePixels; mi <= sidePixels; mi++) {
            int mindex = Math.min(Math.max(mi + index, 0),
                                mSource.length - 1);
            int pixel = mSource[mindex];
            rt += (float)((pixel & 0x00ff0000) >> 16)
                  / mBlurWidth;
            gt += (float)((pixel & 0x0000ff00) >>  8)
                  / mBlurWidth;
            bt += (float)((pixel & 0x000000ff) >>  0)
                  / mBlurWidth;
        }

        // Reassemble destination pixel.
        int dpixel = (0xff000000     ) |
               (((int)rt) << 16) |
               (((int)gt) <<  8) |
               (((int)bt) <<  0);
        mDestination[index] = dpixel;
    }
}

接下来你需要实现父类中的 compute() 方法,它会直接执行模糊处理,或者将当前的工作拆分成两个更小的任务。数组的长度可以作为一个简单的阀值来判断任务是应该直接完成还是应该被拆分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }

    int split = mLength / 2;

    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

如果前面这个方法是在一个 RecursiveAction 的子类中,那么设置任务在 ForkJoinPool 中执行就再直观不过了。通常会包含以下一些步骤:

  1. 创建一个表示所有需要完成工作的任务。
1
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
  1. 创建将要用来执行任务的 ForkJoinPool。
1
2
orkJoinPool pool = new ForkJoinPool();

  1. 执行任务。
1
2
pool.invoke(fb);