1. 概念与区别
1.1. 概念
CyclicBarrier:循环屏障,是用来一个关卡来阻挡住所有线程,等所有线程全部执行到关卡处时,再统一执行下一步操作。假设一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家就等待
CountDownLatch:闭锁,可以使一批线程在闭锁上等待,等到其他线程执行完相应操作后,闭锁打开,这些等待的线程才可以继续执行。确切的说,闭锁在内部维护了一个倒计数器。通过该计数器的值来决定闭锁的状态,从而决定是否允许等待的线程继续执行。常用于监听某些初始化操作,等待初始化执行完毕后,通知主线程继续工作
Semaphore:(信号量)为多线程协作提供了更为强大的控制方法,因为synchronized和重入锁ReentrantLock,这2种锁一次都只能允许一个线程访问一个资源,而信号量可以控制有多少个线程可以访问特定的资源。
1.2. 区别
CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同;CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;Semaphore其实和锁有点类似,它一般用于控制对某组资源【如秒杀的商品或一组机器资源】的访问权限。
使用CountDownLatch时,线程在countDown()之后,会继续执行自己的任务;而使用CyclicBarrier时,await之后的代码【屏障之后的代码】是在所有线程任务结束之后,执行barrierAction后才会进行继续执行
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断
CyclicBarrier内部有一个线程把规则破坏了(如接收到中断信号或超时),此线程触发异常,然后被唤醒向下运行,其他等待中或者后面到达的线程,会在await()方法上触发`BrokenBarrierException异常,然后继续执行
区别点 | CountDownLatch | CyclicBarrier |
---|---|---|
触发条件 | 计算为0时释放所有等待的线程 | 所有线程都到达屏障时释放所有等待线程 |
是否可重置 | 计数为0时,无法重置 | 计数达到指定值时,计数置为0重新开始 |
何时计数 | 调用countDown()方法计数减一,代码继续执行,调用await()方法只进行阻塞,对计数没任何影响。 | 调用await()方法时,获取锁,然后在锁内将计数减一 |
是否可重复利用 | 不可重复利用 | 可重复利用 |
2. 代码示例
2.1. CountDownLatch代码示例
示例是一个简单的并行处理工具类,可以传入n个任务内部使用线程池进行处理,等待所有任务都处理完成之后,方法才会返回。
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class TaskDisposeUtils {
//并行线程数
public static final int POOL_SIZE;
static {
POOL_SIZE = Integer.max(Runtime.getRuntime().availableProcessors(), 5);
}
/**
* 并行处理,并等待结束
*
* @param taskList 任务列表
* @param consumer 消费者
* @param <T>
* @throws InterruptedException
*/
public static <T> void dispose(List<T> taskList, Consumer<T> consumer) throws InterruptedException {
dispose(POOL_SIZE, taskList, consumer);
}
/**
* 并行处理,并等待结束
*
* @param poolSize 线程池大小
* @param taskList 任务列表
* @param consumer 消费者
* @param <T>
* @throws InterruptedException
*/
public static <T> void dispose(int poolSize, List<T> taskList, Consumer<T> consumer) throws InterruptedException {
if (taskList.size() == 0) {
return;
}
poolSize = Math.min(poolSize, taskList.size());
ExecutorService executorService = null;
try {
executorService = Executors.newFixedThreadPool(poolSize);
CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
for (T item : taskList) {
executorService.execute(() -> {
try {
consumer.accept(item);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
} finally {
if (executorService != null) {
executorService.shutdown();
}
}
}
public static void main(String[] args) throws InterruptedException {
List<Integer> list = Stream.iterate(1, a -> a + 1).limit(5).collect(Collectors.toList());
TaskDisposeUtils.dispose(list, item -> {
System.out.println(String.format("%s任务%s执行完毕", System.currentTimeMillis(), item));
});
System.out.println(list + "中的任务都处理完毕!");
}
}
2.2. semaphore代码示例
在使用semaphore时,一定要正确的释放锁。
不能在finally中直接释放锁,因为如果获取锁的过程中发生异常,导致获取锁失败,最后finally里面也释放了许可,会导致许可数量凭空增长了。
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
static Semaphore semaphore = new Semaphore(2);
public static class WorkThread extends Thread {
public WorkThread(String name) {
super(name);
}
@Override
public void run() {
Thread thread = Thread.currentThread();
//获取许可是否成功
boolean acquireSuccess = false;
try {
semaphore.acquire();
acquireSuccess = true;
System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",获取许可,当前可用许可数量:" + semaphore.availablePermits());
//休眠100秒
TimeUnit.SECONDS.sleep(5);
System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",运行结束!");
} catch (InterruptedException e) {
} finally {
if (acquireSuccess) {
semaphore.release();
}
}
System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",当前可用许可数量:" + semaphore.availablePermits());
}
}
public static void main(String[] args) throws InterruptedException {
WorkThread wt1 = new WorkThread("wt1");
wt1.start();
//休眠1秒
TimeUnit.SECONDS.sleep(1);
WorkThread wt2 = new WorkThread("wt2");
wt1.start();
//休眠1秒
TimeUnit.SECONDS.sleep(1);
WorkThread wt3 = new WorkThread("wt3");
wt3.start();
//给t2和t3发送中断信号
wt2.interrupt();
wt3.interrupt();
}
}
2.3. CyclicBarrier使用示例
CyclicBarrier在触发后会重置,并开启一次屏障
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class CyclicBarrierTest {
public static CyclicBarrier cyclicBarrier = new CyclicBarrier(10,()-> System.out.println("全部人员到齐了---------------"));
public static class Person extends Thread {
int sleep;
public Person(String name, int sleep) {
super(name);
this.sleep = sleep;
}
//等待吃饭
void eat() {
try {
//模拟休眠
TimeUnit.SECONDS.sleep(sleep);
long starTime = System.currentTimeMillis();
//调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
cyclicBarrier.await();
long endTime = System.currentTimeMillis();
System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),开始吃饭了!");
//休眠sleep时间,模拟当前员工吃饭耗时
TimeUnit.SECONDS.sleep(sleep);
} catch (Exception e) {
}
}
//等待所有人到齐之后,开车去下一站
void drive() {
try {
long starTime = System.currentTimeMillis();
//调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
cyclicBarrier.await();
long endTime = System.currentTimeMillis();
System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),去下一景点的路上!");
} catch (Exception e) {
}
}
@Override
public void run() {
//等待所有人到齐之后吃饭,先到的人坐那等着,什么事情不要干
this.eat();
//等待所有人到齐之后开车去下一景点,先到的人坐那等着,什么事情不要干
this.drive();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i <= 10; i++) {
new Person("员工" + i, i).start();
}
}
}