线程协作工具类原理(2):AQS

2022-09-18 11:19:43

1 学习AQS的思路

  • 学习AQS的目的主要是想理解原理、提高技术,以及应对面试
  • 先从应用层面理解为什么需要他如何使用它,然后再看一看我们Java代码的设计者是如何使用它的了解它的应用场景
  • 这样之后我们再去分析它的结构,这样的话我们就学习得更加轻松了

2为什么需要AQS?

锁和协作类有共同点:闸门

  • 我们已经学过了ReentrantLock和Semaphore,有没有发现它们有共同点?很相似?
  • 事实上,不仅是ReentrantLock和Semaphore,包括CountDownLatch、ReentrantReadWriteLock都有这样类似的 “协作”(或者叫“同步”)功能,其实,它们底层都用了一个共同的基类,这就是AQS
  • 因为上面的那些协作类,它们有很多工作都是类似的,所以如果能提取出一个工具类,那么就可以直接用,对于ReentrantLock和Semaphore而言就可以屏蔽很多细节,只关注它们自己的 “业务逻辑” 就可以了
  • Semaphore内部有一个Sync类,Sync类继承了AQS
  • CountDownLatch / ReentrantLock也是一样的

// AQS AbstractQueuedSynchronizer 
public class Semaphore{
    abstract static class Sync extends AbstractQueuedSynchronizer {
public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
public class ReentrantLock implements Lock {
    abstract static class Sync extends AbstractQueuedSynchronizer {

AQS的比喻


  • 比喻:群面、单面
  • 安排就坐、叫号、先来后到等HR的工作就是AQS做的工作
  • 面试官不会去关心两个面试者是不是号码相同冲突了,也不想去管面试者需要一个地方坐着休息,这些都交给HR去做

  • Semaphore:一个人面试完了以后,后一个人才能进来继续面试
  • CountDownLatch:群面,等待10人到齐
  • Semaphore、CountDownLatch这些同步工具类,要做的就只是写下自己的 “要人” 规则。比如是 “出一个,进一个” 或者说 “凑齐10人,一起面试”
  • 剩下的招呼面试者的脏活累活交给AQS来做

如果没有AQS


3 AQS的作用

  • AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS以后,更多的协作工具类都可以很方便的被写出来。
  • 一句话总结:有了AQS,构建线程协作类就容易多了。

4 AQS的重要性、地位

AbstractQueuedSynchronizer是Doug Lea写的,从JDK1.5加入的一个基于FIFO等待队列实现的一个用于实现同步器的基础框架。


5 AQS内部原理解析

AQS最核心的就是三大部分

  1. state
  2. 控制线程抢锁和配合的FIFO队列
  3. 期望协作工具类去实现的获取/释放等重要方法

5.1 state状态

  • 这里的state的具体含义,会根据具体实现类的不同而不同。比如在Semaphore里,它表示 “剩余的许可证的数量” 。而在CountDownLatch里,它表示 “还需要倒数的数量”。在ReentrantLock中,state用来表示 “锁” 的占有情况,包括可重入计数(0重入未占有,大于1表示重入次数),当state的值为0的时候,标识该Lock不被任何线程所占有
  • state是volatile修饰的,会被并发地修改,所以所有修改state的方法都需要保证线程安全,比如getState、setState以及compareAndSetState操作来读取和更新这个状态。这些方法都依赖于java.util.concurrent包的支持。
//java.util.concurrent.locks.AbstractQueuedSynchronizer#state
private volatile int state;

// CAS java.util.concurrent.locks.AbstractQueuedSynchronizer#compareAndSetState
protected final boolean compareAndSetState(int expect, int update) {
	// See below for intrinsics setup to support this
	return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

5.2 控制线程抢锁和配合的FIFO队列

  • 这个队列用来存放 “等待的线程”,AQS就是 “排队管理器” 当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁
  • AQS会维护一个等待的线程队列,把线程都放到这个队列里,这是一个双向形式的队列

5.3期望协作工具类去实现获取/释放等重要方法

这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义各不相同


获取方法

  • 获取操作会依赖state变量,经常会阻塞(比如获取不到锁的时候就阻塞)
  • ReentrantLock种,state变量不为0,说明这个锁被其他线程持有,获取锁的线程就会进入阻塞状态
  • Semaphore中,获取就是acquire方法,作用是获取一个许可证
  • 而在CountDownLatch里面,获取就是await方法,作用是 “等待,直到倒数结束”(state不为零就会陷入阻塞,直到其他线程把state减为0,它们就会被唤醒)

释放方法

  • 释放操作不会阻塞
  • 在Semaphore中,释放就是release方法,作用是释放一个许可证
  • CountDownLatch里面,获取就是countDown方法,作用是 “倒数1个数

5.4 需要重写tryAcquire和tryRelease等方法

//java.util.concurrent.CountDownLatch.Sync
private static final class Sync extends AbstractQueuedSynchronizer {
	protected int tryAcquireShared(int acquires) {
		//...
	}
	protected boolean tryReleaseShared(int releases) {
		//...
	}
}

6应用实例、源码解析

6.1 AQS用法

  • 第一步:写一个类,想好协作的逻辑,实现获取/释放方法
  • 第二步:内部写一个Sync类继承AbstractQueuedSynchronizer
  • 第三步:根据是否独占来重写 tryAcquire/tryRelease或tryAcquireShared(int acquires)和tryReleaseShared(int releases)等方法,在之前写的获取/释放方法中调用AQS的acquire/release或者Shared方法

6.2 AQS在CountDownLatch的应用

https://blog.csdn.net/qq_40794973/article/details/104111726#t1

构造函数

//java.util.concurrent.CountDownLatch#CountDownLatch
public CountDownLatch(int count) {
	if (count < 0) throw new IllegalArgumentException("count < 0");
	this.sync = new Sync(count);
}
//java.util.concurrent.CountDownLatch.Sync#Sync
Sync(int count) {
	setState(count);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#setState
protected final void setState(int newState) {
	state = newState;
}

getCount

//java.util.concurrent.CountDownLatch#getCount
public long getCount() {
	return sync.getCount();
}
//java.util.concurrent.CountDownLatch.Sync#getCount
int getCount() {
	return getState();
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#getState
protected final int getState() {
	return state;
}

countDown

  • 执行-1操作
  • 当减到0的时候唤醒所有等待的线程
//java.util.concurrent.CountDownLatch#countDown
public void countDown() {
	sync.releaseShared(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
	if (tryReleaseShared(arg)) {
		//唤醒之前陷入等待的线程
		doReleaseShared();
		return true;
	}
	return false;
}
//java.util.concurrent.CountDownLatch.Sync#tryReleaseShared
protected boolean tryReleaseShared(int releases) {
	// Decrement count; signal when transition to zero
	//自旋
	for (;;) {
		int c = getState();
		//已经释放过
		if (c == 0)
			return false;
		//
		int nextc = c-1;
		//CAS更新
		if (compareAndSetState(c, nextc))
			//返回true:唤醒等待线程
			return nextc == 0;
	}
}

await

  • 判断当前的线程是否需要等待
  • state > 0:还没有倒数结束,让当前线程进入阻塞队列中
  • state = 0:倒数结束,直接放行
//java.util.concurrent.CountDownLatch#await()
public void await() throws InterruptedException {
	sync.acquireSharedInterruptibly(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
	//判断中断
	if (Thread.interrupted()){
		throw new InterruptedException();
	}
	if (tryAcquireShared(arg) < 0){
		//当前线程进入等待队列
		doAcquireSharedInterruptibly(arg);
	}
}
//java.util.concurrent.CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
	//state==0:等待队列里面的线程都可以放行了
	//state>0:倒数还未结束,线程需要放入等待队列等待
	return (getState() == 0) ? 1 : -1;
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
//把当前线程放入阻塞队列,并且把线程陷入阻塞状态
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
	//把当前线程包装成一个Node节点(参考上面的图片)
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true;
	try {
		for (;;) {
			final Node p = node.predecessor();
			if (p == head) {
				int r = tryAcquireShared(arg);
				if (r >= 0) {
					setHeadAndPropagate(node, r);
					p.next = null; // help GC
					failed = false;
					return;
				}
			}
			if (shouldParkAfterFailedAcquire(p, node) &&
				//
				parkAndCheckInterrupt())
				throw new InterruptedException();
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
//挂起当前线程,进入阻塞状态
private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this);
	return Thread.interrupted();
}
//java.util.concurrent.locks.LockSupport#park(java.lang.Object)
public static void park(Object blocker) {
	Thread t = Thread.currentThread();
	setBlocker(t, blocker);
	UNSAFE.park(false, 0L);
	setBlocker(t, null);
}
//sun.misc.Unsafe#park
public native void park(boolean var1, long var2);

AQS在CountDownLatch的总结

  • 调用CountDownLatch的await方法时,便会尝试获取 “共享锁” ,不过一开始是获取不到该锁的,于是线程被阻塞
  • 而 “共享锁” 可获取到的条件,就是 “锁计数器” 的值为0
  • 而 “锁计数器” 的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将 “锁计数器”-1
  • count个线程调用countDown()之后,“锁计数器” 才为0,而前面提到的等待获取共享锁的线程才能继续运行

6.3 AQS在Semaphore的应用

  • 在Semaphore中,state表示许可证的剩余数量
  • 看tryAcquire方法,判断nonfairTryAcquireShared大于等于0的话,代表成功。这里会先检查剩余许可证数量够不够这次需要的,用减法来计算,如果直接不够,那就返回负数,表示失败,如果够了,就用自旋加compareAndSetState来改变state状态,直到改变成功就返回正数。或者是期间如果被其他人修改了导致剩余数量不够了,那也返回负数代表获取失败
//java.util.concurrent.Semaphore#acquire(int)
public void acquire(int permits) throws InterruptedException {
	if (permits < 0) throw new IllegalArgumentException();
	sync.acquireSharedInterruptibly(permits);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
	if (Thread.interrupted()){
		throw new InterruptedException();
	}
	if (tryAcquireShared(arg) < 0){
		//当前线程进入排队等待的状态
		doAcquireSharedInterruptibly(arg);
	}
}
//java.util.concurrent.Semaphore.NonfairSync#tryAcquireShared        
protected int tryAcquireShared(int acquires) {
	return nonfairTryAcquireShared(acquires);
}
//java.util.concurrent.Semaphore.Sync#nonfairTryAcquireShared
//负数:获取失败
//正数:获取成功
final int nonfairTryAcquireShared(int acquires) {
	for (;;) {
		//当前剩余的许可证的数量
		int available = getState();
		//计算够不够
		int remaining = available - acquires;
		if (remaining < 0 ||
			//CAS修改剩余的许可证数量
			compareAndSetState(available, remaining))
			return remaining;
	}
}

6.4 AQS在ReentrantLock的应用

分析释放锁的方法tryRelease

  • 由于是可重入的,所以state代表重入的次数,每次释放锁,先判断是不是当前持有锁的线程释放的,如果不是就抛异常,如果是的话,重入次数就减一,如果减到了0,就说明完全释放了,于是free就是true,并且把state设置为0。

加锁的方法

  • 判断当前state是否等于0当前线程是否是持有锁的线程。都不是就拿不到这把锁,然后把当前线程放入队列等待以后在合适的时候唤醒

分析释放锁的方法tryRelease

//java.util.concurrent.locks.ReentrantLock#unlock
public void unlock() {
	sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
	if (tryRelease(arg)) {
		//已经释放了锁(唤醒等待的线程)
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h); // 唤醒其他线程
		return true;
	}
	return false;
}
//java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {
	//重入次数-1
	int c = getState() - releases;
	//判断当前线程是否是持有锁的线程
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	if (c == 0) {
		//设置这把锁自由
		free = true;
		//设置当前持有这把锁的线程为null
		setExclusiveOwnerThread(null);
	}
	//更新
	setState(c);
	return free;
}
//java.util.concurrent.locks.ReentrantLock#lock
public void lock() {
	sync.lock();
}
//java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
final void lock() {
	if (compareAndSetState(0, 1)){//CAS:当前没有任何线程持有这把锁的时候才能够设置成功
		setExclusiveOwnerThread(Thread.currentThread());//当前线程设置为持有锁的线程
	}else{
		acquire(1);
	}	
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
	//获取锁失败(!tryAcquire(arg) == true)当前线程会被放入等待队列中(addWaiter(Node.EXCLUSIVE)),并且去排队直到时机合适再来重新获取锁
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
		selfInterrupt();
	}	
}
//java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
	return nonfairTryAcquire(acquires);
}
//java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
	final Thread current = Thread.currentThread();
	int c = getState();
	if (c == 0) {//当前这个锁是否被持有
		//没有任何线程持有这把锁
		if (compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}else if (current == getExclusiveOwnerThread()) {//这把锁的持有者是当前线程(重入)
		//重入次数+1(acquires通常是1)
		int nextc = c + acquires;
		if (nextc < 0) // overflow
			throw new Error("Maximum lock count exceeded");
		//重入次数+1	
		setState(nextc);
		return true;
	}
	return false;
}

7 利用AQS实现一个自己的Latch门闩

  1. 写一个类,想好协作的逻辑,实现获取/释放方法
  2. 内部写一个Sync类继承AbstractQueuedSynchronizer
  3. 根据是否独占来重写tryAcquire/tryRelease或tryAcquireShared(int acquires)和tryReleaseShared(int releases)等方法,在之前写的获取/释放方法中调用AQS的acquire/release或者Shared方法
/**
 * 自己用AQS实现一个简单的线程协作器(类似于CountDownLatch传入1)
 */
public class OneShotLatch {
    private final Sync sync = new Sync();
    //释放
    public void signal() {
        sync.releaseShared(0); // 点进去
    }
    //获取,谁调用谁等待
    public void await() {
        sync.acquireShared(0); // 点进去
    }
    /**
     * state
     * 0:关闭
     * 1:打开
     */
    private class Sync extends AbstractQueuedSynchronizer {
        // java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireShared
        // public final void acquireShared(int arg) {
        // 	if (tryAcquireShared(arg) < 0)
        // 		doAcquireShared(arg); // 放入柱塞队列
        // }
        @Override
        protected int tryAcquireShared(int arg) {
            return (super.getState() == 1) ? 1 : -1;
        }
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
        // public final boolean releaseShared(int arg) {
        // 	 if (tryReleaseShared(arg)) {
        // 		 doReleaseShared(); // 唤醒所有线程
        // 		 return true;
        // 	 }
        // 	 return false;
        // }
        @Override
        protected boolean tryReleaseShared(int arg) {
           super.setState(1);
           return true;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        OneShotLatch oneShotLatch = new OneShotLatch();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败陷入等待");
                oneShotLatch.await();
                System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
            }).start();
        }
        Thread.sleep(5000);
        // 开闸
        oneShotLatch.signal();
        // 一次性,一但唤醒了,后面的线程就不会再次陷入柱塞了
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败陷入等待");
            oneShotLatch.await();
            System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
        }).start();
    }
}
  • 作者:爪 哇
  • 原文链接:https://yuanyu.blog.csdn.net/article/details/104113638
    更新时间:2022-09-18 11:19:43