SpringMVC的异步
最近接触了springMVC的异步模式,总结下来有两个优点:
第一,当然是节约tomcat容器的线程
第二,可以利用异步超时,起到一定的超时降级保护
注意:在Controller中使用时,一定要注意做好接口的线程池隔离,让慢的接口使用固定数量的线程池, 否则从tomcat减少的线程会转移到应用里,导致拥塞,在部分接口下游异常的情况的情况下,会出现影响正常接口的服务.
关于Spring MVC的异步
同步接口在请求处理过程中一直处于阻塞状态,而异步接口可以启用后台线程去处理耗时任务。基本的使用场景:
高并发场景
高耗时场景
SpringMVC提供的几种异步实现方案:
Callable 提供的带有返回值的并发操作
WebAsyncTask 对Callable的封装处理
@Async
DeferredResult
Callable
就是最简单的线程运行,然后获取返回值的模式。优点是简单,缺点是不能中途取消,只有结合FutureTask才能够提供一定的控制。但是,也不能提供异步通知的方式。
/**
* @author qiyu
* @date 2020-07-30 20:03
*/
@Slf4j
@RestController
public class AsyncController {
@GetMapping("/test")
public String test() throws InterruptedException {
log.info("主线程开始=====>"+ Thread.currentThread().getName());
Thread.sleep(30000);
log.info("主线程结束=====>"+ Thread.currentThread().getName());
return "success";
}
@GetMapping("/testAsync")
public Callable<String> testAsync() throws InterruptedException {
log.info("主线程开始=====>"+ Thread.currentThread().getName());
Callable<String> callable = () -> {
log.info("异步线程开始=====>" + Thread.currentThread().getName());
Thread.sleep(30000);
log.info("异步线程结束=====>" + Thread.currentThread().getName());
return "success";
};
log.info("主线程结束=====>"+ Thread.currentThread().getName());
return callable;
}
}
WebAsyncTask 实例
WebAsyncTask
: 在构造时写入Callable
主要业务逻辑WebAsyncTask.onCompletion(Runnable)
:在当前任务执行结束以后,无论是执行成功还是异常中止,onCompletion的回调最终都会被调用WebAsyncTask.onError(Callable>)
:当异步任务抛出异常的时候,onError()
方法即会被调用WebAsyncTask.onTimeout(Callable>)
:当异步任务发生超时的时候,onTimeout()
方法即会被调用
WebAsyncTask
类是Spring
提供的一步任务处理类。
WebAsyncTask
是Callable
的升级版
@RequestMapping("/async")
@ResponseBody
public WebAsyncTask<String> asyncTask(){
// 1000 为超时设置
WebAsyncTask<String> webAsyncTask = new WebAsyncTask<String>(1000,new Callable<String>(){
@Override
public String call() throws Exception {
//业务逻辑处理
Thread.sleep(5000);
String message = "username:wangbinghua";
return message;
}
});
webAsyncTask.onCompletion(new Runnable() {
@Override
public void run() {
System.out.println("调用完成");
}
});
webAsyncTask.onTimeout(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("业务处理超时");
return "<h1>Time Out</h1>";
}
});
return webAsyncTask;
}
@Async实例
含义
1,在方法上使用该@Async注解,申明该方法是一个异步任务;
2,在类上面使用该@Async注解,申明该类中的所有方法都是异步任务;
3,使用此注解的方法的类对象,必须是spring管理下的bean对象;
4,要想使用异步任务,需要在主类上开启异步配置,即,配置上@EnableAsync注解;
用法
在Spring中启用@Async:
1,@Async注解在使用时,如果不指定线程池的名称,则使用Spring默认的线程池,Spring默认的线程池为**SimpleAsyncTaskExecutor**。 2,方法上一旦标记了这个@Async注解,当其它线程调用这个方法时,就会开启一个新的子线程去异步处理该业务逻辑。
例子
3.1,启动类中增加@EnableAsync
以Spring boot 为例,启动类中增加@EnableAsync:
@EnableAsync@SpringBootApplicationpublic class ManageApplication { //...}
3.2,方法上加@Async注解:
@Componentpublic class MyAsyncTask { @Async public void asyncCpsItemImportTask(Long platformId, String jsonList){ //...具体业务逻辑 }}
3.3,默认线程池的缺陷:
上面的配置会启用默认的线程池/执行器,异步执行指定的方法。 Spring默认的线程池的默认配置: 默认核心线程数:8, 最大线程数:Integet.MAX_VALUE, 队列使用LinkedBlockingQueue, 容量是:Integet.MAX_VALUE, 空闲线程保留时间:60s, 线程池拒绝策略:AbortPolicy。 从最大线程数的配置上,相信你也看到问题了:**并发情况下,会无限创建线程。。。**
3.4,默认线程池–自定义配置参数:
默认线程池的上述缺陷如何解决: 答案是,自定义配置参数就可以了。 spring: task: execution: pool: max-size: 6 core-size: 3 keep-alive: 3s queue-capacity: 1000 thread-name-prefix: name
实现原理
@Async的原理概括:
@Async 的原理是通过 Spring AOP 动态代理 的方式来实现的。
Spring容器启动初始化bean时,判断类中是否使用了@[Async](https://so.csdn.net/so/search?q=Async\&spm=1001.2101.3001.7020 "Async")注解,如果使用了则为其创建切入点和切入点处理器,根据切入点创建代理, 在线程调用@Async注解标注的方法时,会调用代理,执行切入点处理器invoke方法,将方法的执行提交给线程池中的另外一个线程来处理,从而实现了异步执行。 所以,需要注意的一个错误用法是,如果a方法调用它同类中的标注@Async的b方法,是不会异步执行的,因为从a方法进入调用的都是该类对象本身,不会进入代理类。
因此,相同类中的方法调用带@Async的方法是无法异步的,这种情况仍然是同步。
DeferredResult实例
DeferredResult
和Callable
实现功能类型,都是异步返回,只不过Callable
不能直接设置超时时间,还需要和FutureTask
配合使用
@Controller
public class DeferedResultController {
private ConcurrentLinkedDeque<DeferredResult<String>> deferredResults =
new ConcurrentLinkedDeque<DeferredResult<String>>();
@RequestMapping("/getResult")
@ResponseBody
public DeferredResult<String> getDeferredResultController(){
//设置 5秒就会超时
final DeferredResult<String> stringDeferredResult = new DeferredResult<String>(1000);
//将请求加入到队列中
deferredResults.add(stringDeferredResult);
final String message = "{username:wangbinghua}";
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1010);
} catch (InterruptedException e) {
e.printStackTrace();
}
//业务处理
System.out.println("业务处理");
stringDeferredResult.setResult(message);
}
});
//setResult完毕之后,调用该方法
stringDeferredResult.onCompletion(new Runnable() {
@Override
public void run() {
System.out.println("异步调用完成");
//响应完毕之后,将请求从队列中去除掉
deferredResults.remove(stringDeferredResult);
}
});
stringDeferredResult.onTimeout(new Runnable() {
@Override
public void run() {
System.out.println("业务处理超时");
stringDeferredResult.setResult("error:timeOut");
}
});
return stringDeferredResult;
}
//开启线程定时扫描队列,响应客户端
@Scheduled(fixedRate = 1000)
public void scheduleResult(){
System.out.println(new Date());
for(int i = 0;i < deferredResults.size();i++){
DeferredResult<String> deferredResult = deferredResults.getFirst();
deferredResult.setResult("result:" + i);
}
}
}
说明
在Spring Mvc的控制层(springmvc层框架中)中,只要有一个用户请求便会实例化一个DeferedResult对象,然后返回该对象,然后就会释放TOMCAT的线程,从而完成客户端响应。只要DeferedResult对象不设置result响应的内容,则控制层(springMVC层)的容器主线程在响应客户端上就会发生阻塞。
因为SpringMVC只会实例化一个Controller对象,无论有多少个用户请求,在堆上只有一个Controller对象,因此可以添加一个成员变量List,将这些用户请求的DeferedResult对象存放到List中,然后启动一个定时线程扫描list,从而依次执行setResult方法,响应客户端。