7 并发工具类
除了之前讲到的并发容器,JDK 还提供了一些方便易用的并发工具类。
7.1 Semaphore
Semaphore 主要用于控制并发量。
7.1.1 核心方法
Semaphore 的核心方法就两个:
- acquire():取走一个 permit,成功拿到了 permit 才能继续干活,否则就阻塞直到拿到为止;
- release():放入一个 permit,通常干完活之后把之前 acquire 的 permit 再放回去。
7.1.2 demo
package per.lvjc.concurrent.util;import java.util.concurrent.Semaphore;publicclassSemaphoreDemo{//小区门口一共 2 辆共享单车privatestatic Semaphore bicycles=newSemaphore(2);publicstaticvoidmain(String[] args)throws InterruptedException{
Runnable runnable=()->{
String name= Thread.currentThread().getName();
System.out.println(name+": 社畜的一天开始了");timeFlows();boolean findBicycle= bicycles.tryAcquire();if(findBicycle){
System.out.println(name+": 竟然还有一辆共享单车,骑车上班");timeFlows();
System.out.println(name+": 下班还车");
bicycles.release();}else{
System.out.println(name+": 没车了,走路上班吧");}
System.out.println(name+": 社畜的一天结束了");};for(int i=1; i<=6; i++){
System.out.println("------ 星期 "+ i+" ------");newThread(runnable,"Jack").start();newThread(runnable,"Rose").start();newThread(runnable,"Nobody").start();
Thread.sleep(500);}}privatestaticvoidtimeFlows(){try{
Thread.sleep(100);}catch(InterruptedException e){
e.printStackTrace();}}}
输出结果:
------ 星期 1 ------
Jack: 社畜的一天开始了
Rose: 社畜的一天开始了
Nobody: 社畜的一天开始了
Jack: 竟然还有一辆共享单车,骑车上班
Rose: 竟然还有一辆共享单车,骑车上班
Nobody: 没车了,走路上班吧
Nobody: 社畜的一天结束了
Jack: 下班还车
Rose: 下班还车
Jack: 社畜的一天结束了
Rose: 社畜的一天结束了
------ 星期 2 ------
Jack: 社畜的一天开始了
Rose: 社畜的一天开始了
Nobody: 社畜的一天开始了
Jack: 竟然还有一辆共享单车,骑车上班
Rose: 竟然还有一辆共享单车,骑车上班
Nobody: 没车了,走路上班吧
Nobody: 社畜的一天结束了
Jack: 下班还车
Jack: 社畜的一天结束了
Rose: 下班还车
Rose: 社畜的一天结束了
------ 星期 3 ------
(省略......)
这里,通过 tryAcquire() 方法尝试骑走一辆共享单车,不阻塞,有就返回 true,否则返回 false;
因为 Semaphore 设置了只有 2,所以只有 2 个人可以骑车,剩下一个人得走路;
因为骑走车的人又还回来了,所以第二天又有 2 辆车可以骑。
7.1.3 注意点
- 构造方法设的参数仅仅是初始值;
- Semaphore 跟 Lock 不一样,Lock 在释放锁之前必须获得锁,Semaphore 可以不 acquire 直接 release;
- fair 与 unfair 这点与 Lock 是一样的;
- Semaphore 有个 acquireUninterruptibly 方法,acquire 时如果获取不到则阻塞并且不可被打断,但别真的没事就去 interrupt 一下,线程会从阻塞中被唤醒,然后发现还是 acquire 不到,又重新阻塞,损伤效率的。
7.1.4 手写
相比前面已经讲过的 AQS 和 ConcurrentHashMap 来说,Semaphore 算是非常简单的了,所以我们来自己实现一个 Semaphore。
先明确核心功能:
- acquire:
- 获取一个(或多个)permit——需要一个变量来存储当前 permit 数量,并且要保证并发可见性;
- 取走 permit 之后,修改剩余 permit 数量——并发修改要保证原子性;
- 如果 permit 数量不足,当前线程要阻塞——等待和通知;
- release:
- 放入一个(或多个)permit;
- 修改 permit 数量;
- 如果有线程在等待,将其唤醒。
实现方案:
- 保证可见性:
- synchronized;
- volatile;
- 保证原子性:
- synchronized;
- cas;
- 线程的等待和通知:
- wait / notify;
- await / signal;
- park / unpark(不同于上面两个,不会释放锁)。
所以实现方案有多种,这里选用 volatile + cas + park / unpark:
package per.lvjc.concurrent.util;import sun.misc.Unsafe;import java.lang.reflect.Field;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.locks.LockSupport;/**
* @author lvjc
* @date 2020/9/7
*/publicclassLvjcSemaphore{/**
* cas 操作需要 Unsafe,当然也可以用 JDK 封装好的 AtomicInteger
*/privatestaticfinallong permitOffset;privatestaticfinal Unsafe unsafe;static{try{
Field field= Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe=(Unsafe) field.get(null);
permitOffset= unsafe.objectFieldOffset(LvjcSemaphore.class.getDeclaredField("permits"));}catch(NoSuchFieldException| IllegalAccessException e){thrownewError(e);}}/**
* volatile 保证并发可见性
*/privatevolatileint permits;/**
* 一个并发非阻塞容器来保存处于等待中的 Thread
*/private ConcurrentLinkedQueue<Thread> queue;publicLvjcSemaphore(int permits){this.permits= permits;this.queue=newConcurrentLinkedQueue<>();}publicvoidacquire()throws InterruptedException{//自旋for(;;){int available= permits;// permit 数量不足if(available<1){// 当前线程进入等待队列
queue.offer(Thread.currentThread());// 阻塞当前线程,// 不必担心其它线程已经先从队列取出当前 thread 进行了 unpark,导致下面的 park 无限阻塞,// 如果 unpark 发生在 park 之前,被 unpark 的线程(如果已经 start)下一次 park 不会阻塞// 但还是要在 park 之前再检查一次,以免在当前线程放入队列之前,其它线程已经 release,错过唤醒导致永远阻塞if(permits<1){
LockSupport.park();}// 醒来之后if(Thread.interrupted()){// 如果是被 interrupt 叫醒的,抛出 InterruptedExceptionthrownewInterruptedException();}}// cas 修改 permit 变量 -1elseif(casPermit(available, available-1)){// 修改成功,退出break;}}}publicvoidacquireUninterruptibly(){for(;;){int available= permits;if(available<1){
queue.offer(Thread.currentThread());if(permits<1){
LockSupport.park();}if(Thread.interrupted()){//相比 acquire 方法,只在这里有区别,这里不抛异常,仅设置 interrupt 状态
Thread.currentThread().interrupt();}}elseif(casPermit(available, available-1)){break;}}}publicbooleantryAcquire(){for(;;){int available= permits;if(available<1){returnfalse;}elseif(casPermit(available, available-1)){// cas 失败要重新进入循环,不能直接 return falsereturntrue;}}}publicvoidrelease(){// 自旋for(;;){int available= permits;// cas 修改 permit 数量 +1if(casPermit(available, available+1)){// 修改成功,因为只放入 1 个 permit,所以只叫醒 1 个线程
Thread waitingThread= queue.poll();if(waitingThread!= null){// 如果有线程正在等待,将其唤醒
LockSupport.unpark(waitingThread);}break;}}}privatebooleancasPermit(int expected,int permit){return unsafe.compareAndSwapInt(this, permitOffset, expected, permit);}}
以上,已经实现了 Semaphore 的基本功能,性能应该也还好。不过这只是一个 unfair 的 Semaphore,fair 模式大同小异。
这个实现与 JDK 没有太大区别,JDK 的 Semaphore 用的是 AQS,本质上也是 volatile + cas + park / unpark。不同的是 AQS 用的队列是 AQS 内部自己实现的,并且在何时入队和出队上面做了比较多的优化。
7.2 Exchanger
Exchanger 用于两个线程之间的数据交换。
7.2.1 核心方法
Exchanger 类的核心方法就一个:
- exchange():提交自己的数据,阻塞,直到另一个线程提交数据,然后醒来,交换数据;或者对方线程先提交了数据,正在阻塞,自己线程提交数据,得到对方数据,对方线程被唤醒拿到自己的数据。
7.2.2 demo
package per.lvjc.concurrent.util;import java.util.concurrent.Exchanger;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;publicclassExchangerDemo{privatestatic Exchanger<String> exchanger=newExchanger<>();publicstaticvoidmain(String[] args)throws InterruptedException{newThread(()->{
String threadName= Thread.currentThread().getName();
System.out.println(threadName+": 发起交易,等待对方同意");try{// 这里仅仅用来阻塞,无所谓对方给什么数据,等待对方调用 exchange 方法
exchanger.exchange("");}catch(InterruptedException e){
System.out.println(threadName+": 交易失败");return;}// 对方也调用 exchange 方法提交数据之后,当前线程就会被唤醒走到这里
System.out.println(threadName+": 对方已同意,交易开始");
String goods= null;try{//自己提交了 “屠龙宝刀 × 1”, goods 为对方提交的数据
goods= exchanger.exchange("屠龙宝刀 * 1",30, TimeUnit.SECONDS);}catch(InterruptedException e){// 打断走这里
System.out.println(threadName+": 交易失败");}catch(TimeoutException e){// 超时走这里
System.out.println(threadName+": 交易超时,已取消");}
System.out.println(threadName+": 交易成功,获得 "+ goods);},"伊莉雅").start();
Thread.sleep(200);newThread(()->{
String threadName= Thread.currentThread().getName();
System.out.println(threadName+": 收到交易申请");try{// exchange 方法可以提交 null,对方也会得到 null
exchanger.exchange(null);}catch(InterruptedException e){
System.out.println(threadName+": 交易失败");return;}
System.out.println(threadName+": 已接受申请,交易开始");
String goods= null;try{
goods= exchanger.exchange("2000000 金币",30, TimeUnit.SECONDS);}catch(InterruptedException e){
System.out.println(threadName+": 交易失败");}catch(TimeoutException e){
System.out.println(threadName+": 交易超时,已取消");}
System.out.println(threadName+": 交易成功,获得 "+ goods);},"士郎").start();}}
输出:
伊莉雅: 发起交易,等待对方同意
士郎: 收到交易申请
伊莉雅: 对方已同意,交易开始
士郎: 已接受申请,交易开始
士郎: 交易成功,获得 屠龙宝刀 * 1
伊莉雅: 交易成功,获得 2000000 金币
7.2.3 注意点
- exchange 必须且仅仅发生在两个线程之间;
- Exchanger 自己保护好,别被第三者横插一脚,数据就被第三者劫走了。
7.2.4 手写
实现思路:
- 定义一个 data 变量存储 exchange 的值;
- 当一个线程调用 exchange 方法时,如果它是先到的,就把自己的值存到 data 变量,然后阻塞自己等待后到的线程唤醒;
- 如果一个线程调用 exchange 方法是,发现自己是后到的,就从 data 变量取出先到的线程放入的值,再把自己的值放入 data 变量,然后唤醒先到的线程,让它从 data 变量取出自己的值;
- 难点在于,当存在多于两个线程时,要处理竞争,可能 AB 交换,可能 BC 交换,也可能 AC 交换,看谁竞争胜出。
package per.lvjc.concurrent.util;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;publicclassLvjcExchanger<V>{private Lock lock=newReentrantLock();private Condition condition= lock.newCondition();private Object data;private Thread thread;public Vexchange(V value)throws InterruptedException{try{
lock.lockInterruptibly();
Thread currentThread= Thread.currentThread();// 其它线程还没有提交数据while(thread== null|| thread== currentThread){
data= value;
thread= currentThread;
condition.await();if(thread!= currentThread){
Object o= data;
data= null;
thread= null;return(V) o;}}
Object o= data;
data= value;
thread= currentThread;
condition.signal();return(V) o;}finally{
lock.unlock();}}public Vexchange(V value,long i, TimeUnit unit)throws InterruptedException, TimeoutException{long remaining= TimeUnit.MILLISECONDS.convert(i, unit);long deadline= System.currentTimeMillis()+ remaining;try{
lock.lockInterruptibly();
Thread currentThread= Thread.currentThread();while(thread== null|| thread== currentThread){
data= value;
thread= currentThread;
condition.await(remaining, TimeUnit.MILLISECONDS);if(thread!= currentThread){
Object o= data;
data= null;
thread= null;return(V) o;}else{
remaining= deadline- System.currentTimeMillis();if(remaining<=0){thrownewTimeoutException();}}}
Object o= data;
data= value;
thread= currentThread;
condition.signal();return(V) o;}finally{
lock.unlock();}}}
这里直接用锁实现了,因为 Exchanger 对多线程竞争的处理要比 Semaphore 复杂得多,用 cas 需要考虑各种并发场景。JDK 是用 cas 不加锁实现的,相应的,性能也要优秀得多。
7.3 CountDownLatch
主要用于两组线程之间的交互:
- 一组是开关线程,当条件满足时,打开开关;
- 一组是工作线程,当所有开关都打开后,开始执行相关逻辑。
7.3.1 核心方法
- CountDownLatch(int):构造方法,设定一个初始 count;
- await():使当前线程阻塞,直到 count == 0;
- countDown():使 count - 1;
通常:
- 预先设定好开关个数,即 count 值;
- 工作线程使用 await 阻塞,等待所有开关都被打开,即 count == 0;
- 开关线程判断工作条件是否满足,当条件满足时,打开开关,即 count - 1。