ReentrantReadWriteLock–读写锁
重入锁ReentrantLock是排他锁,排他锁在同一时刻只能有一个线程获得锁,但是在大多数场景下,大部分时间都是提供读服务,而写服务占有时间较少。
- 读写锁在同一时刻可以允许多个下读线程访问,但在写线程访问时,所有的读线程和其他线程都要被阻塞。读写锁维护了一对锁,读锁和写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大的提升。
- Java 5之前,如果想要实现读写分离的操作,就要使用Java的等待通知机制,也就是使用synchronized,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行,这样做的目的是使读操作能读到正确的数据,不会出现脏读。
ReentrantReadWriteLock实现了ReadWriteLock接口。
publicinterfaceReadWriteLock{
LockreadLock();
LockwriteLock();}
ReentrantReadWriteLock与ReentrantLock一样,其锁主体依然是Sync,它的读锁、写锁都是依靠自定义同步器Sync来实现的。所以ReentrantReadWriteLock实际上只有一个锁,只是在获取读取锁和写入锁的方式上不一样而已,它的读写锁其实就是两个类:ReadLock、writeLock,这两个类都是lock实现。读写状态就是其同步器的同步状态,在ReentrantLock中同步状态表示一个锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态。
如果在一个整型变量上维护多种状态,就需要”按位切割使用“这个变量,读写锁将这个状态变量分为了高16为和低16位,高16位表示读,低16位表示写。
*
上边的图表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。
读写锁是通过位运算迅速确定读和写的状态:假设当前同步状态位S,写状态等于S&0x0000FFFF(将高16位抹去),读状态等于S>>>16(无符号补0右移16位)。当写状态+1的时候表示S+1,当读状态+1的时候等于S+(1<<16)。
当S不等于0的时候,当写状态(S&0x0000FFFF)等于0,说明读状态大于0,即读锁已经被获取。
**
写锁的获取与释放
获取
写锁最终会调用Sync中的tryAcquire(int acquires)方法
//如果当前线程已经获取了写锁,则增加写状态;//当前线程获取写锁的时候,如果读锁已经被获取(读状态不为0)或者获得写锁的线程是其他线程,则当前线程进入等待状态protectedfinalbooleantryAcquire(int acquires){/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*///得到当前线程
Thread current= Thread.currentThread();//当前锁的状态int c=getState();int w=exclusiveCount(c);if(c!=0){// (Note: if c != 0 and w == 0 then shared count != 0)//c不等于0 并且 w等于0 表示已经存在读锁 或者 当前线程不是已经获取写锁的线程,获取失败if(w==0|| current!=getExclusiveOwnerThread())returnfalse;//超出最大重入写锁范围 (1<<16)-1if(w+exclusiveCount(acquires)> MAX_COUNT)thrownewError("Maximum lock count exceeded");// Reentrant acquiresetState(c+ acquires);returntrue;}//是否需要阻塞if(writerShouldBlock()||!compareAndSetState(c, c+ acquires))returnfalse;//设置获取锁的线程为当前线程setExclusiveOwnerThread(current);returntrue;}
与ReentrantLock中的tryAcquire(int arg)大致一样,在判断重入时增加了对读锁的判断,因为要确保写操作对读锁是可见的,如果在已经获取读锁的情况下允许获取写锁,那么已经获取读锁的线程可能就无法感知当前写线程的操作,因此只有当读锁释放之后,写锁才能被当前线程获取,获取到写锁之后,阻塞其他所有的读、写线程。
释放
WriteLock中提供了unLock()方法,先调用AQS的模板方法,最终调用内部同步器Sync的tryRelease方法。
protectedfinalbooleantryRelease(int releases){if(!isHeldExclusively()){//判断当前释放锁的线程不是已经获得写锁的线程thrownewIllegalMonitorStateException();}int nextc=getState()- releases;//如果写锁的新的线程数为0了,那么就将写锁的持有者设置为null(与ReentrantLock的累减一样,直到把当前线程获得的所有写锁释放完才返回true)boolean free=exclusiveCount(nextc)==0;if(free)setExclusiveOwnerThread(null);setState(nextc);return free;}
读锁的获取与释放
获取
最终也是调用到了Sync的tryAcquireShared(**int **unused)方法中
读锁的获取过程相对于写锁稍微复杂:
- 因为存在锁降级的情况,如果存在写锁,且写锁的持有者不是当前线程,直接返回false
- 依据公平性原则,判断读锁是否需要阻塞,读锁持有的线程数小于最大值(65535),且设置锁状态成功,执行以下代码(HoldCounter后边再说),并返回1。如果不满足条件,执行fullTryAcquireShared
protectedfinalinttryAcquireShared(int unused){/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current= Thread.currentThread();int c=getState();//exclusiveCount(c):计算是否有写锁//getExclusiveOwnerThread() != current:持有写锁的线程不是当前线程//如果持有写锁的线程是当前线程涉及到锁降级if(exclusiveCount(c)!=0&&getExclusiveOwnerThread()!= current)return-1;//获取读锁的状态int r=sharedCount(c);if(!readerShouldBlock()&&//如果不需要被阻塞
r< MAX_COUNT&&//小于最大读锁获取次数compareAndSetState(c, c+ SHARED_UNIT)){if(r==0){//读锁还没有被获取
firstReader= current;
firstReaderHoldCount=1;}elseif(firstReader== current){
firstReaderHoldCount++;}else{
HoldCounter rh= cachedHoldCounter;if(rh== null|| rh.tid!=getThreadId(current))
cachedHoldCounter= rh= readHolds.get();elseif(rh.count==0)
readHolds.set(rh);
rh.count++;}return1;}returnfullTryAcquireShared(current);}
fullTryAcquireShared会根据是否需要等待,获取读锁的次数是否超过上限等进行处理。如果不需要阻塞等待并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1。
finalintfullTryAcquireShared(Thread current){/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh= null;for(;;){int c=getState();if(exclusiveCount(c)!=0){if(getExclusiveOwnerThread()!= current)return-1;// else we hold the exclusive lock; blocking here// would cause deadlock.}elseif(readerShouldBlock()){//读锁需要阻塞// Make sure we're not acquiring read lock reentrantlyif(firstReader== current){//当前线程获取到了读锁// assert firstReaderHoldCount > 0;}else{//HoldCounter后面讲解if(rh== null){
rh= cachedHoldCounter;if(rh== null|| rh.tid!=getThreadId(current)){
rh= readHolds.get();if(rh.count==0)
readHolds.remove();}}if(rh.count==0)return-1;}}//读锁超过最大限制if(sharedCount(c)== MAX_COUNT)thrownewError("Maximum lock count exceeded");//如果CAS设置锁状态成功if(compareAndSetState(c, c+ SHARED_UNIT)){//如果是第一次获得读锁if(sharedCount(c)==0){
firstReader= current;
firstReaderHoldCount=1;}elseif(firstReader== current){//如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程,则将firstReaderHoldCount+1
firstReaderHoldCount++;}else{if(rh== null)
rh= cachedHoldCounter;if(rh== null|| rh.tid!=getThreadId(current))
rh= readHolds.get();elseif(rh.count==0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter= rh;// cache for release}return1;}}}
释放
读锁的释放最终会调用Sync中的tryReleaseShared(**int **unused)
protectedfinalbooleantryReleaseShared(int unused){
Thread current= Thread.currentThread();if(firstReader== current){//判断第一个获得读锁的线程是否为当前要释放锁的线程// assert firstReaderHoldCount > 0;if(firstReaderHoldCount==1)//如果仅获取了一次,就把firstReader设置为null
firstReader= null;else
firstReaderHoldCount--;//否则计数-1}else{//获取rh对象,并更新“当前线程获取锁的信息”
HoldCounter rh= cachedHoldCounter;if(rh== null|| rh.tid!=getThreadId(current))
rh= readHolds.get();int count= rh.count;if(count<=1){
readHolds.remove();if(count<=0)throwunmatchedUnlockException();}--rh.count;}//CAS更新同步状态for(;;){int c=getState();int nextc= c- SHARED_UNIT;if(compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc==0;}}
HoldCounter
HoldCounter相当于一个计数器,一次共享操作就相当于在该计数器的操作,该计数器用来存储线程获得读锁的次数,但是不包括第一次获得读锁的线程,为了提升效率,Doug lea把第一次获得读锁的线程用来单独保存了,不需要使用HoldCounter;
HoldCounter因为要保存每个线程的信息,因此使用的ThreadLocl来存储
/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/staticfinalclassHoldCounter{int count=0;//finallong tid=getThreadId(Thread.currentThread());}/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/staticfinalclassThreadLocalHoldCounterextendsThreadLocal<HoldCounter>{public HoldCounterinitialValue(){returnnewHoldCounter();}}
通过ThreadLocal将HoldCounter绑定到当前线程上,同时HoldCounter也持有线程id。但是绑定的线程id不是线程对象的原因是方便GC回收。
锁降级
同一个线程中,在没有释放写锁的情况下,就去申请读锁,这属于锁降级,ReentrantReadWriteLock是支持的.
在获取读锁的代码中可以看到
- 锁降级一定要先获取读锁再释放写锁:获取写锁>获取读锁>释放写锁,为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此时另一个线程获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。
- 如果遵循锁降级的过程,那么其他的写线程就会被阻塞
//exclusiveCount(c):计算是否有写锁//getExclusiveOwnerThread() != current:持有写锁的线程不是当前线程//如果持有写锁的线程是当前线程涉及到锁降级if(exclusiveCount(c)!=0&&getExclusiveOwnerThread()!= current)return-1;
锁升级
同一个线程中,在没有释放读锁的情况下 ,就去申请写锁,这属于锁升级,ReentrantReadWriteLock是不支持的,因为获取到读锁的线程有可能看不到获取到写锁的线程的操作。