线程池+CountDownLatch高并发详解

2022-06-14 10:25:28

今天和大家分享的是:在开发服务端API时候,如何合理的运用线程池+CountDownLatch来保证API的高并发访问。

首先,作为Java开发的同学来说,java.util.concurrent并发包一定不会陌生,多多少少也会接触或使用过。今天的主角就是
java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.CountDownLatch。本文不详细赘述线程池的原理和数据结构,只是先普及下入门知识,然后再看如何正确的把这两种技术结合起来使用,来达到理想的效果。idea中直接command+N(mac下)输入这两个类即可打开JDK的源码:

线程池+CountDownLatch——高并发就是这么简单

CounDownLatch部分源码

CountDownLatch看起来很简单,就几个方法:

  • public CountDownLatch(int count):用给定的计数初始化一个CountDownLatch;
  • public void await() throws InterruptedException:等待所有线程执行完毕,即计数器值减到0或者当前线程被中断;
  • public boolean await(long timeout, TimeUnit unit) throws InterruptedException:等待所有线程执行完毕,即计数器值减到0或者当前线程被中断或者超时了;
  • public void countDown():计数器值-1,当latch值为0时候恢复所有等待的线程。

线程池+CountDownLatch——高并发就是这么简单

ThreadPoolExecutor类图

上图是JDK自带的ThreadPoolExecutor类结构图,其实也很简单,大家可以自行打开看一下源码。这里我们直接说JDK自带的几种线程池:

  • newFixedThreadPool:固定大小的线程池,队列无限大(使用不当会把内存吃完)
  • newSingleThreadExecutor:单个线程的线程池,能够保证所有任务能够按照FIFO先进先出执行
  • newScheduledThreadPool:定长线程池,支持定时及周期性任务执行
  • newCachedThreadPool:线程池无限大,需要就new一个,小心你的内存

了解各种线程池的特性,以便在不同的业务场景中使用不同的线程池。下图是使用线程池时候阿里规约扫描插件给出的建议。个人认为在使用线程池时候基本上注意事项就是:不要自己随意去创建一个线程池、不要在方法里创建线程池(内存泄漏)、建议项目中统一配置公共线程池,线程池的各个参数统一配置(区分计算密集型业务和IO密集型业务来配置不同的线程数)。

线程池+CountDownLatch——高并发就是这么简单

线程池使用规范

下面给出我在项目开发中自定义的线程池策略,经过压测环境长时间的高并发的压测,能够保证CPU及内存稳定。并结合CountDownLatch让我们的API能够快起来!

先自定义一个线程池类,可以加入一些自己项目的特定信息或日志:

package com.test.app.conf.executor;

import lombok.extern.slf4j.Slf4j;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;

import java.util.concurrent.Future;

import java.util.concurrent.ThreadPoolExecutor;

/**

* 线程池

* 提供打印出当前线程池状态的方法

*

* @Author: javaer

* @Date: 2019-07-08 18:21

*/

@Slf4j

public class AppThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

private static final long serialVersionUID = -4778941758120026886L;

/**

* 打印线程池状态日志

* @param prefix 执行的方法

*/

private void showThreadPoolInfo(String prefix) {

ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

log.info("{},{},taskCount[{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",

this.getThreadNamePrefix(),

prefix,

threadPoolExecutor.getTaskCount(),

threadPoolExecutor.getCompletedTaskCount(),

threadPoolExecutor.getActiveCount(),

threadPoolExecutor.getQueue().size());

}

@Override

public void execute(Runnable task) {

showThreadPoolInfo("execute");

super.execute(task);

}

@Override

public void execute(Runnable task, long startTimeout) {

showThreadPoolInfo("execute with timeout");

super.execute(task, startTimeout);

}

@Override

public Future<?> submit(Runnable task) {

showThreadPoolInfo("submit");

return super.submit(task);

}

@Override

public <T> Future<T> submit(Callable<T> task) {

showThreadPoolInfo("submit callable");

return super.submit(task);

}

@Override

public ListenableFuture<?> submitListenable(Runnable task) {

showThreadPoolInfo("submitListenable");

return super.submitListenable(task);

}

@Override

public <T> ListenableFuture<T> submitListenable(Callable<T> task) {

showThreadPoolInfo("submitListenable callable");

return super.submitListenable(task);

}

}

注册一个线程池bean:

package com.test.app.conf.executor;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**

* 线程池配置

* @Author: javaer

* @Date: 2019-07-08 18:16

*/

@Configuration

@EnableAsync

@Slf4j

public class ExecutorConfig {

/**

* 线程名称前缀

*/

private final static String THREAD_NAME_PREFIX = "app-async-service-";

/**

* 核心线程数

*/

@Value("${spring.threadPool.corePoolSize}")

private int corePoolSize;

/**

* 最大线程数

*/

@Value("${spring.threadPool.maxPoolSize}")

private int maxPoolSize;

/**

* 队列长度

*/

@Value("${spring.threadPool.queueCapacity}")

private int queueCapacity;

/**

* 线程存活时长

*/

@Value(("${spring.threadPool.keepAliveSeconds}"))

private int keepAliveSeconds;

@Bean

public ThreadPoolTaskExecutor appServiceExecutor() {

ThreadPoolTaskExecutor executor = new AppThreadPoolTaskExecutor();

//核心线程数

executor.setCorePoolSize(corePoolSize);

//最大线程数

executor.setMaxPoolSize(maxPoolSize);

//线程池队列大小

executor.setQueueCapacity(queueCapacity);

//线程存活时长

executor.setKeepAliveSeconds(keepAliveSeconds);

//配置线程池中的线程名称前缀

executor.setThreadNamePrefix(THREAD_NAME_PREFIX);

//当线程池maxPoolSize满了,采取由当前调用者所在的线程处理任务

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//初始化线程池

executor.initialize();

log.info("thread pool executor initialized");

return executor;

}

}

结合CountDownLatch使用示例

线程池+CountDownLatch——高并发就是这么简单

CountDownLatch结合线程池使用

由于业务代码无法开放出来,所以在CountDownLatch+线程池使用的地方我提供了一个简单的使用示例,相信大家也能看明白。之前我在做我们APP的首页的时候,由于之前APP的首页全部数据由一个接口提供(这里就不吐槽接口设计的问题),压测的时候平均响应耗时8000ms,完全跑不起来的节奏。后来我采用了上面的方案优化了一波,再加了一层redis缓存(真的害怕哪天量突然上来,数据库就挂了),最终压测并发100的情况下,平均响应时间100ms左右,CPU和内存在压测的30分钟内也保持平稳的运行。

当然,CountDownLatch+线程池可以广泛应用于业务开发的很多地方,只要是这个API需要聚合多个地方的数据,那么它就有用武之地。让你的API飞起来,就是这么简单!

以上是我个人在使用java线程池和CountDownLatch的一些建议和心得,建议大家在迅速了解的前提下动手去实践一下,在实践的过程中一定会有更深的理解和收获。当你有了一定的认识之后再回过头去学习它的原理和源码知识,一定是事半功倍的。

  • 作者:普通网友
  • 原文链接:https://blog.csdn.net/m0_62714732/article/details/120991127
    更新时间:2022-06-14 10:25:28