源码解析Synchronous Queue 这种特立独行的队列

2022年5月13日09:28:11

摘要:Synchronous Queue 是一种特立独行的队列,其本身是没有容量的,比如调用者放一个数据到队列中,调用者是不能够立马返回的,调用者必须等待别人把我放进去的数据消费掉了,才能够返回。

本文分享自华为云社区《Synchronous Queue 源码解析》,作者: JavaEdge。

1 简介

Synchronous Queue 是一种特立独行的队列,其本身是没有容量的,比如调用者放一个数据到队列中,调用者是不能够立马返回的,调用者必须等待别人把我放进去的数据消费掉了,才能够返回。Synchronous Queue 在 MQ 中被大量使用,本文就让我们从源码来看下 Synchronous Queue 到底是如何实现这种功能的呢。

2 整体架构

不像ArrayBlockingQueue、LinkedBlockingDeque之类的阻塞队列使用AQS实现并发,SynchronousQueue直接使用CAS操作实现数据的安全访问,因此源码中充斥着大量的CAS代码。

SynchronousQueue 的整体设计比较抽象,在内部抽象出了两种算法实现,一种是先入先出的队列,一种是后入先出的堆栈,两种算法被两个内部类实现,而直接对外的 put,take 方法的实现就非常简单,都是直接调用两个内部类的 transfer 方法进行实现,整体的调用关系如下图所示:

源码解析Synchronous Queue 这种特立独行的队列

2.1 类注释

队列不存储数据,所以没有大小,也无法迭代;插入操作的返回必须等待另一个线程完成对应数据的删除操作,反之亦然;

队列由两种数据结构组成,分别是后入先出的堆栈和先入先出的队列,堆栈是非公平的,队列是公平的。

第二点是如何做到的?堆栈又是如何实现的呢?接下来我们一点一点揭晓。

2.2 类图

源码解析Synchronous Queue 这种特立独行的队列

SynchronousQueue 整体类图和 LinkedBlockingQueue 相似,都实现了 BlockingQueue 接口,但因为其不储存数据结构,有一些方法是没有实现的,比如说 isEmpty、size、contains、remove 和迭代等方法,这些方法都是默认实现,如下截图:

2.3 结构细节

SynchronousQueue 底层结构和其它队列完全不同,有着独特的两种数据结构:队列和堆栈,我们一起来看下数据结构:

// 堆栈和队列共同的接口// 负责执行 put or takeabstractstaticclass Transferer<E> {// e 为空的,会直接返回特殊值,不为空会传递给消费者// timed 为 true,说明会有超时时间abstract E transfer(E e, boolean timed,long nanos);
}// 堆栈 后入先出 非公平// Scherer-Scott 算法static finalclass TransferStack<E> extends Transferer<E> {
}// 队列 先入先出 公平static finalclass TransferQueue<E> extends Transferer<E> {
}private transientvolatile Transferer<E> transferer;// 无参构造器默认为非公平的public SynchronousQueue(boolean fair) {
    transferer= fair ?new TransferQueue<E>() :new TransferStack<E>();
}

从源码中我们可以得到几点:

堆栈和队列都有一个共同的接口,叫做 Transferer,该接口有个方法:transfer,该方法很神奇,会承担 take 和 put 的双重功能;

在我们初始化的时候,是可以选择是使用堆栈还是队列的,如果你不选择,默认的就是堆栈,类注释中也说明了这一点,堆栈的效率比队列更高。

接下来我们来看下堆栈和队列的具体实现。

3 非公平的堆栈

3.1 堆栈的结构

首先我们来介绍下堆栈的整体结构,如下:

源码解析Synchronous Queue 这种特立独行的队列

从上图中我们可以看到,我们有一个大的堆栈池,池的开口叫做堆栈头,put 的时候,就往堆栈池中放数据。take 的时候,就从堆栈池中拿数据,两者操作都是在堆栈头上操作数据,从图中可以看到,越靠近堆栈头,数据越新,所以每次 take 的时候,都会拿到堆栈头的最新数据,这就是我们说的后入先出,也就是非公平的。

图中 SNode 就是源码中栈元素的表示,我们看下源码:

源码解析Synchronous Queue 这种特立独行的队列

  • volatile SNode next
    栈的下一个,就是被当前栈压在下面的栈元素
  • volatile SNode match
    节点匹配,用来判断阻塞栈元素能被唤醒的时机
  • 比如我们先执行 take,此时队列中没有数据,take 被阻塞了,栈元素为 SNode1
    当有 put 操作时,会把当前 put 的栈元素赋值给 SNode1 的 match 属性,并唤醒 take 操作
    当 take 被唤醒,发现 SNode1 的 match 属性有值时,就能拿到 put 进来的数据,从而返回
  • volatile Thread waiter
    栈元素的阻塞是通过线程阻塞来实现的,waiter 为阻塞的线程
  • Object item
    未投递的消息,或者未消费的消息

3.2 入栈和出栈

  • 入栈
    使用 put 等方法,将数据放到堆栈池中
  • 出栈
    使用 take 等方法,把数据从堆栈池中拿出来

操作的对象都是堆栈头,虽然两者的一个是从堆栈头拿数据,一个是放数据,但底层实现的方法却是同一个,源码如下:

transfer 方法思路比较复杂,因为 take 和 put 两个方法都揉在了一起A

@SuppressWarnings("unchecked")
E transfer(E e, boolean timed,long nanos) {
    SNode s=null;// constructed/reused as needed// e 为空: take 方法,非空: put 方法int mode = (e ==null) ? REQUEST : DATA;// 自旋for (;;) {// 头节点情况分类// 1:为空,说明队列中还没有数据// 2:非空,并且是 take 类型的,说明头节点线程正等着拿数据// 3:非空,并且是 put 类型的,说明头节点线程正等着放数据
        SNode h = head;// 栈头为空,说明队列中还没有数据。// 栈头非空且栈头的类型和本次操作一致//    比如都是 put,那么就把本次 put 操作放到该栈头的前面即可,让本次 put 能够先执行if (h ==null || h.mode == mode) {// empty or same-mode// 设置了超时时间,并且 e 进栈或者出栈要超时了,// 就会丢弃本次操作,返回 null 值。// 如果栈头此时被取消了,丢弃栈头,取下一个节点继续消费if (timed && nanos <=0) {// 无法等待// 栈头操作被取消if (h !=null && h.isCancelled())// 丢弃栈头,把栈头的后一个元素作为栈头
                    casHead(h, h.next);// 将取消的节点弹栈// 栈头为空,直接返回 nullelsereturnnull;// 没有超时,直接把 e 作为新的栈头
            }elseif (casHead(h, s = snode(s, e, h, mode))) {// e 等待出栈,一种是空队列 take,一种是 put
                SNode m = awaitFulfill(s, timed, nanos);if (m == s) {// wait was cancelled                    clean(s);returnnull;
                }// 本来 s 是栈头的,现在 s 不是栈头了,s 后面又来了一个数,把新的数据作为栈头if ((h = head) !=null && h.next == s)
                    casHead(h, s.next);// help s's fulfillerreturn (E) ((mode == REQUEST) ? m.item : s.item);
            }// 栈头正在等待其他线程 put 或 take// 比如栈头正在阻塞,并且是 put 类型,而此次操作正好是 take 类型,走此处
        }elseif (!isFulfilling(h.mode)) {// try to fulfill// 栈头已经被取消,把下一个元素作为栈头if (h.isCancelled())// already cancelled
                casHead(h, h.next);// pop and retry// snode 方法第三个参数 h 代表栈头,赋值给 s 的 next 属性elseif (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for (;;) {// loop until matched or waiters disappear// m 就是栈头,通过上面 snode 方法刚刚赋值
                    SNode m = s.next;// m is s's matchif (m ==null) {// all waiters are gone
                        casHead(s,null);// pop fulfill node
                        s =null;// use new node next timebreak;// restart main loop                    }
                    SNode mn= m.next;// tryMatch 非常重要的方法,两个作用:// 1 唤醒被阻塞的栈头 m,2 把当前节点 s 赋值给 m 的 match 属性// 这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s// 其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据if (m.tryMatch(s)) {
                        casHead(s, mn);// pop both s and mreturn (E) ((mode == REQUEST) ? m.item : s.item);
                    }else// lost match
                        s.casNext(m, mn);// help unlink                }
            }
        }else {// help a fulfiller
            SNode m = h.next;// m is h's matchif (m ==null)// waiter is gone
                casHead(h,null);// pop fulfilling nodeelse {
                SNode mn= m.next;if (m.tryMatch(h))// help match
                    casHead(h, mn);// pop both h and melse// lost match
                    h.casNext(m, mn);// help unlink            }
        }
    }
}

总结一下操作思路:

  1. 判断是 put 方法还是 take 方法
  2. 判断栈头数据是否为空,如果为空或者栈头的操作和本次操作一致,是的话走 3,否则走 5
  3. 判断操作有无设置超时时间,如果设置了超时时间并且已经超时,返回 null,否则走 4
  4. 如果栈头为空,把当前操作设置成栈头,或者栈头不为空,但栈头的操作和本次操作相同,也把当前操作设置成栈头,并看看其它线程能否满足自己,不能满足则阻塞自己。比如当前操作是 take,但队列中没有数据,则阻塞自己
  5. 如果栈头已经是阻塞住的,需要别人唤醒的,判断当前操作能否唤醒栈头,可以唤醒走 6,否则走 4
  6. 把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点
  7. 栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回。

在整个过程中,有一个节点阻塞的方法,源码如下:

当一个 节点/线程 将要阻塞时,它会设置其 waiter 字段,然后在真正 park 之前至少再检查一次状态,从而涵盖了竞争与实现者的关系,并注意到 waiter 非空,因此应将其唤醒。

当由出现在调用点位于堆栈顶部的节点调用时,对停放的调用之前会进行旋转,以避免在生产者和消费者及时到达时阻塞。 这可能只足以在多处理器上发生。

从主循环返回的检查顺序反映了这样一个事实,即优先级: 中断 > 正常的返回 > 超时。 (因此,在超时时,在放弃之前要进行最后一次匹配检查。)除了来自非定时SynchronousQueue的调用。{poll / offer}不会检查中断,根本不等待,因此陷入了转移方法中 而不是调用awaitFulfill。

/**
 * 旋转/阻止,直到节点s通过执行操作匹配。
 * @param s 等待的节点
 * @param timed true if timed wait
 * @param nanos 超时时间
 * @return 匹配的节点, 或者是 s 如果被取消*/
SNode awaitFulfill(SNode s, boolean timed,long nanos) {// deadline 死亡时间,如果设置了超时时间的话,死亡时间等于当前时间 + 超时时间,否则就是 0
    finallong deadline = timed ? System.nanoTime() + nanos :0L;
    Thread w= Thread.currentThread();// 自旋的次数,如果设置了超时时间,会自旋 32 次,否则自旋 512 次。// 比如本次操作是 take 操作,自旋次数后,仍无其他线程 put 数据// 就会阻塞,有超时时间的,会阻塞固定的时间,否则一致阻塞下去int spins = (shouldSpin(s) ?
                 (timed? maxTimedSpins : maxUntimedSpins) :0);for (;;) {// 当前线程有无被打断,如果过了超时时间,当前线程就会被打断if (w.isInterrupted())
            s.tryCancel();

        SNode m= s.match;if (m !=null)return m;if (timed) {
            nanos= deadline - System.nanoTime();// 超时了,取消当前线程的等待操作if (nanos <=0L) {
                s.tryCancel();continue;
            }
        }// 自选次数减1if (spins >0)
            spins= shouldSpin(s) ? (spins-1) :0;// 把当前线程设置成 waiter,主要是通过线程来完成阻塞和唤醒elseif (s.waiter ==null)
            s.waiter= w;// establish waiter so can park next iterelseif (!timed)// 通过 park 进行阻塞,这个我们在锁章节中会说明
            LockSupport.park(this);elseif (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

可以发现其阻塞策略,并不是一上来就阻塞住,而是在自旋一定次数后,仍然没有其它线程来满足自己的要求时,才会真正的阻塞。

队列的实现策略通常分为公平模式和非公平模式,本文我们重点介绍公平模式。

4 公平队列

4.1元素组成

源码解析Synchronous Queue 这种特立独行的队列源码解析Synchronous Queue 这种特立独行的队列

  • volatile QNode next
    当前元素的下一个元素
  • volatile Object item // CAS’ed to or from null
    当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程会把自己 set 到 item 里面
  • volatile Thread waiter // to control park/unpark
    可以阻塞住的当前线程
  • final boolean isData
    true 是 put,false 是 take

公平队列主要使用的是 TransferQueue 内部类的 transfer 方法,看源码:

E transfer(E e, boolean timed,long nanos) {

    QNode s=null;// constructed/reused as needed// true : put false : get
    boolean isData = (e !=null);for (;;) {// 队列头和尾的临时变量,队列是空的时候,t=h
        QNode t = tail;
        QNode h= head;// tail 和 head 没有初始化时,无限循环// 虽然这种 continue 非常耗cpu,但感觉不会碰到这种情况// 因为 tail 和 head 在 TransferQueue 初始化的时候,就已经被赋值空节点了if (t ==null || h ==null)continue;// 首尾节点相同,说明是空队列// 或者尾节点的操作和当前节点操作一致if (h == t || t.isData == isData) {
            QNode tn= t.next;// 当 t 不是 tail 时,说明 tail 已经被修改过了// 因为 tail 没有被修改的情况下,t 和 tail 必然相等// 因为前面刚刚执行赋值操作: t = tailif (t != tail)continue;// 队尾后面的值还不为空,t 还不是队尾,直接把 tn 赋值给 t,这是一步加强校验。if (tn !=null) {
                advanceTail(t, tn);continue;
            }//超时直接返回 nullif (timed && nanos <=0)// can't waitreturnnull;//构造node节点if (s ==null)
                s=new QNode(e, isData);//如果把 e 放到队尾失败,继续递归放进去if (!t.casNext(null, s))// failed to link incontinue;

            advanceTail(t, s);// swing tail and wait// 阻塞住自己
            Object x = awaitFulfill(s, e, timed, nanos);if (x == s) {// wait was cancelled                clean(t, s);returnnull;
            }if (!s.isOffList()) {// not already unlinked
                advanceHead(t, s);// unlink if headif (x !=null)// and forget fields
                    s.item = s;
                s.waiter=null;
            }return (x !=null) ? (E)x : e;// 队列不为空,并且当前操作和队尾不一致// 也就是说当前操作是队尾是对应的操作// 比如说队尾是因为 take 被阻塞的,那么当前操作必然是 put
        }else {// complementary-mode// 如果是第一次执行,此处的 m 代表就是 tail// 也就是这行代码体现出队列的公平,每次操作时,从头开始按照顺序进行操作
            QNode m = h.next;// node to fulfillif (t != tail || m ==null || h != head)continue;// inconsistent read
            Object x= m.item;if (isData == (x !=null) ||// m already fulfilled
                x == m ||// m cancelled// m 代表栈头// 这里把当前的操作值赋值给阻塞住的 m 的 item 属性// 这样 m 被释放时,就可得到此次操作的值
                !m.casItem(x, e)) {// lost CAS
                advanceHead(h, m);// dequeue and retrycontinue;
            }// 当前操作放到队头
            advanceHead(h, m);// successfully fulfilled// 释放队头阻塞节点            LockSupport.unpark(m.waiter);return (x !=null) ? (E)x : e;
        }
    }
}

线程被阻塞住后,当前线程是如何把自己的数据传给阻塞线程的。

假设线程 1 从队列中 take 数据 ,被阻塞,变成阻塞线程 A 然后线程 2 开始往队列中 put 数据 B,大致的流程如下:

  • 线程 1 从队列 take 数据,发现队列内无数据,于是被阻塞,成为 A
  • 线程 2 往队尾 put 数据,会从队尾往前找到第一个被阻塞的节点,假设此时能找到的就是节点 A,然后线程 B 把将 put 的数据放到节点 A 的 item 属性里面,并唤醒线程 1
  • 线程 1 被唤醒后,就能从 A.item 里面拿到线程 2 put 的数据了,线程 1 成功返回。

在这个过程中,公平主要体现在,每次 put 数据的时候,都 put 到队尾上,而每次拿数据时,并不是直接从堆头拿数据,而是从队尾往前寻找第一个被阻塞的线程,这样就会按照顺序释放被阻塞的线程。

4.2 图解公平队列模型

公平模式下,底层实现使用的是 TransferQueue 队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。

初始化时,TransferQueue的状态如下:

源码解析Synchronous Queue 这种特立独行的队列

1.线程put1执行 put(1)操作,由于当前没有配对的消费线程,所以put1线程入队列,自旋一小会后睡眠等待,这时队列状态如下:

源码解析Synchronous Queue 这种特立独行的队列

2.接着,线程put2执行了put(2)操作,跟前面一样,put2线程入队列,自旋一小会后睡眠等待,这时队列状态如下:

源码解析Synchronous Queue 这种特立独行的队列

3.这时候,来了一个线程take1,执行了 take操作,由于tail指向put2线程,put2线程跟take1线程配对了(一put一take),这时take1线程不需要入队,但是请注意了,这时候,要唤醒的线程并不是put2,而是put1。

为何? 大家应该知道我们现在讲的是公平策略,所谓公平就是谁先入队了,谁就优先被唤醒,我们的例子明显是put1应该优先被唤醒。有的同学可能会有一个疑问,明明是take1线程跟put2线程匹配上了,结果是put1线程被唤醒消费,怎么确保take1线程一定可以和次首节点(head.next)也是匹配的呢?其实大家可以拿个纸画一画,就会发现真的就是这样的。

公平策略总结下来就是:队尾匹配队头出队。

执行后put1线程被唤醒,take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信,这时候内部状态如下:

源码解析Synchronous Queue 这种特立独行的队列

4.最后,再来一个线程take2,执行take操作,这时候只有put2线程在等候,而且两个线程匹配上了,线程put2被唤醒,take2线程take操作返回了2(线程put2的数据),这时候队列又回到了起点,如下所示:

源码解析Synchronous Queue 这种特立独行的队列

以上便是公平模式下,SynchronousQueue的实现模型。总结下来就是:队尾匹配队头出队,先进先出,体现公平原则。

5 非公平模式

5.1 元素组成

  • 栈顶

源码解析Synchronous Queue 这种特立独行的队列源码解析Synchronous Queue 这种特立独行的队列

  • volatile SNode next
    栈中的下一个元素
  • volatile Object item // data; or null for REQUESTs
    当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程会把自己 set 到 item 里面
  • volatile Thread waiter
    可以阻塞住的当前线程

5.2 图解非公平模型

还是使用跟公平模式下一样的操作流程,对比两种策略下有何不同。

非公平模式底层的实现使用的是TransferStack,一个栈,实现中用head指针指向栈顶,接着我们看看它的实现模型:

1.线程put1执行 put(1)操作,由于当前没有配对的消费线程,所以put1线程入栈,自旋一小会后睡眠等待,这时栈状态如下

源码解析Synchronous Queue 这种特立独行的队列

2.接着,线程put2再次执行了put(2)操作,跟前面一样,put2线程入栈,自旋一小会后睡眠等待,这时栈状态如下:

源码解析Synchronous Queue 这种特立独行的队列

3.这时候,来了一个线程take1,执行了take操作,这时候发现栈顶为put2线程,匹配成功,但是实现会先把take1线程入栈,然后take1线程循环执行匹配put2线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向 put1线程

源码解析Synchronous Queue 这种特立独行的队列

4.最后,再来一个线程take2,执行take操作,这跟步骤3的逻辑基本是一致的,take2线程入栈,然后在循环中匹配put1线程,最终全部匹配完毕,栈变为空,恢复初始状态,如下图所示:

源码解析Synchronous Queue 这种特立独行的队列

可以从上面流程看出,虽然put1线程先入栈了,但是却是后匹配,这就是非公平的由来。

5 总结

SynchronousQueue 源码比较复杂,建议大家进行源码的 debug 来学习源码,为大家准备了调试类:SynchronousQueueDemo,大家可以下载源码自己调试一下,这样学起来应该会更加轻松一点。

  • SynchronousQueue内没有容器为什么能够存储一个元素?
    内部没有容器指的是没有像数组那样的内存空间存多个元素,但是是有单地址内存空间,用于交换数据

SynchronousQueue由于其独有的线程一一配对通信机制,在大部分平常开发中,可能都不太会用到,但线程池技术中会有所使用,由于内部没有使用AQS,而是直接使用CAS,所以代码理解起来会比较困难,但这并不妨碍我们理解底层的实现模型,在理解了模型的基础上,再翻阅源码,就会有方向感,看起来也会比较容易!

  • 作者:华为云开发者社区
  • 原文链接:https://www.cnblogs.com/huaweiyun/p/16194676.html
    更新时间:2022年5月13日09:28:11 ,共 9600 字。