Fork/Join 框架是jdk7提供的一个用于并行执行任务的框架,是一个把大任务分成若干小任务,最终汇总每个小任务结果,最终得到大任务计算结果的框架。
Fork:就是把一个大任务切分成若干小任务,并行执行;
Join:就是合并这些小任务的执行结果,最终得到大任务的结果。
Fork/Join 根据工作窃取算法进行设计
工作窃取算法:
(work-stealing),是指某个线程从其他队列里窃取任务来执行。
为什么需要工作窃取算法?
假如有些线程一起工作,可能有些线程的工作早早结束,结束的线程与其等着,不如去帮其他线程干活,于是就去其他线程的队列里,窃取一个任务来执行。
而在这时,他们会访问同一队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列维护;
被窃取任务的线程永远从双端队列头部拿取任务执行;
窃取任务线程永远从双端队列尾部拿取任务执行;
Fork/Join 框架设计:
Fork:分割任务。首先,我们需要一个fork类,来把大任务拆分成子任务,有可能子任务还很大,需要不停的分割,直到分割出子任务足够小。
抽象类ForkJoinTask提供了两个子抽象类
RecursiveAction:没有返回结果的任务
RecursiveTask:有返回结果的任务
Join:执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取线程并执行。子任务执行完的结果同一放到一个队列里,启动一个线程从队列里拿数据
然后合并这些数据
ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行
使用场景:
可以采用分而治之的算法场景
计算密集型的任务
示例:
计算 1+2+3+……+100,如果加数之间差值大于等于10,则拆分为子任务。
package org.test.mem.thread.pool.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* Fork/Join 框架是jdk7提供的一个用于并行执行任务的框架,是一个把大任务分成若干小任务,最终汇总每个小任务结果,最终得到大任务计算结果的框架。
* <p>
* Fork:就是把一个大任务切分成若干小任务,并行执行;
* Join:就是合并这些小任务的执行结果,最终得到大任务的结果。
* <p>
* Fork/Join 根据工作窃取算法进行设计
* <p>
* 工作窃取算法:(work-stealing),是指某个线程从其他队列里窃取任务来执行。
*
* 为什么需要工作窃取算法?
* 假如有些线程一起工作,可能有些线程的工作早早结束,结束的线程与其等着,不如去帮其他线程干活,于是就去其他线程的队列里,窃取一个任务来执行。
* 而在这时,他们会访问同一队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列维护;
* 被窃取任务的线程永远从双端队列头部拿取任务执行;
* 窃取任务线程永远从双端队列尾部拿取任务执行;
*
* Fork/Join 框架设计:
* Fork:分割任务。首先,我们需要一个fork类,来把大任务拆分成子任务,有可能子任务还很大,需要不停的分割,直到分割出子任务足够小。
* 抽象类ForkJoinTask提供了两个子抽象类
* RecursiveAction:没有返回结果的任务
* RecursiveTask:有返回结果的任务
*
* Join:执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取线程并执行。子任务执行完的结果同一放到一个队列里,启动一个线程从队列里拿数据
* 然后合并这些数据
*
* ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行
*
*
* 使用场景:
* 可以采用分而治之的算法场景
* 计算密集型的任务
*
* 实例:
* 计算 1+2+3+……+100,如果加数之间差值大于等于10,则拆分为子任务。
*
* */
public class ForkJoinTest extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10;
private int start;
private int end;
public ForkJoinTest(int start, int end) {
this.start = start;
this.end = end;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//计算任务
ForkJoinTest task = new ForkJoinTest(1, 100);
//异步执行任务
ForkJoinTask<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
@Override
protected Integer compute() {
int sum = 0;
if (end - start < THRESHOLD) {
for (int i = start; i <= end; i++) {
sum += i;
// System.out.println(Thread.currentThread().getName() + ":" + i + " sum:" + sum);
}
System.out.println(Thread.currentThread().getName() +" sum:" +sum +" start:"+start+" end:"+end);
return sum;
}else{
int middle = (end + start) / 2;
ForkJoinTest leftForkJoin = new ForkJoinTest(start, middle);
ForkJoinTest rightForkJoin = new ForkJoinTest(middle + 1, end);
leftForkJoin.fork();
rightForkJoin.fork();
int leftResult = leftForkJoin.join();
int rightResult = rightForkJoin.join();
sum = leftResult + rightResult;
return sum;
}
}
}