1. 概念与区别
1.1. 概念
- CyclicBarrier:循环屏障,是用来一个关卡来阻挡住所有线程,等所有线程全部执行到关卡处时,再统一执行下一步操作。假设一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家就等待 
- CountDownLatch:闭锁,可以使一批线程在闭锁上等待,等到其他线程执行完相应操作后,闭锁打开,这些等待的线程才可以继续执行。确切的说,闭锁在内部维护了一个倒计数器。通过该计数器的值来决定闭锁的状态,从而决定是否允许等待的线程继续执行。常用于监听某些初始化操作,等待初始化执行完毕后,通知主线程继续工作 
- Semaphore:(信号量)为多线程协作提供了更为强大的控制方法,因为synchronized和重入锁ReentrantLock,这2种锁一次都只能允许一个线程访问一个资源,而信号量可以控制有多少个线程可以访问特定的资源。 
1.2. 区别
- CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同;CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;Semaphore其实和锁有点类似,它一般用于控制对某组资源【如秒杀的商品或一组机器资源】的访问权限。 
- 使用CountDownLatch时,线程在countDown()之后,会继续执行自己的任务;而使用CyclicBarrier时,await之后的代码【屏障之后的代码】是在所有线程任务结束之后,执行barrierAction后才会进行继续执行 
- CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断 
- CyclicBarrier内部有一个线程把规则破坏了(如接收到中断信号或超时),此线程触发异常,然后被唤醒向下运行,其他等待中或者后面到达的线程,会在await()方法上触发`BrokenBarrierException异常,然后继续执行 
| 区别点 | CountDownLatch | CyclicBarrier | 
|---|---|---|
| 触发条件 | 计算为0时释放所有等待的线程 | 所有线程都到达屏障时释放所有等待线程 | 
| 是否可重置 | 计数为0时,无法重置 | 计数达到指定值时,计数置为0重新开始 | 
| 何时计数 | 调用countDown()方法计数减一,代码继续执行,调用await()方法只进行阻塞,对计数没任何影响。 | 调用await()方法时,获取锁,然后在锁内将计数减一 | 
| 是否可重复利用 | 不可重复利用 | 可重复利用 | 
2. 代码示例
2.1. CountDownLatch代码示例
示例是一个简单的并行处理工具类,可以传入n个任务内部使用线程池进行处理,等待所有任务都处理完成之后,方法才会返回。
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class TaskDisposeUtils {
    //并行线程数
    public static final int POOL_SIZE;
    static {
        POOL_SIZE = Integer.max(Runtime.getRuntime().availableProcessors(), 5);
    }
    /**
     * 并行处理,并等待结束
     *
     * @param taskList 任务列表
     * @param consumer 消费者
     * @param <T>
     * @throws InterruptedException
     */
    public static <T> void dispose(List<T> taskList, Consumer<T> consumer) throws InterruptedException {
        dispose(POOL_SIZE, taskList, consumer);
    }
    /**
     * 并行处理,并等待结束
     *
     * @param poolSize 线程池大小
     * @param taskList 任务列表
     * @param consumer 消费者
     * @param <T>
     * @throws InterruptedException
     */
    public static <T> void dispose(int poolSize, List<T> taskList, Consumer<T> consumer) throws InterruptedException {
        if (taskList.size() == 0) {
            return;
        }
        poolSize = Math.min(poolSize, taskList.size());
        ExecutorService executorService = null;
        try {
            executorService = Executors.newFixedThreadPool(poolSize);
            CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
            for (T item : taskList) {
                executorService.execute(() -> {
                    try {
                        consumer.accept(item);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            if (executorService != null) {
                executorService.shutdown();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        List<Integer> list = Stream.iterate(1, a -> a + 1).limit(5).collect(Collectors.toList());
        TaskDisposeUtils.dispose(list, item -> {
            System.out.println(String.format("%s任务%s执行完毕", System.currentTimeMillis(), item));
        });
        System.out.println(list + "中的任务都处理完毕!");
    }
}2.2. semaphore代码示例
在使用semaphore时,一定要正确的释放锁。
不能在finally中直接释放锁,因为如果获取锁的过程中发生异常,导致获取锁失败,最后finally里面也释放了许可,会导致许可数量凭空增长了。
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
    static Semaphore semaphore = new Semaphore(2);
    public static class WorkThread extends Thread {
        public WorkThread(String name) {
            super(name);
        }
        @Override
        public void run() {
            Thread thread = Thread.currentThread();
            //获取许可是否成功
            boolean acquireSuccess = false;
            try {
                semaphore.acquire();
                acquireSuccess = true;
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",获取许可,当前可用许可数量:" + semaphore.availablePermits());
                //休眠100秒
                TimeUnit.SECONDS.sleep(5);
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",运行结束!");
            } catch (InterruptedException e) {
            } finally {
                if (acquireSuccess) {
                    semaphore.release();
                }
            }
            System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",当前可用许可数量:" + semaphore.availablePermits());
        }
    }
    public static void main(String[] args) throws InterruptedException {
        WorkThread wt1 = new WorkThread("wt1");
        wt1.start();
        //休眠1秒
        TimeUnit.SECONDS.sleep(1);
        WorkThread wt2 = new WorkThread("wt2");
        wt1.start();
        //休眠1秒
        TimeUnit.SECONDS.sleep(1);
        WorkThread wt3 = new WorkThread("wt3");
        wt3.start();
        //给t2和t3发送中断信号
        wt2.interrupt();
        wt3.interrupt();
    }
}2.3. CyclicBarrier使用示例
CyclicBarrier在触发后会重置,并开启一次屏障
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class CyclicBarrierTest {
    public static CyclicBarrier cyclicBarrier = new CyclicBarrier(10,()-> System.out.println("全部人员到齐了---------------"));
    public static class Person extends Thread {
        int sleep;
        public Person(String name, int sleep) {
            super(name);
            this.sleep = sleep;
        }
        //等待吃饭
        void eat() {
            try {
                //模拟休眠
                TimeUnit.SECONDS.sleep(sleep);
                long starTime = System.currentTimeMillis();
                //调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
                cyclicBarrier.await();
                long endTime = System.currentTimeMillis();
                System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),开始吃饭了!");
                //休眠sleep时间,模拟当前员工吃饭耗时
                TimeUnit.SECONDS.sleep(sleep);
            } catch (Exception e) {
            }
        }
        //等待所有人到齐之后,开车去下一站
        void drive() {
            try {
                long starTime = System.currentTimeMillis();
                //调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
                cyclicBarrier.await();
                long endTime = System.currentTimeMillis();
                System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),去下一景点的路上!");
            } catch (Exception e) {
            }
        }
        @Override
        public void run() {
            //等待所有人到齐之后吃饭,先到的人坐那等着,什么事情不要干
            this.eat();
            //等待所有人到齐之后开车去下一景点,先到的人坐那等着,什么事情不要干
            this.drive();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= 10; i++) {
            new Person("员工" + i, i).start();
        }
    }
}