java8之CompletableFuture异步任务

2022-08-11 13:47:43

CompletableFuture是java.util.concurrent库在java 8中新增的主要工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回

1. 创建CompletableFuture直接new对象也成。一个completableFuture对象代表着一个任务。以下方案会先输出”等待“--”监控任务执行“-- ”完成“--"结束"

public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture1 = new CompletableFuture<>();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("监控任务执行");
                completableFuture1.complete("完成");
            }
        }).start();
        System.out.println("====等待");
        System.out.println(completableFuture1.get());
        System.out.println("====结束");
    }

2. 使用CompletableFuture实现异步调用,方法有:

 带有supply是持有返回值的,run是void返回值的,

特别注意:

使用CompletableFuture.supplyAsync()实现异步时,创建的都是守护线程,如果不用get()方法进行阻塞主线程,且主线程全部执行完了,如果此时异步线程还没执行完则会直接中断。传统的new Thread()使用的是非守护线程,对于CompletableFuture我们可以使用线程池让他变成非守护线程。
        final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("this is lambda supplyAsync");
            System.out.println("supplyAsync 是否为守护线程 " + Thread.currentThread().isDaemon());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("this lambda is executed by forkJoinPool");
            return "result1";
        });

        ExecutorService executorService = Executors.newCachedThreadPool();
        final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("this is task with executor");
            System.out.println("supplyAsync 使用executorService 时是否为守护线程 : " + Thread.currentThread().isDaemon());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("33333333333");
            return "result2";
        }, executorService);
        System.out.println(completableFuture.get()); // 使用get方法阻塞线程
        System.out.println(future.get());

3、CompletableFuture.Join()方法使用

private static void testJoin() {
        System.out.println("测试CompletableFuture.join阻塞线程开始");
        ExecutorService executorService = Executors.newCachedThreadPool();
        final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行CompletableFuture异步任务");
            return "result1";
        }, executorService);
        // 如果不注释下面代码,则主线程会被阻塞,等待completableFuture异步任务执行完才会继续执行主线程未执行的动作
        // 如果注释了下面的代码,即主线程不会被阻塞,又如果completableFuture是守护线程(即没有使用executorService)时,则主线程结束之后,守护线程里未执行的动作将不会再执行
        completableFuture.join();
        System.out.println("主线程执行结束");
        executorService.shutdown();
    }

注意:

如果使用completableFuture.join(),则主线程会被阻塞,等待completableFuture异步任务执行完才会继续执行主线程未执行的动作
如果不使用用completableFuture.join(),即主线程不会被阻塞,又如果completableFuture是守护线程(即没有使用executorService)时,则主线程结束之后,守护线程里未执行的动作将不会再执行

4、CompletableFuture.when()方法使用

private static void testWhen() throws Exception {
        /*
        * 1、在使用whenComplete,如果在whenComplete执行之前completableFuture异步任务执行完毕,那么whenComplete使用的是main的主线程
        * 2、如果在whenComplete执行之前completableFuture异步任务未执行完毕,则主线程会先执行,whenComplete在completableFuture执行完之后再执行,
        *    这时whenComplete用的则是completableFuture线程(前提是completableFuture是非守护线程或者主线程未结束)
        *
        */
        System.out.println("测试CompletableFuture.when*开始");
        final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("=======completableFuture:" + Thread.currentThread());
            System.out.println("执行CompletableFuture异步任务");
            return "result1";
        });
        Thread.sleep(300);


        // 同步执行
        completableFuture.whenComplete(((s, throwable) -> { // s跟异常有一个会为空,即程序要么成功要么失败
            System.out.println("=======whenComplete:" + Thread.currentThread());
            System.out.println(s);
            System.out.println(throwable);
        }));
        // 异步执行
        // completableFuture.whenCompleteAsync(((s, throwable) -> { // s跟异常有一个会为空,即程序要么成功要么失败
        //     System.out.println(s);
        //     System.out.println(throwable);
        // }));
        System.out.println("主线程执行结束");
        Thread.sleep(1000);
    }

注意:

 1、在使用whenComplete,如果在whenComplete执行之前completableFuture异步任务执行完毕,那么whenComplete使用的是main的主线程
* 2、如果在whenComplete执行之前completableFuture异步任务未执行完毕,则主线程会先执行,whenComplete在completableFuture执行完之后再执行,这时whenComplete用的则是completableFuture线程(前提是completableFuture是非守护线程或者主线程未结束)

5、CompletableFuture.thenxxx()方法使用

private static void testThen() throws Exception {
        /*
        * then与when的区别就是then使用的就是上一个任务的线程
        * thenApply(Function);这样的就是有入参有返回值类型的。
        * thenAccept(Consumer);这样的就是有入参,但是没有返回值的
        * thenRun就是这个任务运行完,再运行下一个任务
        * combine的理解就是结合两个任务的结果
        * compose理解就是上一个任务结果是then的一部分
        */
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行CompletableFuture异步任务");
            return "result1";
        });
        // compose理解就是上一个任务结果是then的一部分
        CompletableFuture<String> future = completableFuture.thenCompose(s -> CompletableFuture.supplyAsync(()->{
            System.out.println("========compose:" + s);
            return "compose";
        }));
        // combine的理解就是结合两个任务的结果
        CompletableFuture<String> combine = completableFuture.thenCombine(CompletableFuture.supplyAsync(()->{
            return "thenCombine";
        }), ((s, s2) -> {
            return s + "===" + s2;
        }));
        System.out.println(combine.get());
    }

注意:

* then与when的区别就是then使用的就是上一个任务的线程
* thenApply(Function);这样的就是有入参有返回值类型的。
* thenAccept(Consumer);这样的就是有入参,但是没有返回值的
* thenRun就是这个任务运行完,再运行下一个任务
* combine的理解就是结合两个任务的结果
* compose理解就是上一个任务结果是then的一部分
  • 作者:东京的雪铺满巴黎的道
  • 原文链接:https://blog.csdn.net/xiaoxiaoxiao_Ming/article/details/122349314
    更新时间:2022-08-11 13:47:43