深入浅出Spring @EnableAsync、@Async

2023年2月7日08:56:40

参考链接1

EnableAsync

源码

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

	Class<? extends Annotation> annotation() default Annotation.class;	
	
	boolean proxyTargetClass() default false;
	
	AdviceMode mode() default AdviceMode.PROXY;
	
	int order() default Ordered.LOWEST_PRECEDENCE;
}

修饰范围:类型

AdviceMode

public enum AdviceMode {

	/**
	 * JDK proxy-based advice.
	 */
	PROXY,

	/**
	 * AspectJ weaving-based advice.
	 */
	ASPECTJ

}

AsyncConfigurationSelector

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}

Async

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
	String value() default "";
}

修饰范围:类型、方法。

案例1 无返回值

@Component
public class LogService {
    @Async
    public void log(String msg) throws InterruptedException {
        System.out.println(Thread.currentThread() + "开始记录日志," + System.currentTimeMillis());
        //模拟耗时2秒
        TimeUnit.SECONDS.sleep(2);
        System.out.println(Thread.currentThread() + "日志记录完毕," + System.currentTimeMillis());
    }
}
@ComponentScan
@EnableAsync
public class Client {
    public static void main(String[] args) throws InterruptedException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService.class);
        System.out.println(Thread.currentThread() + " logService.log start," + System.currentTimeMillis());
        logService.log("异步执行方法!");
        System.out.println(Thread.currentThread() + " logService.log end," + System.currentTimeMillis());
        TimeUnit.SECONDS.sleep(3);
    }
}
Thread[main,5,main] logService.log start,1667887578200
14:06:18.209 [main] DEBUG org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor - Could not find default TaskExecutor bean
org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.core.task.TaskExecutor' available
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:351)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:342)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.getDefaultExecutor(AsyncExecutionAspectSupport.java:233)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.getDefaultExecutor(AsyncExecutionInterceptor.java:157)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$configure$2(AsyncExecutionAspectSupport.java:119)
	at org.springframework.util.function.SingletonSupplier.get(SingletonSupplier.java:100)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.determineAsyncExecutor(AsyncExecutionAspectSupport.java:172)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:107)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
	at com.example.lurenjia.spring.c37.d1.LogService$$EnhancerBySpringCGLIB$$ecabcff7.log(<generated>)
	at com.example.lurenjia.spring.c37.d1.Client.main(Client.java:23)
14:06:18.210 [main] INFO org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor - No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
Thread[main,5,main] logService.log end,1667887578212
Thread[SimpleAsyncTaskExecutor-1,5,main]开始记录日志,1667887578220
Thread[SimpleAsyncTaskExecutor-1,5,main]日志记录完毕,1667887580234

进程已结束,退出代码为 0

为什么不报错的原因在这里

AsyncExecutionInterceptor 这个类有一个getDefaultExecutor

	@Override
	@Nullable
	protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
		Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
		return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
	}

首先在容器中找TaskExecutor这个类型的bean如果有就用,没有就会创建一个SimpleAsyncTaskExecutor

想要修复这个错就得自定义一个线程池

@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("my-thread-");
        return executor;
    }
}

修复后输出

14:27:03.298 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'taskExecutor'
Thread[main,5,main] logService.log start,1667888823308
Thread[main,5,main] logService.log end,1667888823310
Thread[my-thread-1,5,main]开始记录日志,1667888823316
Thread[my-thread-1,5,main]日志记录完毕,1667888825316

案例2 有返回值

@Async
@Component
public class GoodsService {
    public Future<String> getGoodsInfo(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        return AsyncResult.forValue(String.format("商品%s基本信息!", goodsId));
    }

    public Future<String> getGoodsDesc(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        return AsyncResult.forValue(String.format("商品%s描述信息!", goodsId));
    }

    public Future<List<String>> getGoodsComments(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        List<String> comments = Arrays.asList("评论1", "评论2");
        return AsyncResult.forValue(comments);
    }
}
@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("d2-thread-");
        return executor;
    }
}
@ComponentScan
@EnableAsync
public class Client {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        GoodsService goodsService = context.getBean(GoodsService.class);

        long starTime = System.currentTimeMillis();
        System.out.println("开始获取商品的各种信息");

        long goodsId = 1L;
        Future<String> goodsInfoFuture = goodsService.getGoodsInfo(goodsId);
        Future<String> goodsDescFuture = goodsService.getGoodsDesc(goodsId);
        Future<List<String>> goodsCommentsFuture = goodsService.getGoodsComments(goodsId);

        System.out.println(goodsInfoFuture.get());
        System.out.println(goodsDescFuture.get());
        System.out.println(goodsCommentsFuture.get());

        System.out.println("商品信息获取完毕,总耗时(ms):" + (System.currentTimeMillis() - starTime));

        //休眠一下,防止@Test退出
        TimeUnit.SECONDS.sleep(3);
    }
}
15:00:41.668 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'taskExecutor'
开始获取商品的各种信息
商品1基本信息!
商品1描述信息!
[评论1, 评论2]
商品信息获取完毕,总耗时(ms)521

自定义线程池

方式一

@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("d2-thread-");
        return executor;
    }
}

文章中说名称必须为taskExecutor,这个是不对的,将名字修改为任意值测试即可。

异常处理

在之前 juc 的篇目中我们知道Thread的runable方法只能通过UncaughtExceptionHandler来处理,future可以通过try-catch包裹get方法来捕获异常,在这里也是相同的道理。

有返回值

@Service
public class LogService {
    
    @Async
    public Future<String> mockException() {
        //模拟抛出一个异常
        throw new IllegalArgumentException("参数有误!");
    }

    @Async
    public void mockNoReturnException() {
        //模拟抛出一个异常
        throw new IllegalArgumentException("无返回值的异常!");
    }
}
@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, Client.class, LogService.class})
@EnableAsync
public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService.class);
        try {
            Future<String> future = logService.mockException();
            System.out.println(future.get());
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }
}

无返回值

@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, Client.class, LogService.class})
@EnableAsync
public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService
  • 作者:newProxyInstance
  • 原文链接:https://blog.csdn.net/qq_37151886/article/details/127749503
    更新时间:2023年2月7日08:56:40 ,共 8540 字。