使用线程池与CountDownLatch多线程提升系统性能

2022-06-16 09:07:53

下面这个业务场景,大家可能都会遇到,在遍历一个list的时候,需要对list中的每个对象,做一些复杂又耗时的操作,比如取出对象的uid,远程调用一次userservice的getUserByUid方法,这属于IO操作了,可怕的是遍历到每个对象时,都得执行一次这种RPC的IO操作(甚至不止一次,因为可能还有别的接口需要去调)还有复杂的业务逻辑需要cpu去计算。

以上这种场景,属于IO操作和CPU操作混合,如果是纯IO操作的话,也是可以用这种方案来解决的,因为一个线程向远程服务器发出请求了,再等待响应的过程中,其他线程也可以向远程服务器发出请求(请求-请求-响应-响应),这样也比请求-响应-请求-响应 这种模式要快。

下面贴上一个小demo,供大家参考。
ExecutorService作为一个线程池,然后利用CountDownLatch可以让指定数量的线程都执行完再执行主线程的特性。就可以实现多线程提速了。
套路是这样的:
1、实现runnable接口实现一个run方法,里面执行我们的耗时复杂业务操作。
2、在循环里给list里的每个对象分配一个线程
3、使用CountDownLatch让主线程等待工作线程全部执行完毕后之后,再继续执行。


这里写代码片
//多核定制线程池
static ExecutorService taskPool = ExecutorUtils.newMultiCpuFlexibleThreadPool(5, “task-pool”);

//为了提高性能,耗时的业务逻辑操作做使用多线程处理
public class RunnerTask implements Runnable {
    private EnumSet<GoodsMsgEnum.property> goodProperty;
    private List<GoodAllMsg> resultList;
    private UserApply item;
    private String logStr;
    private CountDownLatch latch;
    public RunnerTask(List<GoodAllMsg> resultList,UserApply item,EnumSet<GoodsMsgEnum.property> goodProperty,String logStr,CountDownLatch latch) {
        this.resultList=resultList;
        this.item=item;
        this.goodProperty=goodProperty;
        this.logStr=logStr;
        this.latch=latch;

    }
    @Override
    public void run() {
        try {
            GoodAllMsg aa = goodsById(item.getInfoId(), goodProperty, logStr);
            if(aa!=null){
                aa.setAuditType(GlobalsDataCache.sysStatus.get(GlobalsVar.GOOD_AUDIT_STATUS_ACTIVE_PREFIX+item.getStatus()));
                addTotalList(resultList,aa);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(this.latch!=null){
                latch.countDown();
            }
        }
    }

}

private synchronized void addTotalList(List<GoodAllMsg> resultList,GoodAllMsg item){
    resultList.add(item);
}

/**
 * 多线程处理业务逻辑,提升性能
 * @param taskList 任务队列
 * @param resultList  结果集
 * @param goodProperty
 * @param logStr
 */
public List<GoodAllMsg> multiThreadProcess(List<UserApply> taskList,List<GoodAllMsg> resultList,EnumSet<GoodsMsgEnum.property> goodProperty,String logStr) throws InterruptedException {
    if(taskList!=null && !taskList.isEmpty()){
        //创建闭锁,计数器大小为任务多列的长多
        CountDownLatch latch=new CountDownLatch(taskList.size());
        for(UserApply item:taskList){
            //把闭锁对象传入线程中,并在线程的finally代买块中将闭锁计数器减一
            RunnerTask runnerTask=new RunnerTask(resultList,item,goodProperty,logStr,latch);
            taskPool.execute(runnerTask);
        }
        //主线程开始等待,直到计数器大小为0,返回结果
        latch.await();
        return resultList;
    }
    return new ArrayList<GoodAllMsg>();
}
  • 作者:吴孟达
  • 原文链接:https://blog.csdn.net/Derek_BMW/article/details/52234640
    更新时间:2022-06-16 09:07:53