Java并发系列并发工具类

2022-10-07 12:26:19

7 并发工具类

除了之前讲到的并发容器,JDK 还提供了一些方便易用的并发工具类。

7.1 Semaphore

Semaphore 主要用于控制并发量。

7.1.1 核心方法

Semaphore 的核心方法就两个:

  • acquire():取走一个 permit,成功拿到了 permit 才能继续干活,否则就阻塞直到拿到为止;
  • release():放入一个 permit,通常干完活之后把之前 acquire 的 permit 再放回去。

7.1.2 demo

package per.lvjc.concurrent.util;import java.util.concurrent.Semaphore;publicclassSemaphoreDemo{//小区门口一共 2 辆共享单车privatestatic Semaphore bicycles=newSemaphore(2);publicstaticvoidmain(String[] args)throws InterruptedException{
        Runnable runnable=()->{
            String name= Thread.currentThread().getName();
            System.out.println(name+": 社畜的一天开始了");timeFlows();boolean findBicycle= bicycles.tryAcquire();if(findBicycle){
                System.out.println(name+": 竟然还有一辆共享单车,骑车上班");timeFlows();
                System.out.println(name+": 下班还车");
                bicycles.release();}else{
                System.out.println(name+": 没车了,走路上班吧");}
            System.out.println(name+": 社畜的一天结束了");};for(int i=1; i<=6; i++){
            System.out.println("------ 星期 "+ i+" ------");newThread(runnable,"Jack").start();newThread(runnable,"Rose").start();newThread(runnable,"Nobody").start();
            Thread.sleep(500);}}privatestaticvoidtimeFlows(){try{
            Thread.sleep(100);}catch(InterruptedException e){
            e.printStackTrace();}}}

输出结果:

------ 星期 1 ------
Jack: 社畜的一天开始了
Rose: 社畜的一天开始了
Nobody: 社畜的一天开始了
Jack: 竟然还有一辆共享单车,骑车上班
Rose: 竟然还有一辆共享单车,骑车上班
Nobody: 没车了,走路上班吧
Nobody: 社畜的一天结束了
Jack: 下班还车
Rose: 下班还车
Jack: 社畜的一天结束了
Rose: 社畜的一天结束了
------ 星期 2 ------
Jack: 社畜的一天开始了
Rose: 社畜的一天开始了
Nobody: 社畜的一天开始了
Jack: 竟然还有一辆共享单车,骑车上班
Rose: 竟然还有一辆共享单车,骑车上班
Nobody: 没车了,走路上班吧
Nobody: 社畜的一天结束了
Jack: 下班还车
Jack: 社畜的一天结束了
Rose: 下班还车
Rose: 社畜的一天结束了
------ 星期 3 ------
(省略......)

这里,通过 tryAcquire() 方法尝试骑走一辆共享单车,不阻塞,有就返回 true,否则返回 false;

因为 Semaphore 设置了只有 2,所以只有 2 个人可以骑车,剩下一个人得走路;

因为骑走车的人又还回来了,所以第二天又有 2 辆车可以骑。

7.1.3 注意点

  • 构造方法设的参数仅仅是初始值;
  • Semaphore 跟 Lock 不一样,Lock 在释放锁之前必须获得锁,Semaphore 可以不 acquire 直接 release;
  • fair 与 unfair 这点与 Lock 是一样的;
  • Semaphore 有个 acquireUninterruptibly 方法,acquire 时如果获取不到则阻塞并且不可被打断,但别真的没事就去 interrupt 一下,线程会从阻塞中被唤醒,然后发现还是 acquire 不到,又重新阻塞,损伤效率的。

7.1.4 手写

相比前面已经讲过的 AQS 和 ConcurrentHashMap 来说,Semaphore 算是非常简单的了,所以我们来自己实现一个 Semaphore。

先明确核心功能:

  • acquire:
    • 获取一个(或多个)permit——需要一个变量来存储当前 permit 数量,并且要保证并发可见性;
    • 取走 permit 之后,修改剩余 permit 数量——并发修改要保证原子性;
    • 如果 permit 数量不足,当前线程要阻塞——等待和通知;
  • release:
    • 放入一个(或多个)permit;
    • 修改 permit 数量;
    • 如果有线程在等待,将其唤醒。

实现方案:

  • 保证可见性:
    • synchronized;
    • volatile;
  • 保证原子性:
    • synchronized;
    • cas;
  • 线程的等待和通知:
    • wait / notify;
    • await / signal;
    • park / unpark(不同于上面两个,不会释放锁)。

所以实现方案有多种,这里选用 volatile + cas + park / unpark:

package per.lvjc.concurrent.util;import sun.misc.Unsafe;import java.lang.reflect.Field;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.locks.LockSupport;/**
 * @author lvjc
 * @date 2020/9/7
 */publicclassLvjcSemaphore{/**
     * cas 操作需要 Unsafe,当然也可以用 JDK 封装好的 AtomicInteger
     */privatestaticfinallong permitOffset;privatestaticfinal Unsafe unsafe;static{try{
            Field field= Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            unsafe=(Unsafe) field.get(null);
            permitOffset= unsafe.objectFieldOffset(LvjcSemaphore.class.getDeclaredField("permits"));}catch(NoSuchFieldException| IllegalAccessException e){thrownewError(e);}}/**
     * volatile 保证并发可见性
     */privatevolatileint permits;/**
     * 一个并发非阻塞容器来保存处于等待中的 Thread
     */private ConcurrentLinkedQueue<Thread> queue;publicLvjcSemaphore(int permits){this.permits= permits;this.queue=newConcurrentLinkedQueue<>();}publicvoidacquire()throws InterruptedException{//自旋for(;;){int available= permits;// permit 数量不足if(available<1){// 当前线程进入等待队列
                queue.offer(Thread.currentThread());// 阻塞当前线程,// 不必担心其它线程已经先从队列取出当前 thread 进行了 unpark,导致下面的 park 无限阻塞,// 如果 unpark 发生在 park 之前,被 unpark 的线程(如果已经 start)下一次 park 不会阻塞// 但还是要在 park 之前再检查一次,以免在当前线程放入队列之前,其它线程已经 release,错过唤醒导致永远阻塞if(permits<1){
                    LockSupport.park();}// 醒来之后if(Thread.interrupted()){// 如果是被 interrupt 叫醒的,抛出 InterruptedExceptionthrownewInterruptedException();}}// cas 修改 permit 变量 -1elseif(casPermit(available, available-1)){// 修改成功,退出break;}}}publicvoidacquireUninterruptibly(){for(;;){int available= permits;if(available<1){
                queue.offer(Thread.currentThread());if(permits<1){
                    LockSupport.park();}if(Thread.interrupted()){//相比 acquire 方法,只在这里有区别,这里不抛异常,仅设置 interrupt 状态
                    Thread.currentThread().interrupt();}}elseif(casPermit(available, available-1)){break;}}}publicbooleantryAcquire(){for(;;){int available= permits;if(available<1){returnfalse;}elseif(casPermit(available, available-1)){// cas 失败要重新进入循环,不能直接 return falsereturntrue;}}}publicvoidrelease(){// 自旋for(;;){int available= permits;// cas 修改 permit 数量 +1if(casPermit(available, available+1)){// 修改成功,因为只放入 1 个 permit,所以只叫醒 1 个线程
                Thread waitingThread= queue.poll();if(waitingThread!= null){// 如果有线程正在等待,将其唤醒
                    LockSupport.unpark(waitingThread);}break;}}}privatebooleancasPermit(int expected,int permit){return unsafe.compareAndSwapInt(this, permitOffset, expected, permit);}}

以上,已经实现了 Semaphore 的基本功能,性能应该也还好。不过这只是一个 unfair 的 Semaphore,fair 模式大同小异。

这个实现与 JDK 没有太大区别,JDK 的 Semaphore 用的是 AQS,本质上也是 volatile + cas + park / unpark。不同的是 AQS 用的队列是 AQS 内部自己实现的,并且在何时入队和出队上面做了比较多的优化。

7.2 Exchanger

Exchanger 用于两个线程之间的数据交换。

7.2.1 核心方法

Exchanger 类的核心方法就一个:

  • exchange():提交自己的数据,阻塞,直到另一个线程提交数据,然后醒来,交换数据;或者对方线程先提交了数据,正在阻塞,自己线程提交数据,得到对方数据,对方线程被唤醒拿到自己的数据。

7.2.2 demo

package per.lvjc.concurrent.util;import java.util.concurrent.Exchanger;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;publicclassExchangerDemo{privatestatic Exchanger<String> exchanger=newExchanger<>();publicstaticvoidmain(String[] args)throws InterruptedException{newThread(()->{
            String threadName= Thread.currentThread().getName();
            System.out.println(threadName+": 发起交易,等待对方同意");try{// 这里仅仅用来阻塞,无所谓对方给什么数据,等待对方调用 exchange 方法
                exchanger.exchange("");}catch(InterruptedException e){
                System.out.println(threadName+": 交易失败");return;}// 对方也调用 exchange 方法提交数据之后,当前线程就会被唤醒走到这里
            System.out.println(threadName+": 对方已同意,交易开始");
            String goods= null;try{//自己提交了 “屠龙宝刀 × 1”, goods 为对方提交的数据
                goods= exchanger.exchange("屠龙宝刀 * 1",30, TimeUnit.SECONDS);}catch(InterruptedException e){// 打断走这里
                System.out.println(threadName+": 交易失败");}catch(TimeoutException e){// 超时走这里
                System.out.println(threadName+": 交易超时,已取消");}
            System.out.println(threadName+": 交易成功,获得 "+ goods);},"伊莉雅").start();
        Thread.sleep(200);newThread(()->{
            String threadName= Thread.currentThread().getName();
            System.out.println(threadName+": 收到交易申请");try{// exchange 方法可以提交 null,对方也会得到 null
                exchanger.exchange(null);}catch(InterruptedException e){
                System.out.println(threadName+": 交易失败");return;}
            System.out.println(threadName+": 已接受申请,交易开始");
            String goods= null;try{
                goods= exchanger.exchange("2000000 金币",30, TimeUnit.SECONDS);}catch(InterruptedException e){
                System.out.println(threadName+": 交易失败");}catch(TimeoutException e){
                System.out.println(threadName+": 交易超时,已取消");}
            System.out.println(threadName+": 交易成功,获得 "+ goods);},"士郎").start();}}

输出:

伊莉雅: 发起交易,等待对方同意
士郎: 收到交易申请
伊莉雅: 对方已同意,交易开始
士郎: 已接受申请,交易开始
士郎: 交易成功,获得 屠龙宝刀 * 1
伊莉雅: 交易成功,获得 2000000 金币

7.2.3 注意点

  • exchange 必须且仅仅发生在两个线程之间;
  • Exchanger 自己保护好,别被第三者横插一脚,数据就被第三者劫走了。

7.2.4 手写

实现思路:

  • 定义一个 data 变量存储 exchange 的值;
  • 当一个线程调用 exchange 方法时,如果它是先到的,就把自己的值存到 data 变量,然后阻塞自己等待后到的线程唤醒;
  • 如果一个线程调用 exchange 方法是,发现自己是后到的,就从 data 变量取出先到的线程放入的值,再把自己的值放入 data 变量,然后唤醒先到的线程,让它从 data 变量取出自己的值;
  • 难点在于,当存在多于两个线程时,要处理竞争,可能 AB 交换,可能 BC 交换,也可能 AC 交换,看谁竞争胜出。
package per.lvjc.concurrent.util;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;publicclassLvjcExchanger<V>{private Lock lock=newReentrantLock();private Condition condition= lock.newCondition();private Object data;private Thread thread;public Vexchange(V value)throws InterruptedException{try{
            lock.lockInterruptibly();
            Thread currentThread= Thread.currentThread();// 其它线程还没有提交数据while(thread== null|| thread== currentThread){
                data= value;
                thread= currentThread;
                condition.await();if(thread!= currentThread){
                    Object o= data;
                    data= null;
                    thread= null;return(V) o;}}
            Object o= data;
            data= value;
            thread= currentThread;
            condition.signal();return(V) o;}finally{
            lock.unlock();}}public Vexchange(V value,long i, TimeUnit unit)throws InterruptedException, TimeoutException{long remaining= TimeUnit.MILLISECONDS.convert(i, unit);long deadline= System.currentTimeMillis()+ remaining;try{
            lock.lockInterruptibly();
            Thread currentThread= Thread.currentThread();while(thread== null|| thread== currentThread){
                data= value;
                thread= currentThread;
                condition.await(remaining, TimeUnit.MILLISECONDS);if(thread!= currentThread){
                    Object o= data;
                    data= null;
                    thread= null;return(V) o;}else{
                    remaining= deadline- System.currentTimeMillis();if(remaining<=0){thrownewTimeoutException();}}}
            Object o= data;
            data= value;
            thread= currentThread;
            condition.signal();return(V) o;}finally{
            lock.unlock();}}}

这里直接用锁实现了,因为 Exchanger 对多线程竞争的处理要比 Semaphore 复杂得多,用 cas 需要考虑各种并发场景。JDK 是用 cas 不加锁实现的,相应的,性能也要优秀得多。

7.3 CountDownLatch

主要用于两组线程之间的交互:

  • 一组是开关线程,当条件满足时,打开开关;
  • 一组是工作线程,当所有开关都打开后,开始执行相关逻辑。

7.3.1 核心方法

  • CountDownLatch(int):构造方法,设定一个初始 count;
  • await():使当前线程阻塞,直到 count == 0;
  • countDown():使 count - 1;

通常:

  • 预先设定好开关个数,即 count 值;
  • 工作线程使用 await 阻塞,等待所有开关都被打开,即 count == 0;
  • 开关线程判断工作条件是否满足,当条件满足时,打开开关,即 count - 1。

<

  • 作者:JinchaoLv
  • 原文链接:https://blog.csdn.net/weixin_38380858/article/details/108591787
    更新时间:2022-10-07 12:26:19