fork-join
框架允许在几个工作进程中断某个任务,然后等待结果组合它们。 它在很大程度上利用了多处理器机器的生产能力。 以下是fork-join
框架中使用的核心概念和对象。
Fork
Fork是一个进程,其中任务将其分成可以并发执行的较小且独立的子任务。
语法
Sum left = new Sum(array, low, mid);
left.fork();
这里Sum
是RecursiveTask
的子类,left.fork()
方法将任务分解为子任务。
Join
连接(Join
)是子任务完成执行后任务加入子任务的所有结果的过程,否则它会持续等待。
语法
left.join();
这里剩下的是Sum
类的一个对象。
ForkJoinPool
它是一个特殊的线程池,旨在使用fork-and-join
任务拆分。
语法
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
这里有一个新的ForkJoinPool
,并行级别为4
个CPU。
RecursiveAction
RecursiveAction
表示不返回任何值的任务。
语法
class Writer extends RecursiveAction {
@Override
protected void compute() { }
}
递归任务
RecursiveTask
表示返回值的任务。
语法
class Sum extends RecursiveTask {
@Override
protected Long compute() { return null; }
}
实例
以下TestThread
程序显示了基于线程的环境中Fork-Join
框架的使用。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException, ExecutionException {
int nThreads = Runtime.getRuntime().availableProcessors();
System.out.println(nThreads);
int[] numbers = new int[1000];
for(int i=0; i< numbers.length; i++){
numbers[i] = i;
}
ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
System.out.println(result);
}
static class Sum extends RecursiveTask<Long> {
int low;
int high;
int[] array;
Sum(int[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
protected Long compute() {
if(high - low <= 10) {
long sum = 0;
for(int i=low; i < high; ++i)
sum += array[i];
return sum;
} else {
int mid = low + (high - low) / 2;
Sum left = new Sum(array, low, mid);
Sum right = new Sum(array, mid, high);
left.fork();
long rightResult = right.compute();
long leftResult = left.join();
return leftResult + rightResult;
}
}
}
}
这将产生以下结果 -
4
499500