SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor

2023年2月10日12:25:46

前言

一般的,我们提及线程池,最先想到的自然是java中的Executors,它作为一个线程池的工具类,提供给我们几种常见的线程池。稍微深入的童鞋应该也知道,它的底层实现,是使用了java中的ThreadPoolExecutor,而且一般在面试的时候,面试官问到的也会是这个ThreadPoolExecutor。

我的这篇文章说的是在SpringBoot中使用线程池,自然也和它有关(底层代码实现也是它)。因此先来讲讲ThreadPoolExecutor的基本特点吧!

预热:ThreadPoolExecutor

首先我们看看它的继承关系:
SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor

构造器

这里是它的参数最多的一个构造器:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

几个参数的含义分别是

  • corePoolSize:要保留在池中的线程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut
  • maximumPoolSize:池中允许的最大线程数;
  • keepAliveTime:当线程数大于核心数时,这是多余空闲线程在终止前等待新任务的最长时间;
  • unitkeepAliveTime参数的时间单位;
  • workQueue:用于在执行任务之前保存任务的队列。 这个队列将只保存execute方法提交的Runnable任务;
  • threadFactory:执行程序创建新线程时使用的工厂;
  • handler:执行被阻塞时使用的处理程序,因为达到了线程边界和队列容量;

其中,ThreadFactory 的默认实现是:Executors.defaultThreadFactory()

RejectedExecutionHandler的默认实现是AbortPolicy,内部使用的是抛出异常的方式。当然,在企业中,为了不丢失任务,CallerRunsPolicy用的也是很多的。
CallerRunsPolicy的功能是:被拒绝任务的处理程序,它直接在execute方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下任务将被丢弃。

四种拒绝策略

ThreadPoolExecutor.AbortPolicy

处理程序在拒绝时抛出运行时RejectedExecutionException

ThreadPoolExecutor.CallerRunsPolicy

调用execute自身的线程运行任务。 这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度。

ThreadPoolExecutor.DiscardPolicy

无法执行的任务被简单地丢弃。

ThreadPoolExecutor.DiscardOldestPolicy

如果执行器没有关闭,工作队列头部的任务会被丢弃,然后重试执行(可能会再次失败,导致重复执行)。

工作流程

当一个新的任务提交给线程池时,线程池的处理步骤

  1. 首先判断核心线程数是否已满,如果没满,则调用一个线程处理Task任务,如果已满,则执行步骤(2)。

  2. 这时会判断阻塞队列是否已满,如果阻塞队列没满,就将Task任务加入到阻塞队列中等待执行,如果阻塞队列已满,则执行步骤(3)。

  3. 判断是否大于最大线程数,如果小于最大线程数,则创建线程执行Task任务,如果大于最大线程数,则执行步骤(4)。

  4. 这时会使用淘汰策略来处理无法执行的Task任务。

正题:ThreadPoolTaskExecutor

那现在进入正题,ThreadPoolTaskExecutor 这个类是Spring-Context支持的一个,专门用于Spring环境的线程池。其底层是在ThreadPoolExecutor的基础上包装一层,使得与Spring的整合更加方便。

继承关系

这是根据Idea生成的一个继承关系:
SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor

成员变/常量

内部的成员变量有:
SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor
可以看到,确实依赖的是ThreadPoolExecutor。

初始化方法

其中有一个初始化用的方法:

public void initialize() {
	if (logger.isDebugEnabled()) {
		logger.debug("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
	}
	if (!this.threadNamePrefixSet && this.beanName != null) {
		setThreadNamePrefix(this.beanName + "-");
	}
	this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}

可以看到这里调用的是initializeExecutor(this.threadFactory, this.rejectedExecutionHandler),那么我们再来看看这个方法做了什么?其实是初始化了一个ThreadPoolExecutor

protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

		ThreadPoolExecutor executor;
		if (this.taskDecorator != null) {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler) {
				@Override
				public void execute(Runnable command) {
					Runnable decorated = taskDecorator.decorate(command);
					if (decorated != command) {
						decoratedTaskMap.put(decorated, command);
					}
					super.execute(decorated);
				}
			};
		}
		else {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler);

		}

		if (this.allowCoreThreadTimeOut) {
			executor.allowCoreThreadTimeOut(true);
		}

		this.threadPoolExecutor = executor;
		return executor;
	}

所以,它俩的关系,你明白了吗?就是这么暧昧!

高潮:SpringBoot中使用ThreadPoolTaskExecutor

首先确定你的java版本是1.8及其以上!

创建SpringBoot项目

然后创建一个最简单的SpringBoot项目:
依赖只需要:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
    </dependencies>

文件列表

启动类并未使用,直接用测试类测试即可达到效果!
SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor

配置属性文件


thread-pool.config.corePoolSize = 10
thread-pool.config.maxPoolSize = 100
thread-pool.config.queueCapacity = 200
thread-pool.config.threadNamePrefix = MyThread-
#  CallerRunsPolicy
thread-pool.config.rejectedExecutionHandler=

配置类

package org.feng.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * SpringBoot 装配线程池
 * @author FengJinSong
 */
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig implements AsyncConfigurer {

    private static final String EXECUTOR_NAME = "asyncExecutor";

    @Value("${thread-pool.config.corePoolSize:10}")
    private Integer corePoolSize;
    @Value("${thread-pool.config.maxPoolSize:100}")
    private Integer maxPoolSize;
    @Value("${thread-pool.config.queueCapacity:200}")
    private Integer queueCapacity;
    @Value("${thread-pool.config.threadNamePrefix:AsyncThread-}")
    private String threadNamePrefix;
    @Value("${thread-pool.config.rejectedExecutionHandler:CallerRunsPolicy}")
    private String rejectedExecutionHandler;

    @Bean(name = EXECUTOR_NAME)
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        // 核心线程数
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        // 最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        // 阻塞队列容量
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        // 待任务在关机时完成--表明等待所有线程执行完
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // 线程名称前缀
        threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
        // 设置拒绝策略
        threadPoolTaskExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(rejectedExecutionHandler));

        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }


    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (throwable, method, obj) -> {
            log.error("[ThreadPool Exception]:Message [{}], Method [{}]", throwable.getMessage(), method.getName());
            for (Object param : obj) {
                log.error("Parameter value [{}] ", param);
            }
        };
    }

    /**
     * 根据传入的参数获取拒绝策略
     * @param rejectedName 拒绝策略名,比如 CallerRunsPolicy
     * @return RejectedExecutionHandler 实例对象,没有匹配的策略时,默认取 CallerRunsPolicy 实例
     */
    public RejectedExecutionHandler getRejectedExecutionHandler(String rejectedName){
        Map<String, RejectedExecutionHandler> rejectedExecutionHandlerMap = new HashMap<>(16);
        rejectedExecutionHandlerMap.put("CallerRunsPolicy", new ThreadPoolExecutor.CallerRunsPolicy());
        rejectedExecutionHandlerMap.put("AbortPolicy", new ThreadPoolExecutor.AbortPolicy());
        rejectedExecutionHandlerMap.put("DiscardPolicy", new ThreadPoolExecutor.DiscardPolicy());
        rejectedExecutionHandlerMap.put("DiscardOldestPolicy", new ThreadPoolExecutor.DiscardOldestPolicy());
        return rejectedExecutionHandlerMap.getOrDefault(rejectedName, new ThreadPoolExecutor.CallerRunsPolicy());
    }
}


测试

package org.feng;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;

@SpringBootTest
class SpringbootDemoApplicationTests {

    @Resource
    ThreadPoolTaskExecutor asyncExecutor;

    @Test
    void contextLoads() {
        // 控制台输出:MyThread-1-666
        CompletableFuture.runAsync(() -> {
            System.out.println(String.join("-", Thread.currentThread().getName(), "666"));
        }, asyncExecutor);
    }
}

结语:个人骚话(可看可不看鸭)

关于这些东西,都不能仅限制在能看懂代码,要尝试思考,灵活运用,及其核心的思想是如何的。
最后,各位,如果本文对你多少有点 学到了 的样子,烦请高抬贵手点个赞吧,求求了。

  • 作者:你家宝宝
  • 原文链接:https://fengjinsong.blog.csdn.net/article/details/120795737
    更新时间:2023年2月10日12:25:46 ,共 7678 字。