今天和大家分享的是:在开发服务端API时候,如何合理的运用线程池+CountDownLatch来保证API的高并发访问。
首先,作为Java开发的同学来说,java.util.concurrent并发包一定不会陌生,多多少少也会接触或使用过。今天的主角就是
java.util.concurrent.ThreadPoolExecutor和
java.util.concurrent.CountDownLatch。本文不详细赘述线程池的原理和数据结构,只是先普及下入门知识,然后再看如何正确的把这两种技术结合起来使用,来达到理想的效果。idea中直接command+N(mac下)输入这两个类即可打开JDK的源码:
CounDownLatch部分源码
CountDownLatch看起来很简单,就几个方法:
- public CountDownLatch(int count):用给定的计数初始化一个CountDownLatch;
- public void await() throws InterruptedException:等待所有线程执行完毕,即计数器值减到0或者当前线程被中断;
- public boolean await(long timeout, TimeUnit unit) throws InterruptedException:等待所有线程执行完毕,即计数器值减到0或者当前线程被中断或者超时了;
- public void countDown():计数器值-1,当latch值为0时候恢复所有等待的线程。
ThreadPoolExecutor类图
上图是JDK自带的ThreadPoolExecutor类结构图,其实也很简单,大家可以自行打开看一下源码。这里我们直接说JDK自带的几种线程池:
- newFixedThreadPool:固定大小的线程池,队列无限大(使用不当会把内存吃完)
- newSingleThreadExecutor:单个线程的线程池,能够保证所有任务能够按照FIFO先进先出执行
- newScheduledThreadPool:定长线程池,支持定时及周期性任务执行
- newCachedThreadPool:线程池无限大,需要就new一个,小心你的内存
了解各种线程池的特性,以便在不同的业务场景中使用不同的线程池。下图是使用线程池时候阿里规约扫描插件给出的建议。个人认为在使用线程池时候基本上注意事项就是:不要自己随意去创建一个线程池、不要在方法里创建线程池(内存泄漏)、建议项目中统一配置公共线程池,线程池的各个参数统一配置(区分计算密集型业务和IO密集型业务来配置不同的线程数)。
线程池使用规范
下面给出我在项目开发中自定义的线程池策略,经过压测环境长时间的高并发的压测,能够保证CPU及内存稳定。并结合CountDownLatch让我们的API能够快起来!
先自定义一个线程池类,可以加入一些自己项目的特定信息或日志:
package com.test.app.conf.executor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池
* 提供打印出当前线程池状态的方法
*
* @Author: javaer
* @Date: 2019-07-08 18:21
*/
@Slf4j
public class AppThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private static final long serialVersionUID = -4778941758120026886L;
/**
* 打印线程池状态日志
* @param prefix 执行的方法
*/
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
log.info("{},{},taskCount[{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("execute with timeout");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("submit callable");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("submitListenable callable");
return super.submitListenable(task);
}
}
注册一个线程池bean:
package com.test.app.conf.executor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池配置
* @Author: javaer
* @Date: 2019-07-08 18:16
*/
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
/**
* 线程名称前缀
*/
private final static String THREAD_NAME_PREFIX = "app-async-service-";
/**
* 核心线程数
*/
@Value("${spring.threadPool.corePoolSize}")
private int corePoolSize;
/**
* 最大线程数
*/
@Value("${spring.threadPool.maxPoolSize}")
private int maxPoolSize;
/**
* 队列长度
*/
@Value("${spring.threadPool.queueCapacity}")
private int queueCapacity;
/**
* 线程存活时长
*/
@Value(("${spring.threadPool.keepAliveSeconds}"))
private int keepAliveSeconds;
@Bean
public ThreadPoolTaskExecutor appServiceExecutor() {
ThreadPoolTaskExecutor executor = new AppThreadPoolTaskExecutor();
//核心线程数
executor.setCorePoolSize(corePoolSize);
//最大线程数
executor.setMaxPoolSize(maxPoolSize);
//线程池队列大小
executor.setQueueCapacity(queueCapacity);
//线程存活时长
executor.setKeepAliveSeconds(keepAliveSeconds);
//配置线程池中的线程名称前缀
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
//当线程池maxPoolSize满了,采取由当前调用者所在的线程处理任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
executor.initialize();
log.info("thread pool executor initialized");
return executor;
}
}
结合CountDownLatch使用示例
CountDownLatch结合线程池使用
由于业务代码无法开放出来,所以在CountDownLatch+线程池使用的地方我提供了一个简单的使用示例,相信大家也能看明白。之前我在做我们APP的首页的时候,由于之前APP的首页全部数据由一个接口提供(这里就不吐槽接口设计的问题),压测的时候平均响应耗时8000ms,完全跑不起来的节奏。后来我采用了上面的方案优化了一波,再加了一层redis缓存(真的害怕哪天量突然上来,数据库就挂了),最终压测并发100的情况下,平均响应时间100ms左右,CPU和内存在压测的30分钟内也保持平稳的运行。
当然,CountDownLatch+线程池可以广泛应用于业务开发的很多地方,只要是这个API需要聚合多个地方的数据,那么它就有用武之地。让你的API飞起来,就是这么简单!