CompletableFuture是java.util.concurrent库在java 8中新增的主要工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回
1. 创建CompletableFuture直接new对象也成。一个completableFuture对象代表着一个任务。以下方案会先输出”等待“--”监控任务执行“-- ”完成“--"结束"
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture1 = new CompletableFuture<>();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("监控任务执行");
completableFuture1.complete("完成");
}
}).start();
System.out.println("====等待");
System.out.println(completableFuture1.get());
System.out.println("====结束");
}
2. 使用CompletableFuture实现异步调用,方法有:
带有supply是持有返回值的,run是void返回值的,
特别注意:
使用CompletableFuture.supplyAsync()实现异步时,创建的都是守护线程,如果不用get()方法进行阻塞主线程,且主线程全部执行完了,如果此时异步线程还没执行完则会直接中断。传统的new Thread()使用的是非守护线程,对于CompletableFuture我们可以使用线程池让他变成非守护线程。
final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("this is lambda supplyAsync");
System.out.println("supplyAsync 是否为守护线程 " + Thread.currentThread().isDaemon());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("this lambda is executed by forkJoinPool");
return "result1";
});
ExecutorService executorService = Executors.newCachedThreadPool();
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("this is task with executor");
System.out.println("supplyAsync 使用executorService 时是否为守护线程 : " + Thread.currentThread().isDaemon());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("33333333333");
return "result2";
}, executorService);
System.out.println(completableFuture.get()); // 使用get方法阻塞线程
System.out.println(future.get());
3、CompletableFuture.Join()方法使用
private static void testJoin() {
System.out.println("测试CompletableFuture.join阻塞线程开始");
ExecutorService executorService = Executors.newCachedThreadPool();
final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行CompletableFuture异步任务");
return "result1";
}, executorService);
// 如果不注释下面代码,则主线程会被阻塞,等待completableFuture异步任务执行完才会继续执行主线程未执行的动作
// 如果注释了下面的代码,即主线程不会被阻塞,又如果completableFuture是守护线程(即没有使用executorService)时,则主线程结束之后,守护线程里未执行的动作将不会再执行
completableFuture.join();
System.out.println("主线程执行结束");
executorService.shutdown();
}
注意:
如果使用completableFuture.join(),则主线程会被阻塞,等待completableFuture异步任务执行完才会继续执行主线程未执行的动作 如果不使用用completableFuture.join(),即主线程不会被阻塞,又如果completableFuture是守护线程(即没有使用executorService)时,则主线程结束之后,守护线程里未执行的动作将不会再执行
4、CompletableFuture.when()方法使用
private static void testWhen() throws Exception {
/*
* 1、在使用whenComplete,如果在whenComplete执行之前completableFuture异步任务执行完毕,那么whenComplete使用的是main的主线程
* 2、如果在whenComplete执行之前completableFuture异步任务未执行完毕,则主线程会先执行,whenComplete在completableFuture执行完之后再执行,
* 这时whenComplete用的则是completableFuture线程(前提是completableFuture是非守护线程或者主线程未结束)
*
*/
System.out.println("测试CompletableFuture.when*开始");
final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("=======completableFuture:" + Thread.currentThread());
System.out.println("执行CompletableFuture异步任务");
return "result1";
});
Thread.sleep(300);
// 同步执行
completableFuture.whenComplete(((s, throwable) -> { // s跟异常有一个会为空,即程序要么成功要么失败
System.out.println("=======whenComplete:" + Thread.currentThread());
System.out.println(s);
System.out.println(throwable);
}));
// 异步执行
// completableFuture.whenCompleteAsync(((s, throwable) -> { // s跟异常有一个会为空,即程序要么成功要么失败
// System.out.println(s);
// System.out.println(throwable);
// }));
System.out.println("主线程执行结束");
Thread.sleep(1000);
}
注意:
1、在使用whenComplete,如果在whenComplete执行之前completableFuture异步任务执行完毕,那么whenComplete使用的是main的主线程 * 2、如果在whenComplete执行之前completableFuture异步任务未执行完毕,则主线程会先执行,whenComplete在completableFuture执行完之后再执行,这时whenComplete用的则是completableFuture线程(前提是completableFuture是非守护线程或者主线程未结束)
5、CompletableFuture.thenxxx()方法使用
private static void testThen() throws Exception {
/*
* then与when的区别就是then使用的就是上一个任务的线程
* thenApply(Function);这样的就是有入参有返回值类型的。
* thenAccept(Consumer);这样的就是有入参,但是没有返回值的
* thenRun就是这个任务运行完,再运行下一个任务
* combine的理解就是结合两个任务的结果
* compose理解就是上一个任务结果是then的一部分
*/
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("执行CompletableFuture异步任务");
return "result1";
});
// compose理解就是上一个任务结果是then的一部分
CompletableFuture<String> future = completableFuture.thenCompose(s -> CompletableFuture.supplyAsync(()->{
System.out.println("========compose:" + s);
return "compose";
}));
// combine的理解就是结合两个任务的结果
CompletableFuture<String> combine = completableFuture.thenCombine(CompletableFuture.supplyAsync(()->{
return "thenCombine";
}), ((s, s2) -> {
return s + "===" + s2;
}));
System.out.println(combine.get());
}
注意:
* then与when的区别就是then使用的就是上一个任务的线程 * thenApply(Function);这样的就是有入参有返回值类型的。 * thenAccept(Consumer);这样的就是有入参,但是没有返回值的 * thenRun就是这个任务运行完,再运行下一个任务 * combine的理解就是结合两个任务的结果 * compose理解就是上一个任务结果是then的一部分