aqs原理初探以及公平锁和非公平锁实现

2022-12-01 09:28:18

一,AQS

AbstractQueuedSynchronizer,定义了一套多线程访问共享资源的同步器框架,依赖于状态的同步器

1,ReentrantLock

一种基于AQS框架的应用实现,类似于synchronized是一种互斥锁,可以保证线程安全。它支持手动加锁与解锁,支持加锁的公平性。主要是Lock锁的实现

publicclassReentrantLockimplementsLock,Serializable

接下来可以手动的猜想一下这个reentrantLock的实现

ReentrantLock lock=newReentrantLock(true);HashSet hashSet=newHashSet();3个线程 T0,T1,T2
lock.lock();//加锁while(true){//循环,轮询获取锁if(加锁成功){//synchronized,cas  cas:compare and swapbreak;//跳出循环}//Thread.yeild() //让出cpu使用权//Thread.sleep(1000); //睡眠
        hashSet.add(thread);//将当前线程加入到set中//阻塞LockSupprot.park();}
	T0获取锁
    xxxxx业务逻辑
    xxxxx业务逻辑   
lock.unlock();Thread thread= hashSet.get();//唤醒当前线程,notify是唤醒随机的线程LockSupport.unPark(thread);

三大核心:自旋,加锁,LockSupport,队列(LinkQueue),为了解决这个公平锁和非公平锁,因此优先考虑这个队列。

2,CAS

compare and swap,比较与交换
在这里插入图片描述

如在jmm模型中,两个工作内存都去修改主内存的值。主内存中存在一个a = 0,线程A和线程B的工作内存同时获取到这个值,如果线程A先修改这个值,则线程A会和主内存比较,如果线程A的值a和主内存的值一致,那么就会直接进行修改,如改成a = 1,那么线程B也要改这个值,线程B中的a = 0,那么会和主内存a比较,发现不一致,主内存a=1,那么就会优先将线程B中的值修改成a = 1,再对a进行修改。就是说相等直接修改,不相等需要重新读取,再进行修改。即在一个原子操作里面进行比较和替换

主要通过这个unsafe类实现,里面的实现也是原子类操作,主要是通过以下三个类实现。

//对象类型publicfinalnativebooleancompareAndSwapObject(Object var1,long var2,Object var4,Object var5);//整型值publicfinalnativebooleancompareAndSwapInt(Object var1,long var2,int var4,int var5);//Long型值publicfinalnativebooleancompareAndSwapLong(Object var1,long var2,long var4,long var6);

3,AbstractQueuedSynchronizer

该类是ReentrantLock里面的一个抽象内部类。ctrl + alt + shift + u看所有子类,Ctrl + T,看所有的继承类,可以发现很多地方都继承或者实现了这个抽象类

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4dzYpLnW-1657115375965)(C:\Users\HULOUBO\AppData\Roaming\Typora\typora-user-images\1656947493769.png)]
Sync是ReentrantLock的一个抽象的静态内部类,根据图也可以发现这个Sync继承了这个AQS

abstractstaticclassSyncextendsAbstractQueuedSynchronizer

通过实现Sync这个接口得到了FairSync公平锁类和NofairSync非公平锁这个类

3.1,FairSync

实现了公平锁,需要排队获取锁,如存在线程t1,t2,t3依次获取锁,需要依次排队执行,突然来了一个线程t4,也是需要排在线程t3后面

ReentrantLock reentrantLock=newReentrantLock(true);publicReentrantLock(boolean fair){//入参,用于判断是公平锁还是非公平锁
    sync= fair?newFairSync():newNonfairSync();}//公平锁的实现staticfinalclassFairSyncextendsReentrantLock.Sync{...}

公平锁获取锁的方式如下

finalvoidlock(){acquire(1);}publicfinalvoidacquire(int arg){//tryAcquire:尝试去获取锁//addWaiter:线程入队,一个同步等待队列,基于双向列表实现//链表中的每一个结点为一个Node结点,if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}//线程入队操作privateNodeaddWaiter(Node mode){//获取当前线程结点Node node=newNode(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred= tail;if(pred!=null){
        node.prev= pred;if(compareAndSetTail(pred, node)){
            pred.next= node;return node;}}//结点入队enq(node);return node;}//判断是否获取锁protectedfinalbooleantryAcquire(int acquires){//获取当前获取锁的线程finalThread current=Thread.currentThread();//获取当前同步器状态,被volatile修饰的整型值,默认为0int c=getState();//如果同步状态器的值为0,说明外面的线程可以进行加锁操作if(c==0){//hasQueuedPredecessors:判断队列中是否还有在排队的节点//compareAndSetState:原子操作,比较与交换,进行加锁的操作,将state变量将0变为1//setExclusiveOwnerThread:设置获取锁的的线程拥有者if(!hasQueuedPredecessors()&&compareAndSetState(0, acquires)){setExclusiveOwnerThread(current);returntrue;}}//如果状态同步器不为0,可能由自己持有,也可能由别的线程持有锁//重复加锁,如定义一个全局锁,出现了这个可重入锁的问题elseif(current==getExclusiveOwnerThread()){//如果自己持有锁的话,state+1即可,反正不等于0就可以int nextc= c+ acquires;if(nextc<0)thrownewError("Maximum lock count exceeded");setState(nextc);returntrue;}//别的线程获有锁,直接返回returnfalse;}

总而言之就是说,公平锁就是就是通过一个队列实现,需要进行排队的去获取锁资源。主要是通过这个state的资源状态器来控制获取锁的拥有者,如果state为0,则表示队列中的下一个线程可以去获取锁,并且通过cas的方式来保证锁的安全并发问题。通过队列的思想,来保证这个获取锁的公平性和有序性。

3.2,NofairSync

实现了非公平锁,默认为非公平锁,会存在抢锁的情况

ReentrantLock reentrantLock=newReentrantLock(flase);//如果不传入参数,默认是非公平锁publicReentrantLock(){
    sync=newNonfairSync();}publicReentrantLock(boolean fair){//入参,用于判断是公平锁还是非公平锁
    sync= fair?newFairSync():newNonfairSync();}//非公平锁的实现staticfinalclassNonfairSyncextendsReentrantLock.Sync{...}

非公平锁获取锁的方法和公平锁类似,只是少了几步使用队列的几个方法

3.3,AQS中几个重要的相关参数

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-12BMqIQA-1657115375967)(C:\Users\HULOUBO\AppData\Roaming\Typora\typora-user-images\1657029597278.png)]

exclusiveOwnerThread:用于记录当前独占模式下,获取锁的线程是谁

state:同步状态器,默认为0,表示当前没有线程获取锁,外面的线程可以来获取锁了

Node:双向链表结构,是一个同步等待队列,head:队头,tail:队尾,prev前躯指针,next,后继指针

waiteState:结点的什生命状态

Lock锁和synchronized锁都是可重入锁

3.4,Node

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Zfr5p0Ru-1657115375970)(C:\Users\HULOUBO\AppData\Roaming\Typora\typora-user-images\1657110491185.png)]
pre:前驱指针
next:后继指针
waitStatues:每个结点都存在很多状态,这个主要是存储结点的生命状态

结点的几个生命状态如下

SIGNAL:-1//可被唤醒
CANCELLED:1//代表异常,中断引起的,需要被废弃
CONDITION:-2//条件等待
PROPAGATE:-3//传播Init:0初始状态

结点入队顺序如下,入队时,将入队结点得前驱指针指向链表的tail结点,将tail节点的next节点指向当前节点,并将当前结点设置为tail尾指针结点。

privateNodeenq(finalNode node){//自旋for(;;){Node t= tail;if(t==null){// Must initialize//如果队列为空,要防止出现多个线程的并发问题,结点直接放在队头if(compareAndSetHead(newNode()))
                tail= head;}else{
            node.prev= t;//保证这个入队的安全性,防止入队出现并发问题//结点入队到队尾if(compareAndSetTail(t, node)){
                t.next= node;return t;}}}}

当前结点前面有结点获取锁,当前节点需要进行阻塞park,线程要开始排队等待。
结点在阻塞之前,还得尝试获取一次锁。
a,如果结点可以获取到锁,即当前节点为头结点的下一个结点,头结点即将锁被释放,则把当前结点作为头结点。之前的头结点就可以被GC回收了
b,如果结点不能获取到锁,那么当前结点就要等待被唤醒
(1),第一轮循环会去修改head状态,并且将waitState修改为sinal = -1可被唤醒状态
(2),第二轮,阻塞线程

finalbooleanacquireQueued(finalNode node,int arg){boolean failed=true;try{boolean interrupted=false;for(;;){//当前节点的前驱指针指向的节点finalNode p= node.predecessor();//如果当前结点指向的该结点为头部结点,则不需要进行阻塞if(p== head&&tryAcquire(arg)){//将当前结点作为头部结点setHead(node);
                p.next=null;// help GC
                failed=false;return interrupted;}if(shouldParkAfterFailedAcquire(p, node)&&parkAndCheckInterrupt())
                interrupted=true;}}finally{if(failed)cancelAcquire(node);}}

每个结点的生命状态的变换如下。默认的结点状态为0,需要将节点状态(waitState)转化为-1可唤醒状态。前一个节点中的waitStatus状态记录着后一个节点的生命状态。

//pred:前驱节点 node:当前节点privatestaticbooleanshouldParkAfterFailedAcquire(Node pred,Node node){//默认为初始状态0int ws= pred.waitStatus;//SIGNAL为-1,可被唤醒状态if(ws==Node.SIGNAL)returntrue;if(ws>0){do{
            node.prev= pred= pred.prev;}while(pred.waitStatus>0);
        pred.next= node;}else{//将头结点初始状态转化为可唤醒状态compareAndSetWaitStatus(pred, ws,Node.SIGNAL);}returnfalse;}

在修改成可被唤醒的状态之后,就进行阻塞操作。

privatefinalbooleanparkAndCheckInterrupt(){//阻塞线程,并且判断该线程是否是被中断的LockSupport.park(this);returnThread.interrupted();}

在头结点释放锁的时候,也会发一个通知去告知下一个需要获取锁的线程来抢锁,即唤醒队列中的下一个被阻塞的线程

//真正的释放锁publicvoidunlock(){
    sync.release(1);}publicfinalbooleanrelease(int arg){//尝试释放锁if(tryRelease(arg)){Node h= head;//因为前一个结点中存放后一个节点中的声明状态,因此//如果头结点的waitStatus不为0,就说明后一个结点是一个可被唤醒的状态if(h!=null&& h.waitStatus!=0)//唤醒unparkSuccessor(h);returntrue;}returnfalse;}//尝试去释放锁protectedfinalbooleantryRelease(int releases){//修改同步状态器stateint c=getState()- releases;if(Thread.currentThread()!=getExclusiveOwnerThread())thrownewIllegalMonitorStateException();boolean free=false;//将同步状态器state 变为 0,说明当前同步状态器可以进行锁的获取了if(c==0){
        free=true;//置空当前线程setExclusiveOwnerThread(null);}setState(c);return free;}

最后在这个unparkSuccessor方法中,也有这个具体的unpark唤醒操作

if(s!=null)LockSupport.unpark(s.thread)

通过上述代码描述,也验证了一开始的猜想:自旋,加锁,LockSupport,队列(LinkQueue)

  • 作者:huisheng_qaq
  • 原文链接:https://blog.csdn.net/zhenghuishengq/article/details/125648495
    更新时间:2022-12-01 09:28:18