深入理解信号量Semaphore

2023年1月14日12:28:22

深入理解Semaphore

Semaphore用法详解

介绍

Semaphore也就是信号量,提供了资源数量的并发访问控制。
Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于 AbstractQueuedSynchronizer实现的。

深入理解信号量Semaphore

适用场景

  • 可以用来加锁
    Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获 取信号量实现。

  • 可以用来控制并发获取资源
    大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同 时获取信号量。

Semaphore 常用方法

permits 表示许可证的数量(资源数)
fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最 久的线程

深入理解信号量Semaphore

常用API

深入理解信号量Semaphore

  • acquire() 表示阻塞并获取许可
  • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻 塞
  • release() 表示释放许可
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermit(int reduction):减少 reduction 个许可证
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合

同一时间允许最大的线程通过的许可证,当做锁的话,同一时间只允许一个线程进来。
此时许可证就是1

    static  class SemaphoreLock {
        //一个许可证
        Semaphore semaphore = new Semaphore(1); //permits 许可证  数字是许可证   同一时间允许最大的线程通过的许可证

        // 1 的话相当于 synchronized 表示 一个锁,同一个时间内只有一个线程能进来
        //但同一时间 要求多个线程进来 synchronized是做不到的,Semaphore是可以做到的
        void lock() throws InterruptedException {
            semaphore.acquire();  // 获取
        }

        void unlock() {
            semaphore.release();//释放
        }
    }

一个线程进去之后,离开之后 另一个线程才能进去,也就是 semaphoreLock.lock(); 的时候 只有一个结束后 另一个才能执行

  SemaphoreLock semaphoreLock = new SemaphoreLock();
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {

                try {
                    System.out.println(Thread.currentThread().getName() + " in ");
                    semaphoreLock.lock();
                    System.out.println(Thread.currentThread().getName() + " out ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphoreLock.unlock(); // 不可用
                }
                System.out.println(Thread.currentThread().getName() + " finshed.... ");
            }).start();
        }

可以看到如下结果
深入理解信号量Semaphore

控制获取资源

正常获取 获取和释放 许可证数量一致

        Semaphore  semaphoreLock = new Semaphore(2);
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " in ");
                    semaphoreLock.acquire(2);  //  获取几个许可证
                    System.out.println(Thread.currentThread().getName() + " out ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphoreLock.release(2);
                }
                System.out.println(Thread.currentThread().getName() + " finshed.... ");
            }).start();
        }

拿了2个许可证 释放了一个 ,许可证是1个,此时队列里线程还有一个

 Semaphore  semaphoreLock1 = new Semaphore(2);
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " in ");
                    semaphoreLock1.acquire(2);  //  拿2个许可证
                    System.out.println(Thread.currentThread().getName() + " get Semaphore ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphoreLock1.release(1);  // 释放一个 第二个没法释放 就一直在等待
                }
                System.out.println(Thread.currentThread().getName() + " finshed.... ");
            }).start();
        }


        while (true){

            // 拿了2个许可证 释放了一个 ,许可证是1个,此时队列里线程还有一个
            System.out.println(" ql block  队列 中线程 数量: " + semaphoreLock1.getQueueLength());
            System.out.println(" ap 多少可用许可证 : " + semaphoreLock1.availablePermits());
            System.out.println(" ----------------  ");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

结果如下
深入理解信号量Semaphore

用了2个许可证,还剩一个 , 当前可用许可证 availablePermits()

      Semaphore  semaphoreLock2 = new Semaphore(2);
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " in ");
                    semaphoreLock2.acquire(1);
                    TimeUnit.SECONDS.sleep(5);
                    System.out.println(Thread.currentThread().getName() + " get Semaphore ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphoreLock2.release(1);
                }
                System.out.println(Thread.currentThread().getName() + " finshed.... ");
            }).start();
        }


        while (true){
            // 监控 block thread 数量 和  当前可用许可证
            System.out.println(" ql block  队列 中线程 数量: " + semaphoreLock2.getQueueLength());
            System.out.println(" ap 多少可用许可证 : " + semaphoreLock2.availablePermits()); //当前可用许可证
            System.out.println(" ----------------  ");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

 

当获取许可证的时候,打断线程


        Semaphore semaphoreLock2 = new Semaphore(1);
        Thread t1 = new Thread(() -> {
            try {
                semaphoreLock2.acquire();
                TimeUnit.SECONDS.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphoreLock2.release();
            }
            System.out.println(Thread.currentThread().getName() + " finshed.... ");
        });
        t1.start();


        TimeUnit.SECONDS.sleep(10);

        Thread t2 = new Thread(() -> {
            try {
               semaphoreLock2.acquire();
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                semaphoreLock2.release();
            }
            System.out.println(Thread.currentThread().getName() + " finshed.... ");
        });


        t2.start();

        t2.interrupt();

当线程获取许可证时候,同事睡眠几秒模拟处理事情,我们中断线程。
可以看到抛出异常,此时被中断,但看不出来是sleep被中断还是获取许可证时被中断。

深入理解信号量Semaphore

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

acquire() 和 sleep()都会抛出中断异常。 会影响获取许可证

线程被打断时候 不影响获取许可证 可以用下面这个方法获取许可证

   public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

可以看到 编译器直接报错。这类不会影响get 许可证
深入理解信号量Semaphore

原理

举例

  // 声明3个窗口  state:  资源数
        Semaphore windows = new Semaphore(3);

        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 占用窗口    加锁
                        windows.acquire();
                        System.out.println(Thread.currentThread().getName() + ": 开始买票");
                        //模拟买票流程
                        Thread.sleep(5000);
                        System.out.println(Thread.currentThread().getName() + ": 购票成功");

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // 释放窗口
                        windows.release();
                        System.out.println(Thread.currentThread().getName() + ": release");
                    }
                }
            }).start();

        }

首先也是基于 AQS 队列

首先debug 过一遍

信号量是 3 也就是并行同时最大通过 3 个线程,t0 t1 t2都拿到锁了,进入代码块里面。

深入理解信号量Semaphore
深入理解信号量Semaphore
深入理解信号量Semaphore
然后t3 进入 此时是不能获取到锁的

深入理解信号量Semaphore
信号量state是 -1 直接返回
深入理解信号量Semaphore
走外面的逻辑

深入理解信号量Semaphore
构建双向队列,设置队列 nextWaiter 是 Node.SHARED 共享模式

深入理解信号量Semaphore
这里是aqs里 首次cas创建头结点 以及入队操作

深入理解信号量Semaphore
cas入队
深入理解信号量Semaphore
继续尝试获取锁

深入理解信号量Semaphore

获取失败阻塞当前线程

深入理解信号量Semaphore
具体操作和之前 lock 锁aqs的流畅一样

设置双向队列 中当前节点的前置节点是-1,用来唤醒下一个节点

深入理解信号量Semaphore

设置成功后 第二次循环继续进入次方法,此时判断 前置节点waitStatus 是 -1 直接返回(这里为啥要循环,当前是重试自旋了啊,并发下只有一个是成功的,其他失败了要自旋重试 )
深入理解信号量Semaphore

此时外面进行阻塞

深入理解信号量Semaphore
park
深入理解信号量Semaphore

现在切换t4线程

深入理解信号量Semaphore
t4肯定是失败 获取不到锁

深入理解信号量Semaphore

同样会新建node 节点 以及入队

深入理解信号量Semaphore
同样会 使用 shouldParkAfterFailedAcquire() 设置 为 -1 以及使用 parkAndCheckInterrupt() 进行阻塞

此时释放一个信号量 t0

深入理解信号量Semaphore

release操作
深入理解信号量Semaphore

深入理解信号量Semaphore

使用cas进行信号量+1

深入理解信号量Semaphore

唤醒

深入理解信号量Semaphore

设置 为 0 ,然后unpark

深入理解信号量Semaphore

此时t3已经被唤醒了

深入理解信号量Semaphore

如果此时在这儿有一个释放锁,那么此时这里p和head节点是相同的,头节点也就指向了t3线程的节点

深入理解信号量Semaphore
此时释放t0现场

t3被唤醒 此时 头结点和当前 Node 节点都是t3线程这个节点 p==head 获取锁成功

深入理解信号量Semaphore
此时t3 就可以唤醒t4线程了 如果是共享模式 isShared ,这也就是为了之前构建节点的时候 设置模式是 isShared 的原因
深入理解信号量Semaphore

深入理解信号量Semaphore

cas设置 为 0
深入理解信号量Semaphore
成功进入unpark
深入理解信号量Semaphore
唤醒下一个节点

深入理解信号量Semaphore

具体的流程图如下

加锁

深入理解信号量Semaphore

释放锁

深入理解信号量Semaphore

  • 作者:tizzybepeacejoy
  • 原文链接:https://pilgrim.blog.csdn.net/article/details/123808320
    更新时间:2023年1月14日12:28:22 ,共 5388 字。