@Async Spring异步任务的深入学习与使用

2022-09-10 14:16:11

  基于最新Spring 5.x,详细介绍了Spring的@Async异步任务的概念和使用方法,以及一些问题的解决办法!

  Spring异步任务机制非常的有用,特别是在那些记录日志、发端短信、发送邮件等等非核心的业务上面,或者用在一些系统内部任务上,可以优化代码结构,加快程序响应速度,提升用户体验。

1 异步任务的概念

Spring的异步任务机制,可以让调用者立即返回而无需等待任务(方法)完成,可以避免方法阻塞,提升响应效率,通常用于日志记录、发送邮件、短信等非核心业务中。我们只需要非常简单的配置即可使用Spring异步任务机制。

1.1 开启异步任务支持

  我们可以使用注解或者XML配置的方式开启异步任务支持。

  1. 对于注解的方式,一般我们将@EnableAsync注解添加到@Configuration配置类上面,表示开启异步任务支持。
  2. 对于XML的方式,我们使用< task:annotation-driven/>标签来开启异步任务支持。

1.2 任务执行器

Spring提供了TaskExecutor任务执行器抽象接口,这等同于JDK 5.0中的java.util.concurrent.Executor执行器,简单的说任务执行器就是用于执行各种任务(方法)。关于Executor,可以看看这篇文章:JUC—六万字的Executor线程池框架源码深度解析
  不同的TaskExecutor有不同的执行策略,最常见的就是线程池执行器,当然还有其他类型的执行器,比如单线程执行器、同步执行器等等,因此不能笼统的将线程池和任务执行器划等号。Spring已经提供了各种版本的TaskExecutor实现,并且很多都是可配置的,通常来说我们无需自定义TaskExecutor实现。当我们想要使用的时候,只需要将这些bean定义注册到容器中即可。

TaskExecutor描述
SyncTaskExecutor此实现不以异步方式运行调用,每个任务调用都发生在调用线程中,简单的说就是还是同步执行的。它主要用于不需要多线程的情况,例如在简单的测试用例中。
SimpleAsyncTaskExecutor此实现不重用任何线程。相反,它会为每个调用启动新线程,创建大量的线程将会导致 OOM 错误。同时,它支持最大并发线程数量限制,但是如果正在执行任务的线程数量达到最大值限制,那么当前调用线程将会阻塞直到有空余线程被唤醒,所以谨慎使用。
ConcurrentTaskExecutor此实现是java.util.concurrent.Executor 实例的适配器,相比于ThreadPoolTaskExecutor更加灵活,它可以直接包装一个Executor实例,从而使用其他执行器的配置,很少需要直接使用并发任务执行器。
ThreadPoolTaskExecutor最常用的TaskExecutor线程池执行器实现,内部使用了JDK的ThreadPoolExecutor,它公开了用于配置 java.util.concurrent.ThreadPoolExecutor的某些bean属性,我们可以自定义线程池属性。
WorkManagerTaskExecutor位于commonj包,可以使用Java EE应用的上下文信息,通常只有WebLogic和 WebSphere才会使用。目前这个类已经被标注废弃了。
DefaultManagedTaskExecutor此实现在 JSR-236 兼容的运行时环境(如 Java EE 7+ 应用程序服务器)中使用 JNDI 获得的托管执行服务,并为此替换了WorkManagerTaskExecutor。

  Spring异步任务最关键的就是多线程的使用,那么这就和上面的任务执行器关联起来了。我们可以通过@Bean注解配置执行器或者基于XML自定义执行器bean。 通常自定义的执行器都是采用ThreadPoolTaskExecutor类型,但是这里的执行器只要是基于Executor接口都可以执行异步任务,也就是说JDK中的线程池也行。
基于XML的配置还可以使用< task:executor/>标签来快速定义一个ThreadPoolTaskExecutor类型的执行器,对应的线程前缀就是“执行器id-”。

1.3 @Async异步任务

无论是通过注解还是XML的方式开启异步任务支持,对于异步任务(方法)本身,都使用@Async注解描述,没有XML的描述方式,这个注解也是Spring 3.0添加的注解。

  1. @Async的语义仍然是通过代理来实现的。可以是JDK的动态代理或者CGLIB代理,通过@EnableAsync注解的proxyTargetClass属性或者< task:annotation-driven/>标签的proxy-target-class属性来控制,默认为false,表示优先使用JDK代理,否则再尝试CGLIB代理,如果改为true,表示强制CGLIB代理。关于Spring AOP,我们此前的文章就讲过了使用和源码!
  2. 被@Async标注的方法,称为异步方法,被@Async标注的类,它内部的所有方法都是异步方法,方法上的注解优先级最高,如果方法上存在@Async注解则直接使用该注解,如果没有,再查找类上的@Async注解。 注意如果采用JDK代理则只能代理接口的方法;如果采用CGLIB代理则不能代理final、static、private的方法,并且类不能是final的;由于代理的限制,如果同一个类的方法相互调用,如果@Async方法不是在调用链首位,那么被调用的@Async方法不会生效,实际上,基于AOP的其他配置都不会生效,比如事物,因为里面的方法实际上是通过目标对象本身调用的,并且如果配置了@Async方法,那么这些种情况不能通过普遍的方式解决,比较有效的解决办法是可以在注入的属性加上@Lazy注解,或者将方法写到不同的类里面。如果@Async方法不能满足上面的要求,则还是通过调用线程去执行该方法,可能不会抛出异常,因此难以察觉;
  3. @Async注解标注的异步方法通常没有返回值,但是可以有返回值,这需要使用一个Future类型的对象来接收,它的泛型类型可以是实际返回值类型,通过Future.get()来获取真实返回值或者抛出异常。实际上还可以使用ListenableFutureCompletableFuture来接收,这两个类作为异步获取结果类,更加适合与异步方法交互!
  4. @Async如果与生命周期回调方法结合使用,比如@PostConstruct方法,那么在执行生命周期回调时,不会异步执行,因为它时通过目标对象直接执行的。
  5. @Async的value可以指定一个我们自定义的执行器的名字,这将导致对于该方法或者该类的@Async方法检是用执行执行器中的线程去执行,这样有利于区分各种异步任务,如果没有指定,那么将查找默认执行器
    1. 首先是选择通过Java配置的AsyncConfigurer的getAsyncExecutor方法返回的执行器(该执行器不受到Spring管理,默认返回null)或者是通过< task:annotation-driven/>标签的executor属性指向的执行器(默认没有设置)。
    2. 如果上面的方法都没获取到执行器,那么继续判断,在容器中查找如果有一个TaskExecutor类型的执行器,那么该执行器作为默认执行器;如果有多个或者没有任何一个,那么将查找beanName或者别名为“taskExecutor”类型为Executor的执行器作为默认执行器,如果还是找不到,那么将创建一个SimpleAsyncTaskExecutor类型的执行器作为默认执行器(该执行器不受到Spring管理)。
  6. 对于具有Future返回值的异步方法,可以很方便的管理执行时的异常,因为产生的异常会被封装到Future中,同样是通过get方法抛出。对于无返回值的异步方法,默认异常处理程序是SimpleAsyncUncaughtExceptionHandler,它的逻辑是直接在调用对应方法的线程中通过error级别日志打印这个异常信息(不是抛出异常)。我们可以自定义一个异常处理器来处理这种异常,对于Java配置,可以通过重写AsyncConfigurer的getAsyncUncaughtExceptionHandler方法返回一个AsyncUncaughtExceptionHandler处理器;对于XML配置可以设置< task:annotation-driven/>标签的exception-handler属性指向一个异常处理器bean定义。
  7. Spring不能为@Async注解标注的类解决setter方法和反射字段注解的循环依赖注入(包括自己注入自己),将会抛出:“……This means that said other beans do not use the final version of the bean……”异常,根本原因这个AOP代理对象不是使用通用的AbstractAutoProxyCreator的方法创建的,而是使用AsyncAnnotationBeanPostProcessor后处理器来创建的,Spring目前没有解决这个问题。解决办法是在引入的依赖项上加一个@Lazy注解,原理就是再给它加一层AOP代理……。而其他的,Spring可以解决比如由于事物或者通知方法创建的AOP代理的循环依赖。

2 异步任务案例

2.1 基于XML的配置

  maven依赖:

<dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version> 5.2.8.RELEASE</version></dependency>

  一个测试类,com.spring.integration.tasks.xmltaskexecutor.AsyncMethod:

publicclassAsyncMethod{@Asyncpublicvoidlog(){System.out.println("-----log:"+Thread.currentThread().getName());}@Asyncpublicvoidlog2(){System.out.println("-----log2:"+Thread.currentThread().getName());log3();}@Asyncpublicvoidlog3(){System.out.println("-----log3:"+Thread.currentThread().getName());}}

  spring-config.xml配置文件,注意引入task的命名空间(idea可自动引入):

<?xml version="1.0" encoding="UTF-8"?><beansxmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:task="http://www.springframework.org/schema/task"xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"><!--异步方法类--><beanclass="com.spring.integration.tasks.xmltaskexecutor.AsyncMethod"name="asyncTest"/><!--开启异步任务支持--><task:annotation-driven/><!--这里配置了一个Spring的ThreadPoolTaskExecutor标准执行器--><beanclass="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"name="threadPoolTaskExecutor"><propertyname="corePoolSize"value="5"/><propertyname="maxPoolSize"value="5"/><propertyname="keepAliveSeconds"value="5"/><propertyname="queueCapacity"value="5"/><propertyname="threadNamePrefix"value="threadPoolTaskExecutor-"/></bean><!--通过<task:executor/>标签快速配置一个Spring的ThreadPoolTaskExecutor标准执行器--><task:executorid="executor"pool-size="100-10000"queue-capacity="10"/><!--这里仅仅是配置了一个JDK的ThreadPoolExecutor--><beanclass="java.util.concurrent.ThreadPoolExecutor"name="taskExecutor"><constructor-argname="corePoolSize"value="5"/><constructor-argname="maximumPoolSize"value="6"/><constructor-argname="keepAliveTime"value="5"/><constructor-argname="unit"value="SECONDS"/><constructor-argname="workQueue"><beanclass="java.util.concurrent.LinkedBlockingQueue"/></constructor-arg></bean></beans>

  可以看到,我们配置了三个不同的执行器,我们来测试一下:

publicclassXmlTaskExecutorTest{publicstaticvoidmain(String[] args){ClassPathXmlApplicationContext ac=newClassPathXmlApplicationContext("spring-config.xml");AsyncMethod asyncMethod= ac.getBean(AsyncMethod.class);System.out.println(asyncMethod.getClass());System.out.println("--------"+Thread.currentThread().getName()+"--------");
        asyncMethod.log();
        asyncMethod.log2();}}

  结果如下:

classcom.spring.integration.tasks.xmltaskexecutor.AsyncMethod$$EnhancerBySpringCGLIB$$5da972aa--------main-------------log2:pool-1-thread-2-----log:pool-1-thread-1-----log3:pool-1-thread-2

  可以看到,成功的进行了异步调用,并且很明显,通过线程名称“pool-x-thread-y”可知采用了JDK的ThreadPoolExecutor,即“taskExecutor”作为默认执行器。如果我们注释其中一个TaskExecutor,比如将threadPoolTaskExecutor这个执行器的配置注释掉,再次测试结果如下:

classcom.spring.integration.tasks.xmltaskexecutor.AsyncMethod$$EnhancerBySpringCGLIB$$62c5372c--------main-------------log:executor-1-----log2:executor-2-----log3:executor-2

  由于只有一个TaskExecutor,那么将使用这个TaskExecutor作为默认执行器,也就是“executor”。如果我们在< task:annotation-driven/>标签中添加executor属性,值为“taskExecutor”,那么表示将“taskExecutor”这个执行器作为默认执行器,再次测试,结果如下:

classcom.spring.integration.tasks.xmltaskexecutor.AsyncMethod$$EnhancerBySpringCGLIB$$5ecd4858--------main-------------log:pool-1-thread-1-----log2:pool-1-thread-2-----log3:pool-1-thread-2

  我们最后将注释放开,并且删除executor属性,恢复到最开始的状态,然后将JDK的ThreadPoolExecutor的名字改为其他值,比如“taskExecutor1”,再次测试,结果如下:

classcom.spring.integration.tasks.xmltaskexecutor.AsyncMethod$$EnhancerBySpringCGLIB$$5da972aa--------main-------------log:SimpleAsyncTaskExecutor-1-----log2:SimpleAsyncTaskExecutor-2-----log3:SimpleAsyncTaskExecutor-2

  可以看到,Spring采用了最后的策略,即内部创建一个SimpleAsyncTaskExecutor作为默认执行器,并且还有一段警告日志输出:

More than oneTaskExecutor bean found within the context, and none is named'taskExecutor'.Mark one of them as primary or name it'taskExecutor'(possibly as an alias) in ordertouse itfor async processing:[threadPoolTaskExecutor, executor]

  它的意思就是没有手动设置默认执行器,并且存在多个TaskExecutor类型的执行器,并且没有名为“taskExecutor”的执行器,那么这时就会使用最后的策略!

2.2 基于注解的配置

  基于注解的配置更加常用!
  一个测试类,com.spring.integration.tasks.anntaskexecutor.AsyncMethod:

@ComponentpublicclassAsyncMethod{@Asyncpublicvoidlog(){System.out.println("-----log:"+Thread.currentThread().getName());}@Asyncpublicvoidlog2(){System.out.println("-----log2:"+Thread.currentThread().getName());log3();}@Asyncpublicvoidlog3(){System.out.println("-----log3:"+Thread.currentThread().getName());}}

  参数组件类,com.spring.integration.tasks.anntaskexecutor.ConfigurationStart:

@Configuration@EnableAsync@ComponentScanpublicclassConfigurationStart{privatefinalLongAdder longAdder=newLongAdder();/**
     * 这里仅仅是配置了一个JDK的ThreadPoolExecutor
     */@BeanpublicExecutortaskExecutor(){returnnewThreadPoolExecutor(3,5,3,TimeUnit.SECONDS,newLinkedBlockingQueue<>(), r->{
            longAdder.increment();//线程命名returnnewThread(r,"JDK线程-"+ longAdder.longValue());});}/**
     * 这里仅仅是配置了一个Spring的ThreadPoolTaskExecutor
     */@BeanpublicThreadPoolTaskExecutorthreadPoolTaskExecutor1(){ThreadPoolTaskExecutor executor=newThreadPoolTaskExecutor();//配置核心线程数
        executor.setCorePoolSize(5);//配置最大线程数
        executor.setMaxPoolSize(10);//配置队列大小
        executor.setQueueCapacity(800);//配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("threadPoolTaskExecutor1-");// rejection-policy:拒绝策略,由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());return executor;}/**
     * 这里仅仅是配置了一个Spring的ThreadPoolTaskExecutor
     */@BeanpublicThreadPoolTaskExecutorthreadPoolTaskExecutor2(){ThreadPoolTaskExecutor executor=newThreadPoolTaskExecutor();//配置核心线程数
        executor.setCorePoolSize(5);//配置最大线程数
        executor.setMaxPoolSize(10);//配置队列大小
        executor.setQueueCapacity(800);//配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("threadPoolTaskExecutor2-");// rejection-policy:拒绝策略,由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());return executor;}}

  上面这些配置和基于XML的配置差不多,只不过改成了更方便的注解而已,我们测试一下:

publicclassAnnTaskExecutorTest{publicstaticvoidmain(String[] args){AnnotationConfigApplicationContext ac=newAnnotationConfigApplicationContext(ConfigurationStart.class);AsyncMethod asyncMethod= ac.getBean(AsyncMethod.class);System.out.println(asyncMethod.getClass());System.out.println("--------"+Thread.currentThread().getName()+"--------");
        asyncMethod.log();
        asyncMethod.log2();}}

  结果如下,可以预料到将会采用JDK的执行器:

classcom.spring.integration.tasks.anntaskexecutor.AsyncMethod$$EnhancerBySpringCGLIB$$7abfce8--------main-------------log2:JDK线程-2-----log:JDK线程-1-----log3:JDK线程-2

  如果我们添加一个AsyncConfigurer的实现com.spring.integration.tasks.anntaskexecutor.MyAsyncConfigurer,并且重写getAsyncExecutor方法,返回一个自定义的Executor:

@ComponentpublicclassMyAsyncConfigurerimplementsAsyncConfigurer{@OverridepublicExecutorgetAsyncExecutor(){ThreadPoolTaskExecutor executor=newThreadPoolTaskExecutor();//配置核心线程数
        executor.setCorePoolSize(5);//配置最大线程数
        executor.setMaxPoolSize(10);//配置队列大小
        executor.setQueueCapacity(800);//配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("myAsyncConfigurer-");// rejection-policy:拒绝策略,由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());/*
         * 注意,这里配置的ThreadPoolTaskExecutor不会被Spring管理,因此需要手动initialize初始化
         */
        executor.initialize();return executor;}}

  再次测试,很明显getAsyncExecutor方法返回的执行器优先级最高:

classcom.spring.integration.tasks.anntaskexecutor.AsyncMethod$$EnhancerBySpringCGLIB$$5afb4f00--------main-------------log2:myAsyncConfigurer-2-----log3:myAsyncConfigurer-2-----log:myAsyncConfigurer-1

  当然,Spring推荐我们为每一个@Async指定一个执行器:

@ComponentpublicclassAsyncMethod{@Async("taskExecutor")publicvoid</
  • 作者:刘Java
  • 原文链接:https://blog.csdn.net/weixin_43767015/article/details/110135495
    更新时间:2022-09-10 14:16:11