java并发编程信号量三组工具类Tools

2022-09-27 08:56:58

一、CountDownLatch

1.1 CountDownLatch简介

CountDownLatch是一个辅助同步器,初始化时设置一个计数,在构造CountDownLatch对象时传入,每调用一次countDown()方法,计数值就会减1。线程可以调用CountDownLatch的await方法进入阻塞,当计数值降到0时,所有之前调用await阻塞的线程都会释放。
注意:CountDownLatch的初始计数值一旦降到0,无法重置。如果需要重置,可以考虑使用CyclicBarrier。

1.2 CountDownLatch使用

ContDownLatch一般有以下几种用法:

1.2.1 作为一个开关/入口

将初始计数值为1的CountDownLatch作为一个开关后入口:
在调用countDown()的线程打开入口前,所有调用await的线程都一直在入口处等待。

publicclassCountDownLatchTest{privatestaticfinalint N=10;publicstaticvoidmain(String[] args)throws InterruptedException{
        CountDownLatch switcher=newCountDownLatch(1);for(int i=0; i< N;++i){newThread(newWorker(switcher),(i+1)+"号选手").start();}doSomething();
        switcher.countDown();// 主线程开启开关}publicstaticvoiddoSomething(){
        System.out.println("裁判发出信号枪");}}classWorkerimplementsRunnable{privatefinal CountDownLatch startSignal;Worker(CountDownLatch startSignal){this.startSignal= startSignal;}@Overridepublicvoidrun(){try{
            System.out.println(Thread.currentThread().getName()+"已准备");
            startSignal.await();//所有执行线程在此处等待开关开启doWork();}catch(InterruptedException ex){}}voiddoWork(){
        System.out.println(Thread.currentThread().getName()+"开跑啦");}}

在这里插入图片描述
在裁判发出信号枪之前是没人可以开跑的。等到主线程调用了switcher.countDown(); switcher的count为0,所有等待的线程都被唤醒。后面的线程也不会进入等待。

1.2.2 作为一个完成信号

将初始计数值为N的 CountDownLatch作为一个完成信号点:使某个线程在其它N个线程完成某项操作之前一直等待。

publicclassCountdownlatch2{privatestaticfinalint N=10;publicstaticvoidmain(String[] args)throws InterruptedException{
            CountDownLatch compsignal=newCountDownLatch(N);for(int i=0; i< N;++i){newThread(newWorker(compsignal),(i+1)+"号运动员").start();}

            compsignal.await();// 主线程等待其它N个线程完成doSomething();}publicstaticvoiddoSomething(){
            System.out.println("裁判已经等到所有运动员就位,裁判发出开跑信号");}}classWorkerimplementsRunnable{privatefinal CountDownLatch compSignal;Worker(CountDownLatch compSignal){this.compSignal= compSignal;}@Overridepublicvoidrun(){doWork();
            compSignal.countDown();//每个线程做完自己的事情后,就将计数器减去1}voiddoWork(){
            System.out.println(Thread.currentThread().getName()+"已就位");}}

在这里插入图片描述
上面程序模拟的是当所有运动员都到场后,裁判才发信号。

1.3 CountDownLatch原理

CountDownLatch原理详见下面博客的 三、AQS实现原理之AQS共享功能

java并发编程——从源码手把手带你理解AQS源码

二、CyclicBarrier

2.1 CyclicBarrier简介

CyclicBarrier是一个辅助同步器类,在JDK1.5时随着J.U.C一起引入。这个类的功能和我们之前介绍的CountDownLatch有些类似。我们知道,CountDownLatch是一个倒数计数器,在计数器不为0时,所有调用await的线程都会等待,当计数器降为0,线程才会继续执行,且计数器一旦变为0,就不能再重置了。
CyclicBarrier可以认为是一个栅栏,栅栏的作用是什么?就是阻挡前行。
顾名思义,CyclicBarrier是一个可以循环使用的栅栏,它做的事情就是:
让线程到达栅栏时被阻塞(调用await方法),直到到达栅栏的线程数满足指定数量要求时,栅栏才会打开放行。

CyclicBarrier有两个构造器

publicCyclicBarrier(int parties){this(parties, null);}
publicCyclicBarrier(int parties, Runnable barrierAction){if(parties<=0)thrownewIllegalArgumentException();this.parties= parties;this.count= parties;this.barrierCommand= barrierAction;}

Runnable任务其实就是当最后一个线程到达栅栏时,后续立即要执行的任务。

2.2 CyclicBarrier使用

场景:打麻将,在4个牌鬼没有全到齐时,牌局是无法开始的,等到四个牌鬼都到齐了,牌局才开始。

publicclassCyclicBarrierTest{publicstaticvoidmain(String[] args){
        CyclicBarrier cyclicBarrier=newCyclicBarrier(4);
        CyclicBarrier cyclicBarrier2=newCyclicBarrier(4,()-> System.out.println("开始打麻将"));for(int i=0; i<4; i++){newThread(newGambler(cyclicBarrier2),(i+1)+"号赌鬼").start();}}}classGamblerimplementsRunnable{private CyclicBarrier c;publicGambler(CyclicBarrier c){this.c= c;}voidplayMahjong(){
        System.out.println(Thread.currentThread().getName()+"已就位");}@Overridepublicvoidrun(){playMahjong();try{
            c.await();}catch(Exception e){
            e.printStackTrace();}}}

在这里插入图片描述
从输出可以看到,线程到达栅栏时会被阻塞(调用await方法),直到到达栅栏的线程数满足指定数量要求时,栅栏才会打开放行。

2.3 CyclicBarrier原理

CyclicBarrier 并没有自己去实现AQS框架的API,而是利用了ReentrantLock和Condition。

publicintawait()throws InterruptedException, BrokenBarrierException{try{returndowait(false,0L);}catch(TimeoutException toe){thrownewError(toe);// cannot happen}}
privateintdowait(boolean timed,long nanos)throws InterruptedException, BrokenBarrierException,
               TimeoutException{final ReentrantLock lock=this.lock;
        lock.lock();try{final Generation g= generation;if(g.broken)thrownewBrokenBarrierException();if(Thread.interrupted()){breakBarrier();thrownewInterruptedException();}int index=--count;if(index==0){// trippedboolean ranAction=false;try{final Runnable command= barrierCommand;if(command!= null)
                        command.run();
                    ranAction=true;nextGeneration();return0;}finally{if(!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor(;;){try{if(!timed)
                        trip.await();elseif(nanos>0L)
                        nanos= trip.awaitNanos(nanos);}catch(InterruptedException ie){if(g== generation&&! g.broken){breakBarrier();throw ie;}else{// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.
                        Thread.currentThread().interrupt();}}if(g.broken)thrownewBrokenBarrierException();if(g!= generation)return index;if(timed&& nanos<=0L){breakBarrier();thrownewTimeoutException();}}}finally{
            lock.unlock();}}

dowait方法并不复杂,一共有3部分:
1、判断栅栏是否已经损坏或当前线程已经被中断,如果是会分别抛出异常;
2、如果当前线程是最后一个到达的线程,会尝试执行最终任务(如果构造CyclicBarrier对象时有传入Runnable的话),执行成功即返回,失败会破坏栅栏;
3、对于不是最后一个到达的线程,会在Condition队列上等待,为了防止被意外唤醒,这里用了一个自旋操作。

三、Semaphore

3.1 Semaphore概述

Semaphore,又名信号量,这个类的作用有点类似于“许可证”。有时,我们因为一些原因需要控制同时访问共享资源的最大线程数量,比如出于系统性能的考虑需要限流,或者共享资源是稀缺资源,我们需要有一种办法能够协调各个线程,以保证合理的使用公共资源。
Semaphore维护了一个许可集,其实就是一定数量的“许可证”。当有线程想要访问共享资源时,需要先获取(acquire)的许可;如果许可不够了,线程需要一直等待,直到许可可用。当线程使用完共享资源后,可以归还(release)许可,以供其它需要的线程使用。另外,Semaphore支持公平/非公平策略,这和ReentrantLock类似,后面讲Semaphore原理时会看到,它们的实现本身就是类似的。

3.2 Semaphore使用

publicclassSemaphoreTest{static Semaphore semaphore=newSemaphore(5);publicstaticvoidmain(String[] args){for(int i=0; i<10; i++){newThread(()->{try{long timeMillis= System.currentTimeMillis();
                    semaphore.acquire(1);
                    Thread.sleep(5000);
                    System.out.println(Thread.currentThread().getName()+"等待了"+(System.currentTimeMillis()-timeMillis)/1000+"s");
                    semaphore.release(1);}catch(InterruptedException e){
                    e.printStackTrace();}},(i+1)+"号玩家").start();}}}

在这里插入图片描述
由打印结果可以看出,一次只能有5个线程进入semaphore.acquire(1)语句后面的代码。

3.3 Semaphore实现原理

Semaphore是通过内部类实现了AQS框架提供的接口,而且基本结构几乎和ReentrantLock完全一样,通过内部类分别实现了公平/非公平策略。
对于Semaphore来说,资源就是许可证的数量:
剩余许可证数(State值) - 尝试获取的许可数(acquire方法入参) ≥ 0:资源可用
剩余许可证数(State值) - 尝试获取的许可数(acquire方法入参) < 0:资源不可用

这里共享的含义是多个线程可以同时获取资源,当计算出的剩余资源不足时,线程就会阻塞。

注意:Semaphore不是锁,只能限制同时访问资源的线程数,至于对数据一致性的控制,Semaphore是不关心的。当前,如果是只有一个许可的Semaphore,可以当作锁使用。
上面这句话可以分两点理解:
1、当满足下面条件时:
(1)当Semaphore的许可只有一个,例如:Semaphore semaphore = new Semaphore(1);
(2)所有线程调用acquire方法时,需要的许可也为1,释放的许可也为1时。即semaphore .acquire(1) 、semaphore .release(1).
Semaphore跟ReentrantLock一样,都可以实现锁的功能。
2、Semaphore不是锁,并不能保证线程安全。
当满足下面条件时:
(1)Semaphore semaphore = new Semaphore(10);
(2)semaphore .acquire(1) 、semaphore .release(1);
也就是说,在某一时刻,同时最多允许10个线程去同时访问 acquire与release之间的代码块,这段代码块是线程不安全的。

四、常见问题

4.1 CyclicBarrier和CountDownLatch比较。

CountDownLatch是计数器,只能使用一次。而CyclicBarrier的计数器提供reset功能,可以多次使用。
CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;
CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再继续一起执行。
对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

  • 作者:mkfka
  • 原文链接:https://blog.csdn.net/mkfka/article/details/108871046
    更新时间:2022-09-27 08:56:58