Java并发编程——ForkJoin详解

2022-09-13 13:06:42

概念

Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。类似于Java 8中的parallel Stream。

只能将任务1个切分为两个,不能切分为3个或其他数量

简单使用

publicclassForkJoinExample{//针对一个数字,做计算。privatestaticfinalInteger MAX=200;staticclassCalcForJoinTaskextendsRecursiveTask<Integer>{privateInteger startValue;//子任务的开始计算的值privateInteger endValue;//子任务结束计算的值publicCalcForJoinTask(Integer startValue,Integer endValue){this.startValue= startValue;this.endValue= endValue;}@OverrideprotectedIntegercompute(){//如果当前的数据区间已经小于MAX了,那么接下来的计算不需要做拆分if(endValue- startValue< MAX){System.out.println("开始计算:startValue:"+ startValue+" ; endValue:"+ endValue);Integer totalValue=0;for(int i=this.startValue; i<=this.endValue; i++){
                    totalValue+= i;}return totalValue;}//任务拆分,拆分成两个任务CalcForJoinTask subTask=newCalcForJoinTask(startValue,(startValue+ endValue)/2);
            subTask.fork();CalcForJoinTask calcForJoinTask=newCalcForJoinTask((startValue+ endValue)/2+1, endValue);
            calcForJoinTask.fork();return subTask.join()+ calcForJoinTask.join();}}publicstaticvoidmain(String[] args){CalcForJoinTask calcForJoinTask=newCalcForJoinTask(1,1000);// 这是Fork/Join框架的线程池ForkJoinPool pool=newForkJoinPool();ForkJoinTask<Integer> taskFuture= pool.submit(calcForJoinTask);try{Integer result= taskFuture.get();System.out.println("result:"+ result);}catch(InterruptedException e){
            e.printStackTrace();}catch(ExecutionException e){
            e.printStackTrace();}}}

运行结果:

开始计算:startValue:501; endValue:625
开始计算:startValue:251; endValue:375
开始计算:startValue:126; endValue:250
开始计算:startValue:376; endValue:500
开始计算:startValue:876; endValue:1000
开始计算:startValue:1; endValue:125
开始计算:startValue:626; endValue:750
开始计算:startValue:751; endValue:875
result:500500

工作流程图

为了更清晰的了解fork/join的原理,我们通过一个图形来理解。整体思想其实就是拆分与合并。

图中最顶层的任务使用submit方式被提交到Fork/Join框架中,Fork/Join把这个任务放入到某个线程中运行,工作任务中的compute方法的代码开始对这个任务T1进行分析。如果当前任务需要累加的数字范围过大(代码中设定的是大于200),则将这个计算任务拆分成两个子任务(T1.1和T1.2),每个子任务各自负责计算一半的数据累加,请参见代码中的fork方法。如果当前子任务中需要累加的数字范围足够小(小于等于200),就进行累加然后返回到上层任务中。

image-20211113181046263

代码分析

简单给大家解释一下Fork/Join的相关api,在刚刚的案例中,涉及到几个重要的API, ForkJoinTask ,ForkJoinPool .

ForkJoinTask

ForkJoinTask : 基本任务,使用fork、join框架必须创建的对象,提供fork,join操作,常用的三个子类如下:

  • RecursiveAction

    无结果返回的任务

  • RecursiveTask

    有返回结果的任务

  • CountedCompleter

    无返回值任务,完成任务后可以触发回调

ForkJoinTask提供了两个重要的方法:

  • fork

    让task异步执行

  • join

    让task同步执行,可以获取返回值

ForkJoinPool

ForkJoinPool : 专门用来运行 ForkJoinTask 的线程池,(在实际使用中,也可以接收Runnable/Callable 任务,但在真正运行时,也会把这些任务封装成 ForkJoinTask 类型的任务)。

他是 fork/join 框架的核心,是 ExecutorService 的一个实现,用于管理工作线程,并提供了一些工具来帮助获取有关线程池状态和性能的信息。

工作线程一次只能执行一个任务。

ForkJoinPool 线程池并不会为每个子任务创建一个单独的线程,相反,池中的每个线程都有自己的双端队列用于存储任务 ( double-ended queue )。

这种架构使用了一种名为工作窃取( work-stealing )算法来平衡线程的工作负载。

工作窃取( work-stealing )算法

要怎么解释 「 工作窃取算法 」 呢 ?

简单来说,就是空闲的线程试图从繁忙线程的 deques 中 窃取工作。

默认情况下,每个工作线程从其自己的双端队列中获取任务。但如果自己的双端队列中的任务已经执行完毕,双端队列为空时,工作线程就会从另一个忙线程的双端队列尾部或全局入口队列中获取任务,因为这是最大概率可能找到工作的地方。

这种方法最大限度地减少了线程竞争任务的可能性。它还减少了工作线程寻找任务的次数,因为它首先在最大可用的工作块上工作。

执行方法

方法名说明
invoke(ForkJoinTask)提交任务并一直阻塞直到任务执行完成返回合并结果。
execute(ForkJoinTask)异步执行任务,无返回值。
submit(ForkJoinTask)异步执行任务,返回task本身,可以通过task.get()方法获取合并之后的结果。

ForkJoinTask 在不显示使用 ForkJoinPool.execute/invoke/submit() 方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行。

源码分析

ForkJoinPool

ForkJoinPool forkJoinPool=newForkJoinPool();//Runtime.getRuntime().availableProcessors()当前操作系统可以使用的CPU内核数量publicForkJoinPool(){this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory,null,false);}//this调用到下面这段代码publicForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode){this(checkParallelism(parallelism),//并行度checkFactory(factory),//工作线程创建工厂
         handler,//异常处理handler
         asyncMode? FIFO_QUEUE: LIFO_QUEUE,//任务队列出队模式 异步:先进先出,同步:后进先出"ForkJoinPool-"+nextPoolId()+"-worker-");checkPermission();}//上面的this最终调用到下面这段代码privateForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix){this.workerNamePrefix= workerNamePrefix;this.factory= factory;this.ueh= handler;this.config=(parallelism& SMASK)| mode;long np=(long)(-parallelism);// offset ctl countsthis.ctl=((np<< AC_SHIFT)& AC_MASK)|((np<< TC_SHIFT)& TC_MASK);}
  • parallelism:可并行数量,fork/join框架将依据这个并行数量的设定,决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理;

  • factory:当fork/join创建一个新的线程时,同样会用到线程创建工厂。它实现了ForkJoinWorkerThreadFactory接口,使用默认的的接口实现类DefaultForkJoinWorkerThreadFactory来实现newThread方法创建一个新的工作线程;

  • handler:异常捕获处理器。当执行的任务出现异常,并从任务中被抛出时,就会被handler捕获;

  • asyncMode:fork/join为每一个独立的工作线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。即可以使用先进先出的工作模式,也可以使用后进先出的工作模式;

image-20211113191234959

Submit

提交:ForkJoinPool.submit(ForkJoinTask task) -> externalPush(task) -> externalSubmit(task)

public<T>ForkJoinTask<T>submit(ForkJoinTask<T> task){if(task==null)thrownewNullPointerException();externalPush(task);return task;}//将任务添加到随机选取的队列中或新创建的队列中;finalvoidexternalPush(ForkJoinTask<?> task){WorkQueue[] ws;WorkQueue q;int m;int r=ThreadLocalRandom.getProbe();//当前线程的一个随机数int rs= runState;//当前容器的状态//如果随机选取的队列还有空位置可以存放、队列加锁锁定成功,任务就放入队列中if((ws= workQueues)!=null&&(m=(ws.length-1))>=0&&(q= ws[m& r& SQMASK])!=null&& r!=0&& rs>0&&U.compareAndSwapInt(q, QLOCK,0,1)){ForkJoinTask<?>[] a;int am, n, s;if((a= q.array)!=null&&(am= a.length-1)>(n=(s= q.top)- q.base)){int j=((am& s)<< ASHIFT)+ ABASE;U.putOrderedObject(a, j, task);//任务加入队列中U.putOrderedInt(q, QTOP, s+1);//挪动下次任务存放的槽的位置U.putIntVolatile(q, QLOCK,0);//队列解锁if(n<=1)//当前数组元素少时,进行唤醒当前线程;或者当没有活动线程或线程数较少时,添加新的线程signalWork(ws, q);return;}U.compareAndSwapInt(q, QLOCK,1,0);//队列解锁}externalSubmit(task);//升级版的externalPush}volatileint runState;// lockable status锁定状态// runState: SHUTDOWN为负数,其他的为2的次幂privatestaticfinalint  RSLOCK=1;privatestaticfinalint  RSIGNAL=1<<1;//唤醒privatestaticfinalint  STARTED=1<<2;//启动privatestaticfinalint  STOP=1<<29;//停止privatestaticfinalint  TERMINATED=1<<30;//结束privatestaticfinalint  SHUTDOWN=1<<31;//关闭//队列添加任务失败,进行升级版操作,即创建队列数组和创建队列后,将任务放入新创建的队列中;privatevoidexternalSubmit(ForkJoinTask<?> task){int r;// initialize caller's probeif((r=ThreadLocalRandom.getProbe())==0){ThreadLocalRandom.localInit();
        r=ThreadLocalRandom.getProbe();}for(;;){//自旋WorkQueue[] ws;WorkQueue q;int rs, m, k;boolean move=false;/**
        *ForkJoinPool执行器停止工作了,抛出异常
        *ForkJoinPool extends AbstractExecutorService
        *abstract class AbstractExecutorService implements ExecutorService
        *interface ExecutorService extends Executor
        *interface Executor执行提交的对象Runnable任务
        */if((rs= runState)<0){tryTerminate(false,false);// help terminatethrownewRejectedExecutionException();}//第一次遍历,队列数组未创建,进行创建elseif((rs& STARTED)==0||// initialize初始化((ws= workQueues)==null||(m= ws.length-1)<0)){int ns=0;
            rs=lockRunState();try{if((rs& STARTED)==0){U.compareAndSwapObject(this, STEALCOUNTER,null,newAtomicLong());// create workQueues array with size a power of twoint p= config& SMASK;// ensure at least 2 slots,config是CPU核数int n=(p>1)? p-1:1;
                    n|= n>>>1; n|= n>>>2;  n|= n>>>4;
                    n|= n>>>8; n|= n>>>16; n=(n+1)<<1;
                    workQueues=newWorkQueue[n];//创建
                    ns= STARTED;}}finally{unlockRunState(rs,(rs&~RSLOCK)| ns);}}//第三次遍历,把任务放入队列中elseif((q= ws[k= r& m& SQMASK])!=null){if(q.qlock==0&&U.compareAndSwapInt(q, QLOCK,0,1)){ForkJoinTask<?>[] a= q.array;int s= q.top;boolean submitted=false;// initial submission or resizingtry{// locked version of pushif((a!=null&& a.length> s+1- q.base)||(a= q.growArray())!=null){int j=(((a.length-1)& s)<< ASHIFT)+ ABASE;U.putOrderedObject(a, j, task);U.putOrderedInt(q, QTOP, s+1);
                        submitted=true;}}finally{U.compareAndSwapInt(q, QLOCK,1,0);}if(submitted){signalWork(ws, q);return;}}
            move=true;// move on failure}//第二次遍历,队列数组为空,创建队列elseif(((rs= runState)& RSLOCK)==0){// create new queue
            q=newWorkQueue(this,null);
            q.hint= r;
            q.config= k| SHARED_QUEUE;
            q.scanState= INACTIVE;
            rs=lockRunState();// publish indexif(rs>0&&(ws= workQueues)!=null&&
                k< ws.length&& ws[k]==null)
                ws[k]= q;// else terminatedunlockRunState(rs, rs&~RSLOCK);}else
            move=true;// move if busyif(move)
            r=ThreadLocalRandom.advanceProbe(r);}}

Fork

fork()用于将新创建的子任务放入当前线程的workQueue队列中,fork/join框架将根据当前正在并发执行ForkJoinTask任务的ForkJoinWorkerThread线程状态,决定是让这个任务在队列中等待,还是创建一个新的ForkJoinWorkedThread线程运行它,又或者是唤起其他正在等待任务的ForkJoinWorkerThread线程运行它。

流程:ForkJoinTask.fork() -> ForkJoinWorkerThread.workQueue.push(task)/ForkJoinPool.common.externalPush(task) -> ForkJoinPool.push(task)/externalPush(task);

publicfinalForkJoinTask<V>fork(){Thread t;if((t=Thread.currentThread())instanceofForkJoinWorkerThread)//当前线程是workerThread,任务直接放入workerThread当前的workQueue((ForkJoinWorkerThread)t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);//将任务添加到随机选取的队列中或新创建的队列中returnthis;}publicclassForkJoinPoolextendsAbstractExecutorService{staticfinalclassWorkQueue{finalvoidpush(ForkJoinTask<?> task){ForkJoinTask<?>[] a;ForkJoinPool p;int b= base, s= top, n;if((a= array)!=null){// ignore if queue removed,队列被移除忽略int m= a.l
  • 作者:、楽.
  • 原文链接:https://blog.csdn.net/qq_41432730/article/details/121308729
    更新时间:2022-09-13 13:06:42