SpringBoot @Async异步并行执行任务

2022-09-10 14:46:20

说@Async注解时,得先说说异步任务的由来,按我的理解,从Java5版本就提供 Future 接口,Future接口可以充分利用多核CPU处理能力,它允许任务在一个新的核上开启一个新的子线程,子线程和原来的任务同时运行,因为Future 的局限性Java8对Future进行重新实现,实现类 CompletableFuture,而Spring对Future接口进行了封装,使用 @Async 注解可以方便的处理异步任务
Future 与 CompletableFuture ,有位网友写的很好,大家可以看看:https://segmentfault.com/a/1190000014479792

异步的好处是,可以提高程序吞吐量,一个任务,让耗时的异步处理,并继续同步处理后面的任务,异步任务可以返回结果,拿到结果后可结合同步处理过程中的变量一起处理计算
在Spring中运用 Async注解 需要注意几点:
  • 1.方法名必须是public进行修饰的,且不能是static方法
  • 2.不能与调用的方法在同一个类中
  • 3.需要把该方法注入到Spring容器中,就是在一个类中添加异步方法,并在此类上使用@Component之类的注解加入到容器

直接上代码,有3个类,并运行测试方法看效果

  • 1.AsyncTest.java,测试类,调用异步任务,同时执行同步方法
  • 2.OrderService.java,异步任务类,提供异步方法
  • 3.AsyncThreadPoolConfig.java,异步任务线程池配置类,配置异步任务运行的线程池大小等
@RunWith(SpringRunner.class)
@SpringBootTest
public class AsyncTest {
   @Autowired
    OrderService orderService;
   @Test
    public void testAsyncFuture() throws InterruptedException,ExecutionException{
        LogUtil.info("开始async");
        Future<List<String>> future1 = orderService.getList();
        Future<List<String>> future2 = orderService.getList2();
        //同步执行for循环
        for(int i=0; i < 10;i++){
            LogUtil.info("i:" + i);
        }
        
        //获取异步任务的处理结果,异步任务没有处理完成,会一直阻塞,可以设置超时时间,使用 get 的重载方法
        List<String> list1 = future1.get();
        LogUtil.info("list size1:" + list1.size());
        List<String> list2 = future1.get();
        LogUtil.info("list size2:" + list2.size());
    }
}
@Service
public class OrderService{
    /**
     * 异步任务1,返回处理结果
     * @return
     */
    @Async("taskExecutor")
    public Future<List<String>> getList(){
        List<String> list = new ArrayList<>();
        try {
            LogUtil.info("开始处理任务1");
            for(int i=0; i < 100000;i++){
                list.add(i+"");
            }
            //让线程睡2秒
            Thread.sleep(1500);
            LogUtil.info("任务1处理完成");
        } catch (Exception e) {

        }
        return new AsyncResult<>(list);
    }

    /**
     * 异步任务2,返回处理结果
     * @return
     */
    @Async("taskExecutor")
    public Future<List<String>> getList2(){
        List<String> list = new ArrayList<>();
        try {
            LogUtil.info("开始处理任务2");
            for(int i=0; i < 100000;i++){
                list.add(i+"");
            }
            //让线程睡2秒
            Thread.sleep(1500);
            LogUtil.info("任务2处理完成");
        } catch (Exception e) {

        }
        return new AsyncResult<>(list);
    }
}
/**
 * 把context传递到线程中
 */
public class ContextCopyingDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        RequestAttributes context = RequestContextHolder.currentRequestAttributes();
        return () -> {
            try {
                RequestContextHolder.setRequestAttributes(context);
                runnable.run();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}
@Configuration
@EnableAsync
public class AsyncThreadPoolConfig {

    private static final int cpu = Runtime.getRuntime().availableProcessors();//获取当前机器CPU数量
    private static final int corePoolSize = cpu;       		// 核心线程数(默认线程数)
    private static final int maxPoolSize = cpu * 2;		    // 最大线程数
    private static final int keepAliveTime = 60;			// 允许线程空闲时间(单位:默认为秒)
    private static final int queueCapacity = 200;			// 缓冲队列数
    private static final String threadNamePrefix = "taskExecutor-"; // 线程池名前缀

    @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
    public ThreadPoolTaskExecutor taskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setTaskDecorator(new ContextCopyingDecorator());
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setThreadNamePrefix(threadNamePrefix);
        // 线程池对拒绝任务的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }
}

运行测试方法,结果如下,

  • 1.打印开始进入方法
  • 2.打印异步任务1开始处理
  • 3.打印同步任务开始处理,并马上处理异步任务2
  • 4.同步任务处理完后,会一直阻塞等待异步任务处理完,拿到异步任务的结果
  • 5.因配置了异步任务的处理线程池配置,可以看到同步任务是在main线程上完成的,异步任务是在 taskExceutor 上完成的,且2个异步任务分别在不同的线程上处理,通过合理调整该线程池数量的大小可以提供更高的吞吐量
    在这里插入图片描述
  • 作者:闪耀的瞬间
  • 原文链接:https://zhuyu.blog.csdn.net/article/details/99992668
    更新时间:2022-09-10 14:46:20