SpringBoot项目框架下ThreadPoolExecutor线程池+Queue缓冲队列实现高并发中进行下单业务

2022年7月14日13:15:12

主要是自己在项目中(中小型项目) 有支付下单业务(只是办理VIP,没有涉及到商品库存),目前用户量还没有上来,目前没有出现问题,但是想到如果用户量变大,下单并发量变大,可能会出现一系列的问题,趁着空闲时间,做了这个demo测试相关问题。

可能遇到的问题如下:

 1.订单重复

 2.高并发下,性能变慢

解决方式:ThreadPoolExecutor线程池 + Queue队列

开发工具:IDEA 15

1.首先是springBoot的项目框架如下:

SpringBoot项目框架下ThreadPoolExecutor线程池+Queue缓冲队列实现高并发中进行下单业务

2.业务测试流程涉及的类,如下

BusinessThread 类

packagecom.springboot.demo.Threads;importorg.springframework.context.annotation.Scope;importorg.springframework.stereotype.Component;/** * Created by Administrator on 2018/5/9. */@Component@Scope("prototype")//spring多例public classBusinessThreadimplementsRunnable{privateStringacceptStr;    publicBusinessThread(String acceptStr) {this.acceptStr= acceptStr;}publicStringgetAcceptStr() {returnacceptStr;}public voidsetAcceptStr(String acceptStr) {this.acceptStr= acceptStr;}@Overridepublic voidrun() {//业务操作System.out.println("多线程已经处理订单插入系统,订单号:"+acceptStr);//线程阻塞/*try {            Thread.sleep(1000);            System.out.println("多线程已经处理订单插入系统,订单号:"+acceptStr);        } catch (InterruptedException e) {            e.printStackTrace();        }*/}
}

TestThreadPoolManager 类

packagecom.springboot.demo.Threads;importorg.springframework.beans.BeansException;importorg.springframework.beans.factory.BeanFactory;importorg.springframework.beans.factory.BeanFactoryAware;importorg.springframework.stereotype.Component;importjava.util.Map;importjava.util.Queue;importjava.util.concurrent.*;/** * Created by Administrator on 2018/5/10. */@Componentpublic classTestThreadPoolManagerimplementsBeanFactoryAware {//用于从IOC里取对象privateBeanFactoryfactory; //如果实现Runnable的类是通过spring的application.xml文件进行注入,可通过 factory.getBean()获取,这里只是提一下//线程池维护线程的最少数量private final static intCORE_POOL_SIZE=2;//线程池维护线程的最大数量private final static intMAX_POOL_SIZE=10;//线程池维护线程所允许的空闲时间private final static intKEEP_ALIVE_TIME=0;//线程池所使用的缓冲队列大小private final static intWORK_QUEUE_SIZE=50;@Overridepublic voidsetBeanFactory(BeanFactory beanFactory)throwsBeansException {factory= beanFactory;}/**     *用于储存在队列中的订单,防止重复提交,在真实场景中,可用redis代替 验证重复*/Map<String,Object>cacheMap=newConcurrentHashMap<>();/**     *订单的缓冲队列,当线程池满了,则将订单存入到此缓冲队列*/Queue<Object>msgQueue=newLinkedBlockingQueue<Object>();/**     *当线程池的容量满了,执行下面代码,将订单存入到缓冲队列*/finalRejectedExecutionHandlerhandler=newRejectedExecutionHandler() {@Overridepublic voidrejectedExecution(Runnable r,ThreadPoolExecutor executor) {//订单加入到缓冲队列msgQueue.offer(((BusinessThread) r).getAcceptStr());System.out.println("系统任务太忙了,把此订单交给(调度线程池)逐一处理,订单号:"+ ((BusinessThread) r).getAcceptStr());}
    };/**创建线程池*/finalThreadPoolExecutorthreadPool=newThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS, newArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);/**将任务加入订单线程池*/public voidaddOrders(String orderId){
        System.out.println("此订单准备添加到线程池,订单号:"+ orderId);//验证当前进入的订单是否已经存在if(cacheMap.get(orderId) ==null) {cacheMap.put(orderId, newObject());BusinessThread businessThread =newBusinessThread(orderId);threadPool.execute(businessThread);}
    }/**     *线程池的定时任务---->称为(调度线程池)。此线程池支持 定时以及周期性执行任务的需求。*/finalScheduledExecutorServicescheduler= Executors.newScheduledThreadPool(5);/**     *检查(调度线程池),每秒执行一次,查看订单的缓冲队列是否有 订单记录,则重新加入到线程池*/finalScheduledFuturescheduledFuture=scheduler.scheduleAtFixedRate(newRunnable() {@Overridepublic voidrun() {//判断缓冲队列是否存在记录if(!msgQueue.isEmpty()){//当线程池的队列容量少于WORK_QUEUE_SIZE,则开始把缓冲队列的订单 加入到 线程池if(threadPool.getQueue().size() <WORK_QUEUE_SIZE) {
                    String orderId = (String)msgQueue.poll();BusinessThread businessThread =newBusinessThread(orderId);threadPool.execute(businessThread);System.out.println("(调度线程池)缓冲队列出现订单业务,重新添加到线程池,订单号:"+orderId);}
            }
        }
    },0,1,TimeUnit.SECONDS);/**获取消息缓冲队列*/publicQueue<Object>getMsgQueue() {returnmsgQueue;}/**终止订单线程池+调度线程池*/public voidshutdown() {//true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止System.out.println("终止订单线程池+调度线程池:"+scheduledFuture.cancel(false));scheduler.shutdown();threadPool.shutdown();}
}

TestController 类

packagecom.springboot.demo;importcom.springboot.demo.Threads.TestThreadPoolManager;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RestController;importjava.util.Queue;importjava.util.UUID;/** * Created by Administrator on 2018/5/9. */@RestControllerpublic classTestController {@AutowiredTestThreadPoolManagertestThreadPoolManager;/**     *测试模拟下单请求 入口*@paramid*@return*/@GetMapping("/start/{id}")publicStringstart(@PathVariableLong id) {//模拟的随机数String orderNo = System.currentTimeMillis() + UUID.randomUUID().toString();testThreadPoolManager.addOrders(orderNo);        return"Test ThreadPoolExecutor start";}/**     *停止服务*@paramid*@return*/@GetMapping("/end/{id}")publicStringend(@PathVariableLong id) {testThreadPoolManager.shutdown();Queue q =testThreadPoolManager.getMsgQueue();System.out.println("关闭了线程服务,还有未处理的信息条数:"+ q.size());        return"Test ThreadPoolExecutor start";}
}

3.使用JMeter模拟并发下单请求 (JMeter使用可自行百度)

SpringBoot项目框架下ThreadPoolExecutor线程池+Queue缓冲队列实现高并发中进行下单业务

4.打印的日志说明,开始的订单直接执行插入到系统,当线程池的容量已经满了,则使用RejectedExecutionHandler方法把后面的订单添加到 Queue缓冲队列,使用ScheduledFuture方法定时(我这里是每秒一次)检查Queue队列,重新把队列里面的订单添加到线程池,执行后面的插入任务。部分日志如下

SpringBoot项目框架下ThreadPoolExecutor线程池+Queue缓冲队列实现高并发中进行下单业务

  • 作者:java的迷糊学子
  • 原文链接:https://blog.csdn.net/u011677147/article/details/80271174
    更新时间:2022年7月14日13:15:12 ,共 4845 字。