JDK线程池ThreadPoolExecutor源码总结

2022-08-25 12:28:38

1、线程池定义的几个状态

privatefinalAtomicInteger ctl=newAtomicInteger(ctlOf(RUNNING,0));//29privatestaticfinalint COUNT_BITS=Integer.SIZE-3;//容量 00011111111111111111111111111111privatestaticfinalint CAPACITY=(1<< COUNT_BITS)-1;// 运行状态存储在高三位 低29位表示当前线程池中的线程数//11100000000000000000000000000000privatestaticfinalint RUNNING=-1<< COUNT_BITS;//00000000000000000000000000000000privatestaticfinalint SHUTDOWN=0<< COUNT_BITS;//00100000000000000000000000000000privatestaticfinalint STOP=1<< COUNT_BITS;//0100000000000000000000000000000privatestaticfinalint TIDYING=2<< COUNT_BITS;//0110000000000000000000000000000privatestaticfinalint TERMINATED=3<< COUNT_BITS;// Packing and unpacking ctl获取线程池的状态rs(高三位)privatestaticintrunStateOf(int c){return c&~CAPACITY;}获取线程池中的线程数量(低29位)privatestaticintworkerCountOf(int c){return c& CAPACITY;}//计算变量ctl,使用两个数或操作初始化,高3位表示状态,低29位表示线程池中线程数量privatestaticintctlOf(int rs,int wc){return rs| wc;}

几个状态代表的具体意思:

RUNNING:运行状态,可以提交任务
SHUTDOWN:不接受新任务,但是会执行完阻塞队列中待执行的任务,一般是通过shutdown()产生
STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务,一般是通过shutdownNow()产生
TIDYING:线程池中所有的任务已经执行完,线程数量为0,开始调用terminated() 临时状态
TERMINATED:终止运行,已经执行完terminated()钩子方法
在这里插入图片描述
1.RUNNING -> SHUTDOWN:显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了 shutdown() 方法。
2.RUNNING or SHUTDOWN -> STOP:显式调用 shutdownNow() 方法时候。
3.SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候。
4.STOP -> TIDYING:当线程池为空的时候。
5.TIDYING -> TERMINATED:当 terminated() hook 方法执行完成时候。

2、线程池的成员变量

// 工作队列,核心线程数满了,任务会提交到这个队列privatefinalBlockingQueue<Runnable> workQueue;// 修改工作线程时要先获得锁防止并发,修改线程状态也要,用来控制新增 Worker 线程时候的原子性,这个锁是在添加线程时用到privatefinalReentrantLock mainLock=newReentrantLock();// 存放工作线程privatefinalHashSet<Worker> workers=newHashSet<Worker>();// termination 是mainLock锁对应的条件队列,在线程调用 awaitTermination 时候用来存放阻塞的线程privatefinalCondition termination= mainLock.newCondition();// 记录线程池生命周期内 线程数最大值privateint largestPoolSize;// 记录线程池所完成任务总数privatelong completedTaskCount;// 创建线程用的线程工厂privatevolatileThreadFactory threadFactory;// 拒绝策略privatevolatileRejectedExecutionHandler handler;// 空闲线程存活时间,当allowCoreThreadTimeOut == false 时,会维护核心线程数量内的线程存活,超出部分会被超时。// allowCoreThreadTimeOut == true 核心数量内的线程 空闲时 也会被回收,从工作队列取任务也用到这个参数privatevolatilelong keepAliveTime;// 控制核心线程数量内的线程 是否可以被回收。true 可以,false不可以。privatevolatileboolean allowCoreThreadTimeOut;// 核心线程数量privatevolatileint corePoolSize;// 线程池最大数量privatevolatileint maximumPoolSize;// 默认拒绝策略privatestaticfinalRejectedExecutionHandler defaultHandler=newAbortPolicy();privatestaticfinalRuntimePermission shutdownPerm=newRuntimePermission("modifyThread");privatefinalAccessControlContext acc;

3、内部类Worker工作线程

线程池中的工作线程以Worker作为体现,真正工作的线程为Worker的成员变量,Worker既是Runnable,又是同步器。Worker从工作队列中取出任务来执行,并能通过Worker控制任务状态
Worker 继承了 AQS,自己实现了简单不可重入独占锁,其中 status=0 标示锁未被获取状态,state=1 标示锁已经被获取的状态,state=-1 是创建 Worker 时候默认的状态,创建时候状态设置为 -1 是为了避免在该线程在运行 runWorker() 方法前被中断

// Worker采用了AQS的独占模式,Worker的锁是在runWorker执行任务时用到// 独占模式:两个重要属性  state  和  ExclusiveOwnerThread// state:0时表示未被占用 > 0时表示被占用   < 0 时 表示初始状态,这种情况下不能被抢锁privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{/**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */privatestaticfinallong serialVersionUID=6138294804551838833L;/** Thread this worker is running in.  Null if factory fails. */finalThread thread;/** 工作线程执行的第一个任务,假设firstTask不为空,那么当worker启动后(内部的线程启动)会优先执行firstTask,当执行完firstTask后,会到queue中去获取下一个任务。 */Runnable firstTask;/** 执行完多少个任务 */volatilelong completedTasks;/**
         * 设置 Worker 的状态为 -1,是为了避免当前 worker 在调用 runWorker 方法前被中断(当其它线程调用了线程池的 shutdownNow          时候,如果 worker 状态 >= 0 则会中断该线程)。
         * 这里设置了线程的状态为 -1,所以该线程就不会被中断了。如下代码运行 runWorker 的代码(9)时候会调用 unlock 方法,
         * 该方法把 status 变为了 0,所以这时候调用 shutdownNow 会中断 worker 线程了。
         */Worker(Runnable firstTask){// 设置AQS独占模式为初始化中的状态,这时候不能被抢占,在调用runWorker前禁止中断setState(-1);// inhibit interrupts until runWorkerthis.firstTask= firstTask;// 通过线程工厂new一个线程this.thread=getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker  */publicvoidrun(){runWorker(this);}// Lock methods//// 0解锁// 1加锁protectedbooleanisHeldExclusively(){returngetState()!=0;}// 重写aqs的加锁方法protectedbooleantryAcquire(int unused){if(compareAndSetState(0,1)){setExclusiveOwnerThread(Thread.currentThread());returntrue;}returnfalse;}// 重写aqs的解锁方法protectedbooleantryRelease(int unused){setExclusiveOwnerThread(null);setState(0);returntrue;}// 自己定义的方法publicvoidlock(){acquire(1);}publicbooleantryLock(){returntryAcquire(1);}publicvoidunlock(){release(1);}publicbooleanisLocked(){returnisHeldExclusively();}// 中断线程voidinterruptIfStarted(){Thread t;if(getState()>=0&&(t= thread)!=null&&!t.isInterrupted()){try{
                    t.interrupt();}catch(SecurityException ignore){}}}}

4、线程池的核心方法

execute方法

publicvoidexecute(Runnable command){if(command==null)thrownewNullPointerException();/*
         * Proceed in 3 steps:
         *
         * 1. 直接new一个工作线程
         *
         * 2. 把任务提交到任务队列
         *
         * 3. 不能加到任务队列,直接new一个非核心线程
         */// 获取线程池状态int c= ctl.get();// 工作线程数小于核心线程就可以直接添加一个工作线程if(workerCountOf(c)< corePoolSize){// addWorker 即为创建线程的过程,会创建worker对象,并且将command作为firstTask// core==true 表示采用核心线程数量限制,false采用maxinumPoolSizeif(addWorker(command,true))return;//添加失败了,更新我们的ctl
            c= ctl.get();}// 核心线程满了或addWorker失败执行下面的把任务添加到队列if(isRunning(c)&& workQueue.offer(command)){/*
             * 为什么需要进行双重检查?
             * 因为在多线程下,ctl方法不是线程安全的,可能会出现获取了以后就改变了
             * 所以需要判断加完以后的状态,是不是在加的过程中发生了改变
             */int recheck= ctl.get();// 再次确认是否在运行// !isRunning()成功,代表当你提交到任务队列后,线程池状态被外部线程给修改,例如调用了shutDown(),shutDownNow()// remove成功,提交之后,线程池中的线程还没消费,把任务从队列移除// remove 失败,说明在shutDown或者shutDown之前,就被线程池的线程给处理了if(!isRunning(recheck)&&remove(command))reject(command);// 为0表示没有线程,就添加一个线程保证当前至少有一个线程存在elseif(workerCountOf(recheck)==0)addWorker(null,false);}// 工作队列满了并且线程数量达到maximumPoolSize或线程池不是running状态就reject任务elseif(!addWorker(command,false))reject(command);}// firstTask 可以为null,表示启动worker之后,worker自动到queue中获取任务.. 如果不是null,则worker优先执行firstTask// core 采用的线程数限制 如果为true 采用 核心线程数限制  false采用 maximumPoolSize线程数限制privatebooleanaddWorker(Runnable firstTask,boolean core){// 外层循环获取线程池状态 内层循环cas添加线程
        retry:for(;;){int c= ctl.get();// 运行状态int rs=runStateOf(c);// 1、当前线程池状态为Stop,Tidying,terminated这些状态// 2、线程池状态为shutdown并且有了第一个任务// 3、线程池状态为shutdown并且队列为空// 上面三种情况都不能添加工作线程if(rs>= SHUTDOWN&&!(rs== SHUTDOWN&&
                   firstTask==null&&! workQueue.isEmpty()))returnfalse;// 下面这个for()主要是做校验,如果校验通过就把线程池中的线程数量+1,注意,这里仅仅只是数量更新,但实际真正的线程还没有被创建for(;;){// 工作线程数量int wc=workerCountOf(c);// 当前工作线程数量不能超过容量if(wc>= CAPACITY||
                    wc>=(core? corePoolSize: maximumPoolSize))returnfalse;// cas添加工作线程数量,添加线程成功就退出循环,能够成功加1相当于申请到创建线程的令牌if(compareAndIncrementWorkerCount(c))break retry;
                c= ctl.get();// Re-read ctl// cas失败了,则看线程池状态是否变化了,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新casif(runStateOf(c)!= rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 走到这里表示cas修改ctl成功boolean workerStarted=false;boolean workerAdded=false;Worker w=null;try{// 这里开始new一个工作线程,会设置firstTask
            w=newWorker(firstTask);finalThread t= w.thread;if(t!=null){finalReentrantLock mainLock=this.mainLock;// 同一时刻操纵 线程池内部相关的操作,都必须持锁,为了保证workers同步,因为可能多个线程调用了线程池的execute方法
                mainLock.lock();try{//获取状态,避免在获取锁之前调用了shutdown方法int rs=runStateOf(ctl.get());if(rs< SHUTDOWN||(rs== SHUTDOWN&& firstTask==null)){// 线程还没关闭就抛异常if(t.isAlive())// precheck that t is startablethrownewIllegalThreadStateException();// 把工作线程添加到集合里
                        workers.add(w);int s= workers.size();// 更新最大线程数if(s> largestPoolSize)
                            largestPoolSize= s;
                        workerAdded=true;}}finally{
                    mainLock.unlock();}// 添加线程成功就启动线程并把启动状态设为trueif(workerAdded){
                    t.start();// 启动线程后会调用runWorker()方法
                    workerStarted=true;}}}finally{// 没有添加成功就回滚if(! workerStarted)addWorkerFailed(w);}return workerStarted;}

run方法

通过execute方法来启动线程后,就会通过work类中的run方法调用ThreadPoolExecutor的runWorker方法来运行任务。

publicvoidrun(){runWorker(this);}finalvoidrunWorker(Worker w){Thread wt=Thread.currentThread();Runnable task= w.firstTask;
        w.firstTask=null;// 强制释放锁// 将state设置成了0,这是可以进行中断了
        w.unlock();// allow interruptsboolean completedAbruptly=true;try{// 获取firstTask或从工作队列取任务,不为空就一直执行,在这里实现线程复用的,getTask()会一直阻塞,除非while(task!=null||(task=getTask())!=null){// 加锁,是因为当调用shutDown方法它会判断当前是否加锁,加锁就会跳过它接着执行下一个任务// 加锁其他线程调用了 shutdown 或者 shutdownNow 命令关闭了线程池
                w.lock();// 线程池停止// 线程如果被中断并且线程池停止了// 线程没有被中断也中断if((runStateAtLeast(ctl.get(), STOP)||(Thread.interrupted()&&runStateAtLeast(ctl.get(), STOP)))&&!wt.isInterrupted())
                    wt.interrupt();try{// 钩子方法,方便子类在任务执行前做一些处理beforeExecute(wt, task);Throwable thrown=null;try{// 执行任务// task 可能是FutureTask 也可能是 普通的Runnable接口实现类。// 如果前面是通过submit()提交的 runnable/callable 会被封装成 FutureTask
                        task.run();}catch(RuntimeException x){
                        thrown= x;throw x;}catch(Error x){
                        thrown= x;throw x;}catch(Throwable x){
                        thrown= x;thrownewError(x);}finally{afterExecute(task, thrown);}}finally{// 最终任务赋值为空,那么下次循环就直接从队列中拿任务了
                    task=null;
                    w.completedTasks++;
                    w.unlock();}}// 这里很重要,如果我们的任务中出现了异常,那么这段代码不会被执行。只有在用户自己的任务出现异常抛出导致的,手动关闭线程池不会受到影响
            completedAbruptly=false;}finally{// 来到这里表示要回收线程了processWorkerExit(w, completedAbruptly);}}

从工作队列获取任务

privateRunnablegetTask(){boolean timedOut=false;// Did the last poll() time out?for(;;){int c= ctl.get();int rs=runStateOf(c);// 线程池关闭并且工作队列为空就没任务取if(rs>= SHUTDOWN&&(rs>= STOP|| workQueue.isEmpty())){decrementWorkerCount();returnnull;}int wc=workerCountOf(c);// Are workers subject to culling?boolean timed= allowCoreThreadTimeOut|| wc> corePoolSize;// 线程数量比最大线程数大或线程过期并且工作队列是空if((wc> maximumPoolSize||(timed&& timedOut))&&(wc>1|| workQueue.isEmpty())){if(compareAndDecrementWorkerCount(c))returnnull;continue;}try{// poll操作有操作时间限制,take没有,默认情况,只有当工作线程数量大于核心线程数量时或allowCoreThreadTimeOut为true,才会调用poll方法触发超时调用Runnable r= timed?
                    workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):
                    workQueue.take();if(r!=null)
  • 作者:weixin_43478710
  • 原文链接:https://blog.csdn.net/weixin_43478710/article/details/126034487
    更新时间:2022-08-25 12:28:38