并发编程--CountDownLatch、Semaphore、CyclicBarrier的区别及示例

2022-06-15 10:09:52

1. 概念与区别

1.1. 概念

  • CyclicBarrier循环屏障,是用来一个关卡来阻挡住所有线程,等所有线程全部执行到关卡处时,再统一执行下一步操作。假设一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家就等待

  • CountDownLatch闭锁,可以使一批线程在闭锁上等待,等到其他线程执行完相应操作后,闭锁打开,这些等待的线程才可以继续执行。确切的说,闭锁在内部维护了一个倒计数器。通过该计数器的值来决定闭锁的状态,从而决定是否允许等待的线程继续执行。常用于监听某些初始化操作,等待初始化执行完毕后,通知主线程继续工作

  • Semaphore:信号量)为多线程协作提供了更为强大的控制方法,因为synchronized和重入锁ReentrantLock,这2种锁一次都只能允许一个线程访问一个资源,而信号量可以控制有多少个线程可以访问特定的资源。

1.2. 区别

  • CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同;CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行Semaphore其实和锁有点类似,它一般用于控制对某组资源【如秒杀的商品或一组机器资源】的访问权限

  • 使用CountDownLatch时,线程在countDown()之后,会继续执行自己的任务;而使用CyclicBarrier时,await之后的代码【屏障之后的代码】是在所有线程任务结束之后,执行barrierAction后才会进行继续执行

  • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断

  • CyclicBarrier内部有一个线程把规则破坏了(如接收到中断信号或超时),此线程触发异常,然后被唤醒向下运行,其他等待中或者后面到达的线程,会在await()方法上触发`BrokenBarrierException异常,然后继续执行

区别点CountDownLatchCyclicBarrier
触发条件计算为0时释放所有等待的线程所有线程都到达屏障时释放所有等待线程
是否可重置计数为0时,无法重置计数达到指定值时,计数置为0重新开始
何时计数调用countDown()方法计数减一,代码继续执行,调用await()方法只进行阻塞,对计数没任何影响。调用await()方法时,获取锁,然后在锁内将计数减一
是否可重复利用不可重复利用可重复利用

2. 代码示例

2.1. CountDownLatch代码示例

示例是一个简单的并行处理工具类,可以传入n个任务内部使用线程池进行处理,等待所有任务都处理完成之后,方法才会返回。

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TaskDisposeUtils {
    //并行线程数
    public static final int POOL_SIZE;

    static {
        POOL_SIZE = Integer.max(Runtime.getRuntime().availableProcessors(), 5);
    }

    /**
     * 并行处理,并等待结束
     *
     * @param taskList 任务列表
     * @param consumer 消费者
     * @param <T>
     * @throws InterruptedException
     */
    public static <T> void dispose(List<T> taskList, Consumer<T> consumer) throws InterruptedException {
        dispose(POOL_SIZE, taskList, consumer);
    }

    /**
     * 并行处理,并等待结束
     *
     * @param poolSize 线程池大小
     * @param taskList 任务列表
     * @param consumer 消费者
     * @param <T>
     * @throws InterruptedException
     */
    public static <T> void dispose(int poolSize, List<T> taskList, Consumer<T> consumer) throws InterruptedException {
        if (taskList.size() == 0) {
            return;
        }
        poolSize = Math.min(poolSize, taskList.size());
        ExecutorService executorService = null;
        try {
            executorService = Executors.newFixedThreadPool(poolSize);
            CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
            for (T item : taskList) {
                executorService.execute(() -> {
                    try {
                        consumer.accept(item);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            if (executorService != null) {
                executorService.shutdown();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> list = Stream.iterate(1, a -> a + 1).limit(5).collect(Collectors.toList());
        TaskDisposeUtils.dispose(list, item -> {
            System.out.println(String.format("%s任务%s执行完毕", System.currentTimeMillis(), item));
        });
        System.out.println(list + "中的任务都处理完毕!");
    }
}

2.2. semaphore代码示例

在使用semaphore时,一定要正确的释放锁。

       不能在finally中直接释放锁,因为如果获取锁的过程中发生异常,导致获取锁失败,最后finally里面也释放了许可,会导致许可数量凭空增长了。

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {
    static Semaphore semaphore = new Semaphore(2);

    public static class WorkThread extends Thread {
        public WorkThread(String name) {
            super(name);
        }

        @Override
        public void run() {
            Thread thread = Thread.currentThread();
            //获取许可是否成功
            boolean acquireSuccess = false;
            try {
                semaphore.acquire();
                acquireSuccess = true;
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",获取许可,当前可用许可数量:" + semaphore.availablePermits());
                //休眠100秒
                TimeUnit.SECONDS.sleep(5);
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",运行结束!");
            } catch (InterruptedException e) {
            } finally {
                if (acquireSuccess) {
                    semaphore.release();
                }
            }
            System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",当前可用许可数量:" + semaphore.availablePermits());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WorkThread wt1 = new WorkThread("wt1");
        wt1.start();
        //休眠1秒
        TimeUnit.SECONDS.sleep(1);
        WorkThread wt2 = new WorkThread("wt2");
        wt1.start();
        //休眠1秒
        TimeUnit.SECONDS.sleep(1);
        WorkThread wt3 = new WorkThread("wt3");
        wt3.start();

        //给t2和t3发送中断信号
        wt2.interrupt();
        wt3.interrupt();
    }
}

2.3. CyclicBarrier使用示例

CyclicBarrier在触发后会重置,并开启一次屏障

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierTest {
    public static CyclicBarrier cyclicBarrier = new CyclicBarrier(10,()-> System.out.println("全部人员到齐了---------------"));

    public static class Person extends Thread {
        int sleep;

        public Person(String name, int sleep) {
            super(name);
            this.sleep = sleep;
        }

        //等待吃饭
        void eat() {
            try {
                //模拟休眠
                TimeUnit.SECONDS.sleep(sleep);
                long starTime = System.currentTimeMillis();
                //调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
                cyclicBarrier.await();
                long endTime = System.currentTimeMillis();
                System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),开始吃饭了!");

                //休眠sleep时间,模拟当前员工吃饭耗时
                TimeUnit.SECONDS.sleep(sleep);
            } catch (Exception e) {
            }
        }

        //等待所有人到齐之后,开车去下一站
        void drive() {
            try {
                long starTime = System.currentTimeMillis();
                //调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
                cyclicBarrier.await();
                long endTime = System.currentTimeMillis();
                System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),去下一景点的路上!");
            } catch (Exception e) {
            }
        }

        @Override
        public void run() {
            //等待所有人到齐之后吃饭,先到的人坐那等着,什么事情不要干
            this.eat();
            //等待所有人到齐之后开车去下一景点,先到的人坐那等着,什么事情不要干
            this.drive();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= 10; i++) {
            new Person("员工" + i, i).start();
        }
    }
}
  • 作者:java编程艺术
  • 原文链接:https://blog.csdn.net/penriver/article/details/117815106
    更新时间:2022-06-15 10:09:52