深入浅出Java线程池:源码篇

2023年2月10日11:55:20

前言

在上一篇文章深入浅出Java线程池:理论篇中,已经介绍了什么是线程池以及基本的使用。(本来写作的思路是使用篇,但经网友建议后,感觉改为理论篇会更加合适)。本文则深入线程池的源码,主要是介绍ThreadPoolExecutor内部的源码是如何实现的,对ThreadPoolExecutor有一个更加清晰的认识。

ThreadPoolExecutor的源码相对而言比较好理解,没有特别难以读懂的地方。相信没有阅读源码习惯的读者,跟着本文,也可以很轻松地读懂ThreadPoolExecutor的核心源码逻辑。

本文源码jdk版本为8,该类版本为jdk1.5,也就是在1.5之后,ThreadPoolExecutor的源码没有做修改。

线程池家族

Java中的线程池继承结构如下图:(类图中只写了部分方法且省略参数)

深入浅出Java线程池:源码篇

  • 顶层接口Executor表示一个执行器,他只有一个接口:execute() ,表示可以执行任务
  • ExecutorService在Executor的基础上拓展了更多的执行方法,如submit() shutdown() 等等,表示一个任务执行服务。
  • AbstarctExecutorService是一个抽象类,他实现了ExecutorService的部分核心方法,如submit等
  • ThreadPoolExecutor是最核心的类,也就是线程池,他继承了抽象类AbstarctExecutorService
  • 此外还有ScheduledExecutorService接口,他表示一个可以按照指定时间或周期执行的执行器服务,内部定义了如schedule() 等方法来执行任务
  • ScheduledThreadPoolExecutor实现了ScheduledExecutorService接口,同时继承于ThreadPoolExecutor,内部的线程池相关逻辑使用自ThreadPoolExecutor,在此基础上拓展了延迟、周期执行等功能特性

ScheduledThreadPoolExecutor相对来说用的是比较少。延时任务在我们Android中有更加熟悉的方案:Handler;而周期任务则用的非常少。现在android的后台限制非常严格,基本上一退出应用,应用进程很容易被系统干掉。当然ScheduledThreadPoolExecutor也不是完全没有用处,例如桌面小部件需要设置定时刷新,那么他就可以派上用场了。

因此,我们本文的源码,主要针对ThreadPoolExecutor。在阅读源码之前,我们先来看一下ThreadPoolExecutor内部的结构以及关键角色。

内部结构

阅读源码前,我们先把ThreadPoolExecutor整个源码结构讲解一下,形成一个整体概念,再阅读源码就不会迷失在源码中了。先来看一下ThreadPoolExecutor的内部结构:

深入浅出Java线程池:源码篇

  • ThreadPoolExecutor内部有三个关键的角色:阻塞队列、线程、以及RejectExecutionHandler(这里写个中文名纯粹因为不知道怎么翻译这个名字),他们的作用在理论篇有详细介绍,这里不再赘述。
  • 在ThreadPoolExecutor中,一个线程对应一个worker对象,工人,非常形象。每个worker内部有一个独立的线程,他会不断去阻塞队列获取任务来执行,也就是调用阻塞队列的 poll 或者 take 方法,他们区别后面会讲。如果队列没有任务了,那么就会阻塞在这里。
  • workQueue,就是阻塞队列,当核心线程已满之后,任务就会被放置在这里等待被工人worker领取执行
  • RejectExecutionHandler本身是一个接口,ThreadPoolExecutor内部有这样的一个接口对象,当任务无法被执行会调用这个对象的方法。ThreadPoolExecutor提供了该接口的4种实现方案,我们可以直接拿来用,或者自己继承接口,实现自定义逻辑。在构造线程池的时候可以传入RejectExecutionHandler对象。
  • 整个ThreadPoolExecutor中最核心的方法就是execute,他会根据具体的情况来选择不同的执行方案或者拒绝执行。

这样,我们就清楚ThreadPoolExecutor的内部结构了,然后,我们开始 Read the fucking code 吧。

源码分析

内部关键属性

ThreadPoolExecutor内部有很多的变量,他们包含的信息非常重要,先来了解一下。

ThreadPoolExecutor的状态和线程数整合在同一个int变量中,类似于view测量中MeasureSpec。他的高三位表示线程池的状态,低29位表示线程池中线程的数量,如下:

// AtomicInteger对象可以利用CAS实现线程安全的修改,其中包含了线程池状态和线程数量信息
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS=29,(对于int长度为32来说)表示线程数量的字节位数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 状态掩码,高三位是1,低29位全是0,可以通过 ctl&COUNT_MASK 运算来获取线程池状态
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

线程池的状态一共有5个:

  • 运行running:线程池创建之后即是运行状态
  • 关闭shutdown:调用shutdown方法之后线程池处于shutdown状态,该状态会停止接收任何任务,阻塞队列中的任务执行完成之后会自动终止线程池
  • 停止stop:调用shutdownNow方法之后线程池处于stop状态。和shutdown的区别是这个状态下的线程池不会去执行队列中剩下的任务
  • 整理tidying:在线程池stop之后,进入tidying状态,然后执行 terminated() 方法,再进入terminated状态
  • 终止terminated:线程池中没有任何线程在执行任务,线程池完全终止。

在源码中这几个状态分别对应:

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

上面的位操作不够直观,转化后如下:

private static final int RUNNING    = 111 00000 00000000 00000000 00000000;
private static final int SHUTDOWN   = 000 00000 00000000 00000000 00000000; 
private static final int STOP       = 001 00000 00000000 00000000 00000000;
private static final int TIDYING    = 010 00000 00000000 00000000 00000000;
private static final int TERMINATED = 011 00000 00000000 00000000 00000000;

可以看到除了running是负数,其他的状态都是正数,且状态越靠后,数值越大。因此我们可以通过判断 ctl&COUNT_MASK > SHUTDOWN 来判断状态是否处于 stop、tidying、terminated之一。后续源码中会有很多的这样的判断,举其中的一个方法:

// 这里来判断线程池的状态
if(runStateAtLeast(ctl,SHUTDOWN)) {
    ...
}
// 这里执行逻辑,直接判断两个数的大小
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

ps:这里怎么没有使用掩码COUNT_MASK ?因为状态是处于高位,低位的数值不影响高位的大小判断。当然如果要判断相等,就还是需要使用掩码COUNT_MASK的。

接下来是ThreadPoolExecutor内部的三个关键角色对象:

// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 存储worker的hashSet,worker被创建之后会被存储到这里
private final HashSet<Worker> workers = new HashSet<>();
// RejectedExecutionHandler默认的实现是AbortPolicy
private volatile RejectedExecutionHandler handler;
private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

内部使用的锁对象:

// 这里是两个锁。ThreadPoolExecutor内部并没有使用Synchronize关键字来保持同步
// 而是使用Lock;和Synchronize的区别就是他是应用层的锁,而synchronize是jvm层的锁
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();

最后是内部一些参数的配置,前面都介绍过,把源码贴出来再回顾一下:

// 线程池历史达到的最大线程数
private int largestPoolSize;
// 线程池完成的任务数。
// 该数并不是实时更新的,在获取线程池完成的任务数时,需要去统计每个worker完成的任务并累加起来
// 当一个worker被销毁之后,他的任务数就会被累加到这个数据中
private long completedTaskCount;
// 线程工厂,用于创建线程
private volatile ThreadFactory threadFactory;
// 空闲线程存储的时间
private volatile long keepAliveTime;
// 是否允许核心线程被回收
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数限额
private volatile int corePoolSize;
// 线程总数限额
private volatile int maximumPoolSize;

不是吧sir?源码还没看到魂呢,整出来这么无聊的变量?
咳咳,别急嘛,源码解析马上来。这些变量会贯穿整个源码过程始终,先对他们有个印象,后续阅读源码就会轻松畅通很多。

关键方法:execute()

这个方法的主要任务就是根据线程池的当前状态,选择任务的执行策略。该方法的核心逻辑思路是:

  1. 在线程数没有达到核心线程数时,会创建一个核心线程来执行任务

    public void execute(Runnable command) {
        // 不能传入空任务
        if (command == null)
            throw new NullPointerException();
    
        // 获取ctl变量,就是上面我们讲的将状态和线程数合在一起的一个变量
        int c = ctl.get();
        // 判断核心线程数是否超过限额,否则创建一个核心线程来执行任务
        if (workerCountOf(c) < corePoolSize) {
            // addWorker方法是创建一个worker,也就是创建一个线程,参数true表示这是一个核心线程
            // 如果添加成功则直接返回
            // 否则意味着中间有其他的worker被添加了,导致超出核心线程数;或者线程池被关闭了等其他情况
            // 需要进入下一步继续判断
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        ...
    }
    
  2. 当线程数达到核心线程数时,新任务会被放入到等待队列中等待被执行

  3. 当等待队列已经满了之后,如果线程数没有到达总的线程数上限,那么会创建一个非核心线程来执行任务

  4. 当线程数已经到达总的线程数限制时,新的任务会被拒绝策略者处理,线程池无法执行该任务。

    public void execute(Runnable command) {
        ...
        // 如果线程池还在运行,则尝试添加任务到队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次检查如果线程池被关闭了,那么把任务移出队列
            // 如果移除成功则拒绝本次任务
            // 这里主要是判断在插入队列的过程中,线程池有没有被关闭了
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 否则再次检查线程数是否为0,如果是,则创建一个没有任务的非主线程worker
            // 这里对应核心线程为0的情况,指定任务为null,worker会去队列拿任务来执行
            // 这里表示线程池至少有一个线程来执行队列中的任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果上面添加到队列中失败,则尝试创建一个非核心线程来执行任务
        // 如果创建失败,则拒绝任务
        else if (!addWorker(command, false))
            reject(command);
    }
    

源码中还设计到两个关键方法:addWorker创建一个新的worker,也就是创建一个线程;reject拒绝一个任务。后者比较简单我们先看一下。

拒绝任务:reject()

// 拒绝任务,调用rejectedExecutionHandler来处理
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

默认的实现类有4个,我们依次来看一下:

  • AbortPolicy是默认实现,会抛出一个RejectedExecutionException异常:

    public static class AbortPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    
  • DiscardPolicy最简单,就是:什么都不做,直接抛弃任务。(这是非常渣男不负责任的行为,咱们不能学他,所以也不要用它 [此处狗头] )

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
  • DiscardOldestPolicy会删除队列头的一个任务,然后再次执行自己(挤掉原位,自己上位,绿茶行为?)

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    
  • CallerRunsPolicy最猛,他干脆在自己的线程执行run方法,不依靠线程池了,自己动手丰衣足食。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    

上面4个ThreadPoolExecutor已经帮我们实现了,他的静态内部类,在创建ThreadPoolExecutor的时候我们可以直接拿来用。也可以自己继承接口实现自己的逻辑。具体选择哪个需要根据实际的业务需求来决定。

那么接下来看创建worker的方法。

创建worker:addWorker()

方法的目的很简单:创建一个worker。前面我们讲到,worker内部创建了一个线程,每一个worker则代表了一个线程,非常类似android中的looper。looper的loop()方法会不断地去MessageQueue获取message,而Worker的run()方法会不断地去阻塞队列获取任务,这个我们后面讲。

addWorker() 方法的逻辑整体上分为两个部分:

  1. 检查线程状态线程数是否满足条件:

    // 第一个参数是创建的线程首次要执行的任务,可以是null,则表示初始化一个线程
    // 第二参数表示是否是一个核心线程
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // 还记不记得我们前面讲到线程池的状态控制?
            // runStateAtLeast(c, SHUTDOWN)表示状态至少为shutdown,后面类同
            // 如果线程池处于stop及以上,不会再创建worker
            // 如果线程池状态在shutdown时,如果队列不为空或者任务!=null,则还会创建worker
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                // 其他情况返回false,表示拒绝创建worker
                return false;
            
    		// 这里采用CAS轮询,也就是循环锁的策略来让线程总数+1
            for (;;) {
                // 检查是否超出线程数限制
                // 这里根据core参数判断是核心线程还是非核心线程
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                // 利用CAS让ctl变量自增,表示worker+1
                // 如果CAS失败,则表示发生了竞争,则再来一次
                if (compareAndIncrementWorkerCount(c))
                    // 成功则跳出最外层循环
                    break retry;
                // 如果这个期间ctl被改变了,则获取ctl,再尝试一次
                c = ctl.get();  
                // 如果线程池被shutdown了,那么重复最外层的循环,重新判断状态是否可以创建worker
                if (runStateAtLeast(c, SHUTDOWN))
                    // 继续最外层循环
                    continue retry;
            }
        }
        
        // 创建worker逻辑
        ...
    }
    

    不知道读者对于源码中的retry: 有没有疑惑,毕竟平时很少用到。他的作用是标记一个循环,这样我们在内层的循环就可以跳转到任意一个外层的循环。这里的retry只是一个名字,改成 repeat: 甚至 a: 都是可以的。他的本质就是:一个循环的标记

  2. 创建worker对象,并调用其内部线程的start()方法来启动线程:

    private boolean addWorker(Runnable firstTask, boolean core) {
    	...
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建一个新的worker
            // 创建的过程中内部会创建一个线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 获得全局锁并加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 获取锁之后,需要再次检查状态
                    int c = ctl.get();
    				// 只有运行状态或者shutDown&&task==null才会被执行
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        // 如果这个线程不是刚创建的,则抛出异常
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException(); 
                        // 添加到workerSet中
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        // 跟踪线程池到达的最多线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                // 如果添加成功,启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果线程没有启动,表示添加worker失败,可能在添加的过程中线程池被关闭了
            if (! workerStarted)
                // 把worker从workerSet中移除
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

经过前面两步,如果没有出现异常,则创建worker成功。最后还涉及到一个方法: addWorkerFailed(w) ,他的内容比较简答,顺便提一下吧:

// 添加worker失败
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    // 加锁
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        // 这里会让线程总数-1
        decrementWorkerCount();
        // 尝试设置线程池的状态为terminad
        // 因为添加失败有可能是线程池在添加worker的过程中被shutdown
        // 那么这个时候如果没有任务正在执行就需要设置状态为terminad
        // 这个方法后面会详细讲
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

那么到这里,execute()方法中的一些调用方法就分析完了。阻塞队列相关的方法不属于本文的范畴,就不展开了。那么还有一个问题:worker是如何工作的呢?worker内部有一个线程,当线程启动时,初始化线程的runnable对象的run方法会被调用,那么这个runnable对象是什么?我直接来看worker。

打工人:Worker

首先我们看到他的构造方法:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

源码很简单,把传进来的任务设置给内部变量firstTask,然后把自己传给线程工厂去创建一个线程。所以线程启动时,Worker本身的run方法会被调用,那么我们看到Worker的 run()方法。

public void run() {
    runWorker(this);
}

Worker是ThreadPoolExecutor的内部类,这里直接调用到了ThreadPoolExecutor的方法: runWorker()来开始执行。那么接下来,我们就看到这个方法。

启动worker:runWorker()

这个方法是worker执行的方法,在线程被销毁前他会一直执行,类似于Handler的looper,不断去队列获取消息来执行:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 获取worker初始化时设置的任务,可以为null。如果为null则表示仅仅创建线程
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); 
    // 这个参数的作用后面解释,需要结合其他的源码
    boolean completedAbruptly = true;
    try {
        // 如果自身的task不为null,那么会执行自身的task
        // 否则调用getTask去队列获取一个task来执行
        // 这个getTask最终会去调用队列的方法来获取任务
        // 而队列如果为空他的获取方法会进行阻塞,这里也就阻塞了,后面深入讲
        while (task != null || (task = getTask()) != null) {
            try{
            // 执行任务
            ...
            } finally {
                // 任务执行完成,把task设置为null
                task = null;
                // 任务总数+1
                w.completedTasks++;
                // 释放锁
                w.unlock();
            }
        }
    	// 这里设置为false,先记住他
        completedAbruptly = false;
    } finally {
    	// 如果worker退出,那么需要执行后续的善后工作
        processWorkerExit(w, completedAbruptly);
    }
}

可以看到这个方法的整体框架还是比较简单的,核心就在于 while (task != null || (task = getTask()) != null) 这个循环中,如果 getTask() 返回null,则表示线程该结束了,这和Handler机制也是一样的。

上面的源码省略了具体执行任务的逻辑,他的逻辑也是很简单:判断状态+运行任务。我们来看一下:

final void runWorker(Worker w) {
    ...;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 如果线程池已经设置为stop状态,那么保证线程是interrupted标志
            // 如果线程池没有在stop状态,那么保证线程不是interrupted标志
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 回调方法,这个方法是一个空实现
                beforeExecute(wt, task);
                try {
                    // 运行任务
                    task.
  • 作者:weixin_43766753
  • 原文链接:https://blog.csdn.net/weixin_43766753/article/details/113706661
    更新时间:2023年2月10日11:55:20 ,共 11086 字。