博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
我的Java开发学习之旅------>Java使用Fork/Join框架来并行执行任务
阅读量:7144 次
发布时间:2019-06-29

本文共 10625 字,大约阅读时间需要 35 分钟。

现代的计算机已经向多CPU方向发展,即使是普通的PC,甚至现在的智能手机、多核处理器已被广泛应用。在未来,处理器的核心数将会发展的越来越多。

虽然硬件上的多核CPU已经十分成熟,但是很多应用程序并未这种多核CPU做好准备,因此并不能很好地利用多核CPU的性能优势。

为了充分利用多CPU、多核CPU的性能优势,级软基软件系统应该可以充分“挖掘”每个CPU的计算能力,决不能让某个CPU处于“空闲”状态。为此,可以考虑把一个任务拆分成多个“小任务”,把多个"小任务"放到多个处理器核心上并行执行。当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

如下面的示意图所示:

第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。

第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Java提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合成总的计算结果。

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器。

  •  public ForkJoinPool(int parallelism):创建一个包含parallelism个并行线程的ForkJoinPool
  •  public ForkJoinPool() :以Runtime.getRuntime().availableProcessors()的返回值作为parallelism来创建ForkJoinPool

创建ForkJoinPool实例后,可以钓鱼ForkJoinPool的submit(ForkJoinTask<T> task)或者invoke(ForkJoinTask<T> task)来执行指定任务。其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它有两个抽象子类:RecursiveAction和RecursiveTask。

  • RecursiveTask代表有返回值的任务
  • RecursiveAction代表没有返回值的任务。

一、RecursiveAction

下面以一个没有返回值的大任务为例,介绍一下RecursiveAction的用法。

大任务是:打印0-200的数值。

小任务是:每次只能打印50个数值。

import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveAction;import java.util.concurrent.TimeUnit;//RecursiveAction为ForkJoinTask的抽象子类,没有返回值的任务class PrintTask extends RecursiveAction {	// 每个"小任务"最多只打印50个数	private static final int MAX = 50;	private int start;	private int end;	PrintTask(int start, int end) {		this.start = start;		this.end = end;	}	@Override	protected void compute() {		// 当end-start的值小于MAX时候,开始打印		if ((end - start) < MAX) {			for (int i = start; i < end; i++) {				System.out.println(Thread.currentThread().getName() + "的i值:"						+ i);			}		} else {			// 将大任务分解成两个小任务			int middle = (start + end) / 2;			PrintTask left = new PrintTask(start, middle);			PrintTask right = new PrintTask(middle, end);			// 并行执行两个小任务			left.fork();			right.fork();		}	}}public class ForkJoinPoolTest {	/**	 * @param args	 * @throws Exception	 */	public static void main(String[] args) throws Exception {		// 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool		ForkJoinPool forkJoinPool = new ForkJoinPool();		// 提交可分解的PrintTask任务		forkJoinPool.submit(new PrintTask(0, 200));		forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束		// 关闭线程池		forkJoinPool.shutdown();	}}

运行结果如下:

ForkJoinPool-1-worker-2的i值:75ForkJoinPool-1-worker-2的i值:76ForkJoinPool-1-worker-2的i值:77ForkJoinPool-1-worker-2的i值:78ForkJoinPool-1-worker-2的i值:79ForkJoinPool-1-worker-2的i值:80ForkJoinPool-1-worker-2的i值:81ForkJoinPool-1-worker-2的i值:82ForkJoinPool-1-worker-2的i值:83ForkJoinPool-1-worker-2的i值:84ForkJoinPool-1-worker-2的i值:85ForkJoinPool-1-worker-2的i值:86ForkJoinPool-1-worker-2的i值:87ForkJoinPool-1-worker-2的i值:88ForkJoinPool-1-worker-2的i值:89ForkJoinPool-1-worker-2的i值:90ForkJoinPool-1-worker-2的i值:91ForkJoinPool-1-worker-2的i值:92ForkJoinPool-1-worker-2的i值:93ForkJoinPool-1-worker-2的i值:94ForkJoinPool-1-worker-2的i值:95ForkJoinPool-1-worker-2的i值:96ForkJoinPool-1-worker-2的i值:97ForkJoinPool-1-worker-2的i值:98ForkJoinPool-1-worker-2的i值:99ForkJoinPool-1-worker-2的i值:50ForkJoinPool-1-worker-2的i值:51ForkJoinPool-1-worker-2的i值:52ForkJoinPool-1-worker-2的i值:53ForkJoinPool-1-worker-2的i值:54ForkJoinPool-1-worker-2的i值:55ForkJoinPool-1-worker-2的i值:56ForkJoinPool-1-worker-2的i值:57ForkJoinPool-1-worker-2的i值:58ForkJoinPool-1-worker-2的i值:59ForkJoinPool-1-worker-2的i值:60ForkJoinPool-1-worker-2的i值:61ForkJoinPool-1-worker-2的i值:62ForkJoinPool-1-worker-2的i值:63ForkJoinPool-1-worker-2的i值:64ForkJoinPool-1-worker-2的i值:65ForkJoinPool-1-worker-2的i值:66ForkJoinPool-1-worker-2的i值:67ForkJoinPool-1-worker-2的i值:68ForkJoinPool-1-worker-2的i值:69ForkJoinPool-1-worker-1的i值:175ForkJoinPool-1-worker-1的i值:176ForkJoinPool-1-worker-1的i值:177ForkJoinPool-1-worker-1的i值:178ForkJoinPool-1-worker-1的i值:179ForkJoinPool-1-worker-1的i值:180ForkJoinPool-1-worker-1的i值:181ForkJoinPool-1-worker-1的i值:182ForkJoinPool-1-worker-1的i值:183ForkJoinPool-1-worker-1的i值:184ForkJoinPool-1-worker-1的i值:185ForkJoinPool-1-worker-1的i值:186ForkJoinPool-1-worker-1的i值:187ForkJoinPool-1-worker-1的i值:188ForkJoinPool-1-worker-1的i值:189ForkJoinPool-1-worker-1的i值:190ForkJoinPool-1-worker-1的i值:191ForkJoinPool-1-worker-1的i值:192ForkJoinPool-1-worker-1的i值:193ForkJoinPool-1-worker-1的i值:194ForkJoinPool-1-worker-1的i值:195ForkJoinPool-1-worker-1的i值:196ForkJoinPool-1-worker-1的i值:197ForkJoinPool-1-worker-1的i值:198ForkJoinPool-1-worker-1的i值:199ForkJoinPool-1-worker-1的i值:150ForkJoinPool-1-worker-1的i值:151ForkJoinPool-1-worker-1的i值:152ForkJoinPool-1-worker-1的i值:153ForkJoinPool-1-worker-1的i值:154ForkJoinPool-1-worker-1的i值:155ForkJoinPool-1-worker-1的i值:156ForkJoinPool-1-worker-1的i值:157ForkJoinPool-1-worker-1的i值:158ForkJoinPool-1-worker-1的i值:159ForkJoinPool-1-worker-1的i值:160ForkJoinPool-1-worker-1的i值:161ForkJoinPool-1-worker-1的i值:162ForkJoinPool-1-worker-1的i值:163ForkJoinPool-1-worker-1的i值:164ForkJoinPool-1-worker-1的i值:165ForkJoinPool-1-worker-1的i值:166ForkJoinPool-1-worker-1的i值:167ForkJoinPool-1-worker-1的i值:168ForkJoinPool-1-worker-1的i值:169ForkJoinPool-1-worker-1的i值:170ForkJoinPool-1-worker-1的i值:171ForkJoinPool-1-worker-1的i值:172ForkJoinPool-1-worker-1的i值:173ForkJoinPool-1-worker-1的i值:174ForkJoinPool-1-worker-1的i值:125ForkJoinPool-1-worker-1的i值:126ForkJoinPool-1-worker-1的i值:127ForkJoinPool-1-worker-1的i值:128ForkJoinPool-1-worker-1的i值:129ForkJoinPool-1-worker-1的i值:130ForkJoinPool-1-worker-1的i值:131ForkJoinPool-1-worker-1的i值:132ForkJoinPool-1-worker-1的i值:133ForkJoinPool-1-worker-1的i值:134ForkJoinPool-1-worker-1的i值:135ForkJoinPool-1-worker-1的i值:136ForkJoinPool-1-worker-1的i值:137ForkJoinPool-1-worker-1的i值:138ForkJoinPool-1-worker-1的i值:139ForkJoinPool-1-worker-1的i值:140ForkJoinPool-1-worker-1的i值:141ForkJoinPool-1-worker-1的i值:142ForkJoinPool-1-worker-1的i值:143ForkJoinPool-1-worker-1的i值:144ForkJoinPool-1-worker-1的i值:145ForkJoinPool-1-worker-1的i值:146ForkJoinPool-1-worker-1的i值:147ForkJoinPool-1-worker-1的i值:148ForkJoinPool-1-worker-1的i值:149ForkJoinPool-1-worker-1的i值:100ForkJoinPool-1-worker-1的i值:101ForkJoinPool-1-worker-1的i值:102ForkJoinPool-1-worker-1的i值:103ForkJoinPool-1-worker-1的i值:104ForkJoinPool-1-worker-1的i值:105ForkJoinPool-1-worker-1的i值:106ForkJoinPool-1-worker-1的i值:107ForkJoinPool-1-worker-1的i值:108ForkJoinPool-1-worker-1的i值:109ForkJoinPool-1-worker-1的i值:110ForkJoinPool-1-worker-1的i值:111ForkJoinPool-1-worker-1的i值:112ForkJoinPool-1-worker-1的i值:113ForkJoinPool-1-worker-1的i值:114ForkJoinPool-1-worker-1的i值:115ForkJoinPool-1-worker-1的i值:116ForkJoinPool-1-worker-1的i值:117ForkJoinPool-1-worker-1的i值:118ForkJoinPool-1-worker-1的i值:119ForkJoinPool-1-worker-1的i值:120ForkJoinPool-1-worker-1的i值:121ForkJoinPool-1-worker-1的i值:122ForkJoinPool-1-worker-1的i值:123ForkJoinPool-1-worker-1的i值:124ForkJoinPool-1-worker-1的i值:25ForkJoinPool-1-worker-1的i值:26ForkJoinPool-1-worker-1的i值:27ForkJoinPool-1-worker-1的i值:28ForkJoinPool-1-worker-1的i值:29ForkJoinPool-1-worker-1的i值:30ForkJoinPool-1-worker-1的i值:31ForkJoinPool-1-worker-1的i值:32ForkJoinPool-1-worker-1的i值:33ForkJoinPool-1-worker-1的i值:34ForkJoinPool-1-worker-1的i值:35ForkJoinPool-1-worker-1的i值:36ForkJoinPool-1-worker-1的i值:37ForkJoinPool-1-worker-1的i值:38ForkJoinPool-1-worker-1的i值:39ForkJoinPool-1-worker-1的i值:40ForkJoinPool-1-worker-1的i值:41ForkJoinPool-1-worker-1的i值:42ForkJoinPool-1-worker-1的i值:43ForkJoinPool-1-worker-1的i值:44ForkJoinPool-1-worker-1的i值:45ForkJoinPool-1-worker-1的i值:46ForkJoinPool-1-worker-1的i值:47ForkJoinPool-1-worker-1的i值:48ForkJoinPool-1-worker-1的i值:49ForkJoinPool-1-worker-1的i值:0ForkJoinPool-1-worker-1的i值:1ForkJoinPool-1-worker-1的i值:2ForkJoinPool-1-worker-1的i值:3ForkJoinPool-1-worker-1的i值:4ForkJoinPool-1-worker-1的i值:5ForkJoinPool-1-worker-1的i值:6ForkJoinPool-1-worker-1的i值:7ForkJoinPool-1-worker-1的i值:8ForkJoinPool-1-worker-1的i值:9ForkJoinPool-1-worker-1的i值:10ForkJoinPool-1-worker-1的i值:11ForkJoinPool-1-worker-1的i值:12ForkJoinPool-1-worker-1的i值:13ForkJoinPool-1-worker-1的i值:14ForkJoinPool-1-worker-1的i值:15ForkJoinPool-1-worker-1的i值:16ForkJoinPool-1-worker-1的i值:17ForkJoinPool-1-worker-1的i值:18ForkJoinPool-1-worker-1的i值:19ForkJoinPool-1-worker-1的i值:20ForkJoinPool-1-worker-1的i值:21ForkJoinPool-1-worker-1的i值:22ForkJoinPool-1-worker-1的i值:23ForkJoinPool-1-worker-1的i值:24ForkJoinPool-1-worker-2的i值:70ForkJoinPool-1-worker-2的i值:71ForkJoinPool-1-worker-2的i值:72ForkJoinPool-1-worker-2的i值:73ForkJoinPool-1-worker-2的i值:74

从上面结果来看,ForkJoinPool启动了两个线程来执行这个打印任务,这是因为笔者的计算机的CPU是双核的。不仅如此,读者可以看到程序虽然打印了0-199这两百个数字,但是并不是连续打印的,这是因为程序将这个打印任务进行了分解,分解后的任务会并行执行,所以不会按顺序从0打印 到199。

二、RecursiveTask

下面以一个有返回值的大任务为例,介绍一下RecursiveTask的用法。

大任务是:计算随机的100个数字的和。

小任务是:每次只能20个数值的和。

import java.util.Random;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.Future;import java.util.concurrent.RecursiveTask;//RecursiveTask为ForkJoinTask的抽象子类,有返回值的任务class SumTask extends RecursiveTask
{ // 每个"小任务"最多只打印50个数 private static final int MAX = 20; private int arr[]; private int start; private int end; SumTask(int arr[], int start, int end) { this.arr = arr; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; // 当end-start的值小于MAX时候,开始打印 if ((end - start) < MAX) { for (int i = start; i < end; i++) { sum += arr[i]; } return sum; } else { System.err.println("=====任务分解======"); // 将大任务分解成两个小任务 int middle = (start + end) / 2; SumTask left = new SumTask(arr, start, middle); SumTask right = new SumTask(arr, middle, end); // 并行执行两个小任务 left.fork(); right.fork(); // 把两个小任务累加的结果合并起来 return left.join() + right.join(); } }}public class ForkJoinPoolTest2 { /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int arr[] = new int[100]; Random random = new Random(); int total = 0; // 初始化100个数字元素 for (int i = 0; i < arr.length; i++) { int temp = random.nextInt(100); // 对数组元素赋值,并将数组元素的值添加到total总和中 total += (arr[i] = temp); } System.out.println("初始化时的总和=" + total); // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool ForkJoinPool forkJoinPool = new ForkJoinPool(); // 提交可分解的PrintTask任务 Future
future = forkJoinPool.submit(new SumTask(arr, 0, arr.length)); System.out.println("计算出来的总和=" + future.get()); // 关闭线程池 forkJoinPool.shutdown(); }}
计算结果如下:
初始化时的总和=4283=====任务分解===========任务分解===========任务分解===========任务分解===========任务分解===========任务分解===========任务分解======计算出来的总和=4283

从上面结果来看,ForkJoinPool将任务分解了7次,程序通过SumTask计算出来的结果,和初始化数组时统计出来的总和是相等的,这表明计算结果一切正常。

读者还参考以下文章加深对ForkJoinPool的理解

==================================================================================================

  作者:欧阳鹏  欢迎转载,与人分享是进步的源泉!

  转载请保留原文地址

==================================================================================================

你可能感兴趣的文章
【对讲机的那点事】出租车司机开车时使用对讲机 大家怎么看?
查看>>
【对讲机的那点事】如何设置灵通LD7000H数字对讲机的信道?
查看>>
html 三列布局(两列自适应,一列固定宽度)
查看>>
详解javascript立即执行函数表达式(IIFE)
查看>>
WPF画图の利用Path画扇形(仅图形)
查看>>
(二)spring cloud微服务分布式云架构 - 整合企业架构的技术点
查看>>
Windows开发环境搭建
查看>>
asp.net core mvc 管道之中间件
查看>>
Win10任务栏假死问题解决方案
查看>>
[UWP]为附加属性和依赖属性自定义代码段(兼容UWP和WPF)
查看>>
mysql到JSP之间数据格式转换
查看>>
Thrift原理分析(一) 基本概念
查看>>
老司机避坑指南:如何快速搞定微服务架构?
查看>>
杨老师课堂之JavaScript案例全选、全不选、及反选
查看>>
开源编辑器 Atom 简化代码审查过程
查看>>
等等!这两个mysql慢查询的坑我已经替你们踩了
查看>>
【Python标准库:fileinput】优雅的读取文件
查看>>
“NO GENDER.NO BORDER.”,无性别服饰品牌“Bosie”获近千万元Pre-A轮融资
查看>>
用Python统计你的简书数据
查看>>
全票通过,百度 Doris 项目进入 Apache 基金会孵化器
查看>>