Java8异步编程

2022-08-12 11:05:21

一、引言

参考Java多线程几种实现方式,我们可以很快速的通过new Thread(...).start()开启一个新的线程,但是这样创建线程会有很多坏处:

  • 每次都要新建一个对象,性能差;
  • 建出来的很多个对象是独立的,缺乏统一的管理。如果在代码中无限新建线程会导致这些线程相互竞争,占用过多的系统资源从而导致死机或者oom
  • 缺乏许多功能如定时执行、中断等。

因此Java给我们提供好一个十分好用的工具,那就是线程池线程池(ThreadPool)是一种基于池化思想管理和使用线程的机制。它是将多个线程预先存储在一个"池子"内,当有任务出现时可以避免重新创建和销毁线程所带来性能开销,只需要从"池子"内取出相应的线程执行对应的任务即可。

二、Java Executors线程池

1、Java 线程池概述

Java提供了一个工厂类来构造我们需要的线程池,这个工厂类就是 Executors 。这里主要讲6个创建线程池的方法。默认拒绝策略都为AbortPolicy,即丢弃任务并抛出RejectedExecutionException异常

  • newCachedThreadPool()
  • newFixedThreadPool(int nThreads)
  • newScheduledThreadPool(int corePoolSize)
  • newSingleThreadExecutor()
  • newSingleThreadScheduledExecutor()
  • newWorkStealingPool()

2、newCachedThreadPool()

创建缓存线程池。缓存的意思就是这个线程池会根据需要创建新的线程,在有新任务的时候会优先使用先前创建出的线程。线程一旦创建了就一直在这个池子里面了,执行完任务后后续还有任务需要会重用这个线程,若是线程不够用了再去新建线程

publicstaticExecutorServicenewCachedThreadPool(){returnnewThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,newSynchronousQueue<Runnable>());
  • corePoolSize = 0
  • maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制
  • keepAliveTime = 60s,线程空闲 60s 后自动结束
  • workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为 CachedThreadPool 线程创建无限制,不会有队列等待,所以使用 SynchronousQueue

适用场景:快速处理大量耗时较短的任务,如Netty 的 NIO 接受请求时,可使用CachedThreadPool

ExecutorService executorService=Executors.newCachedThreadPool();for(int i=0; i<10; i++){finalint index= i;// 每次发布任务前根据奇偶不同等待一段时间,如1s,这样就会创建两个线程if(i%2==0){try{Thread.sleep(1000);}catch(InterruptedException e){
            e.printStackTrace();}}// 执行任务
    executorService.execute(()->System.out.println(Thread.currentThread().getName()+":"+ index));}

但注意这里的线程池是无限大的,并没有规定他的大小

3、newFixedThreadPool(int nThreads)

创建定长线程池,参数是线程池的大小。也就是说,在同一时间执行的线程数量只能是 nThreads 这么多,这个线程池可以有效的控制最大并发数从而防止占用过多资源。超出的线程会放在线程池的一个无界队列里等待其他线程执行完。

publicstaticExecutorServicenewFixedThreadPool(int nThreads){returnnewThreadPoolExecutor(nThreads, nThreads,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>());}
  • corePoolSize 与 maximumPoolSize 相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
  • keepAliveTime = 0 该参数默认对核心线程无效,而 FixedThreadPool 全部为核心线程;
  • workQueue 为 LinkedBlockingQueue(无界阻塞队列),队列最大值为 Integer.MAX_VALUE。如果任务提交速度持续大于任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
  • FixedThreadPool 的任务执行是无序的;

适用场景:可用于 Web 服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。

4、newScheduledThreadPool(int corePoolSize)

ScheduledThreadPool创建一个可以执行延迟任务的线程池,同时这个线程池也是定长的,参数 corePoolSize 就是线程池的大小,即在空闲状态下要保留在池中的线程数量。而要实现调度需要使用这个线程池的 schedule() 方法,队列使用了DelayedWorkQueue延迟队列

ScheduledExecutorService scheduledExecutorService=Executors.newScheduledThreadPool(5);// 三秒后执行
scheduledExecutorService.schedule(()->System.out.println(Thread.currentThread().getName()+": 我会在3秒后执行。"),3,TimeUnit.SECONDS);

5、newSingleThreadExecutor()

创建单线程池,只使用一个线程来执行任务。但是它与newFixedThreadPool(1, threadFactory) 不同,它会保证创建的这个线程池不会被重新配置为使用其他的线程,也就是说这个线程池里的线程始终如一,同时可以保证先进先出的执行顺序

ExecutorService executorService=Executors.newSingleThreadExecutor();

6、newSingleThreadScheduledExecutor()

创建一个单线程的可以执行延迟任务的线程池

ScheduledExecutorService threadPool=Executors.newSingleThreadScheduledExecutor();System.out.println("添加任务,时间:"+newDate());
threadPool.schedule(()->{System.out.println("任务被执行,时间:"+newDate());try{TimeUnit.SECONDS.sleep(1);}catch(InterruptedException e){}},2,TimeUnit.SECONDS);

7、newWorkStealingPool()

创建一个抢占式执行的线程池(任务执行顺序不确定),注意此方法只有在 JDK 1.8+ 版本中才能使用

ExecutorService threadPool=Executors.newWorkStealingPool();for(int i=0; i<10; i++){finalint index= i;
    threadPool.execute(()->{System.out.println(index+" 被执行,线程名:"+Thread.currentThread().getName());});}// 确保任务执行完成while(!threadPool.isTerminated()){}

8、线程池的关闭

线程池启动后需要手动关闭,否则会一直不结束

  • shutdown() : 将线程池状态置成SHUTDOWN,此时不再接受新的任务等待线程池中已有任务执行完成后结束
  • shutdownNow() : 将线程池状态置成SHUTDOWN,将线程池中所有线程中断(调用线程的interrupt() 操作),清空队列,并返回正在等待执行的任务列表

并且它还提供了查看线程池是否关闭和是否终止的方法,分别为isShutdown()isTerminated()

三、自定义线程池ThreadPoolExecutor

1、概述

阿里巴巴的JAVA开发手册推荐用ThreadPoolExecutor创建线程池,可以规避资源耗尽的风险。因为Executors线程池都不支持自定义拒绝策略。newFixedThreadPoolnewSingleThreadExecutor主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。newCachedThreadPoolnewScheduledThreadPool主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。

2、自定义线程池参数介绍

publicThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
  • corePoolSize:核心线程数,线程池中始终存活的线程数
  • maximumPoolSize:最大线程数,线程池中允许的最大线程数,当线程池的任务队列满了之后可以创建的最大线程数
  • keepAliveTime:最大线程数可以存活的时间,当线程中没有任务执行时,最大线程就会销毁一部分,最终保持核心线程数量的线程
  • unit:keepAliveTime的时间单位,和keepAliveTime配合使用
  • workQueue:一个阻塞队列,用来存储线程池等待执行的任务,均为线程安全。它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种
    • SynchronousQueue(常用):直接提交队列。SynchronousQueue没有容量,所以实际上提交的任务不会被添加到任务队列,总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新的线程,如果线程数量已经达到最大值(maximumPoolSize),则执行拒绝策略
    • LinkedBlockingQueue(常用):无界的任务队列。当有新的任务来到时,若系统的线程数小于corePoolSize,线程池会创建新的线程执行任务;当系统的线程数量等于corePoolSize后,因为是无界的任务队列,总是能成功将任务添加到任务队列中,所以线程数量不再增加。若任务创建的速度远大于任务处理的速度,无界队列会快速增长,直到内存耗尽。
    • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
    • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
    • DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素
    • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法
    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列
  • threadFactory:线程工厂,用于创建线程,一般情况下使用默认的,即Executors类的静态方法defaultThreadFactory();
  • handler:拒绝策略。当任务太多来不及处理时,如何拒绝任务。系统提供四种,默认策略为AbortPolicy
    • DiscardOldestPolicy:丢弃任务队列中最早添加的任务,并尝试提交当前任务
    • CallerRunsPolicy:调用主线程执行被拒绝的任务,这提供了一种简单的反馈控制机制,将降低新任务的提交速度
    • DiscardPolicy:默默丢弃无法处理的任务,不予任何处理
    • AbortPolicy:直接抛出异常,阻止系统正常工作

线程池的执行流程

  • 当线程数小于核心线程数时,创建线程
  • 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列
  • 当线程数大于等于核心线程数,且任务队列已满:若线程数小于最大线程数,创建线程;若线程数等于最大线程数,抛出异常,拒绝任务

3、自定义线程池demo

@Slf4jpublicclassThreadPoolService{publicstaticvoidmain(String[] args)throwsException{BlockingQueue<Runnable> workQueue=newArrayBlockingQueue<>(2);NameThreadFactory threadFactory=newNameThreadFactory();RejectedExecutionHandler handler=newMyIgnorePolicy();//默认线程工程 Executors.defaultThreadFactory()//默认拒绝策略 new ThreadPoolExecutor.AbortPolicy()ThreadPoolExecutor executor=newThreadPoolExecutor(2,4,2000,TimeUnit.MILLISECONDS,
                workQueue, threadFactory,  handler);// 预启动所有核心线程
        executor.prestartAllCoreThreads();for(int i=1; i<=10; i++){MyTask task=newMyTask(String.valueOf(i));
            executor.execute(task);}//阻塞主线程System.in.read();}/**
     * Description:自定义线程名字
    */staticclassNameThreadFactoryimplementsThreadFactory{privatefinalAtomicInteger mThreadNum=newAtomicInteger(1);@OverridepublicThreadnewThread(Runnable r){Thread t=newThread(r,"my-thread-"+ mThreadNum.getAndIncrement());System.out.println(t.getName()+" has been created");return t;}}staticclassMyIgnorePolicyimplementsRejectedExecutionHandler{@OverridepublicvoidrejectedExecution(Runnable r,ThreadPoolExecutor executor){doLog(r, executor);}privatevoiddoLog(Runnable r,ThreadPoolExecutor e){// 可做日志记录等System.err.println( r.toString()+" rejected");System.out.println("completedTaskCount: "+ e.getCompletedTaskCount());}}staticclassMyTaskimplementsRunnable{privateString name;publicMyTask(String name){this.name= name;}@Overridepublicvoidrun(){try{System.out.println(this.toString()+" is running!");// 让任务执行慢点Thread.sleep(3000);}catch(InterruptedException e){
                e.printStackTrace();}}publicStringgetName(){return name;}@OverridepublicStringtoString(){return"MyTask [name="+ name+"]";}}}

四、异步编程函数式接口

1、简介

函数式接口可以参考java8常用新特性,这里主要介绍几种函数式接口,Callable、Runnable、Future、CompletableFuture和FutureTask

2、Callable和Runnable异同

//两个接口的定义@FunctionalInterfacepublicinterfaceRunnable{publicabstractvoidrun();}@FunctionalInterfacepublicinterfaceCallable<V>{Vcall()throwsException;}

相同点

都是接口,都可以编写多线程程序,都可以通过线程池启动线程

不同点

Runnable没有返回值,Callable可以返回执行结果,是个泛型;Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;

publicclassTest1{staticclassMinimplementsCallable<Integer>{@OverridepublicIntegercall()throwsException{Thread.sleep(1000);return4;}}staticclassMaximplementsRunnable{@SneakyThrows@Overridepublicvoidrun(){Thread.sleep(1000);}}publicstaticvoidmain(String[] args){ExecutorService executorService=Executors.newCachedThreadPool();
        executorService.submit(newMin());
        executorService.submit(newMax());}

3、Future类

3.1 future介绍

Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务,处理完成后,再通过Future获取计算结果

//Since:1.5publicinterfaceFuture<V>{//取消任务的执行,参数指定是否立即中断任务执行,或者等等任务结束booleancancel(boolean mayInterruptIfRunning);//任务是否已经取消,任务正常完成前将其取消,则返回 truebooleanisCancelled();//任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回truebooleanisDone();//等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationExceptionVget()throwsInterruptedException,ExecutionException;//同上面的get功能一样,多了设置超时时间。超时会抛出TimeoutExceptionVget(long timeout,TimeUnit unit)throwsInterruptedException,ExecutionException,TimeoutException;}

一般情况下,我们会结合Callable和Future一起使用,通过ExecutorService的submit方法执行Callable,并返回Future。

publicstaticvoidmain(String[] args)throwsExecutionException,InterruptedException,TimeoutException{ExecutorService executor=Executors.newCachedThreadPool();//Lambda 是一个 callable, 提交后便立即执行,这里返回的是 FutureTask 实例Future<String> future= executor.submit(()->{System.out.println("running task");Thread.sleep(10000);return"return task";});
        future.get(2,TimeUnit.SECONDS);}

当然Future模式也有它的缺点,它没有提供通知的机制,我们无法得知Future什么时候完成。如果要在future.get()的地方等待future返回的结果,那只能通过isDone()轮询查询。

3.2 FutureTask

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/FutureTask.html

Future是一个接口,是无法生成一个实例的,所以又有了FutureTask。FutureTask实现了RunnableFuture接口,RunnableFuture接口又实现了Runnable接口和Future接口。所以FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。

4、CompletableFuture类

4.1 介绍

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

4.2 异步任务创建

CompletableFuture创建异步任务,一般有supplyAsyncrunAsync两个方法

  • supplyAsync执行CompletableFuture任务,支持返回值
  • runAsync执行CompletableFuture任务,没有返回值
方法名描述
runAsync(Runnable runnable)使用ForkJoinPool.commonPool()作为它的线程池执行异步代码
runAsync(Runnable runnable, Executor executor)使用指定的thread pool执行异步代码
supplyAsync(Supplier supplier)使用ForkJoinPool.commonPool()作为它的线程池执行异步代码,异步操作有返回值
supplyAsync(Supplier supplier, Executor executor)使用指定的thread pool执行异步代码,异步操作有返回值
publicstaticvoidmain(String[] args){//可以自定义线程池ExecutorService executor=Executors.newCachedThreadPool();//runAsync的使用CompletableFuture<Void> runFuture=CompletableFuture.runAsync(()->System.out.println("run,shawn"), executor);//supplyAsync的使用CompletableFuture<String> supplyFuture=CompletableFuture.supplyAsync(()->{System.out.print("supply,shawn");return"shawn";}, executor);//runAsync的future没有返回值,输出nullSystem.out.println(runFuture.join());//supplyAsync的future,有返回值System.out.println(supplyFuture.join());
    executor.shutdown();// 线程池需要关闭}

4.3 任务异步回调

在这里插入图片描述

  • thenRun/thenRunAsync

    做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值

  • thenAccept/thenAcceptAsync

    第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

  • thenApp

  • 作者:魅Lemon
  • 原文链接:https://blog.csdn.net/lemon_TT/article/details/121561663
    更新时间:2022-08-12 11:05:21