博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Java并发编程实战】——ConcurrentLinkedQueue
阅读量:4182 次
发布时间:2019-05-26

本文共 5636 字,大约阅读时间需要 18 分钟。

ConcurrentLinkedQueue 是一个基于链表的无界线程安全队列,队列中的元素遵循 FIFO(先进先出)原则。队列中的头节点(head)是存在时间最久的节点,队列的尾节点(tail)是存在时间最短的节点。获取元素从头节点开始,新增节点追加到尾节点之后。队列中的元素不允许为空。

ConcurrentLinkedQueue 队列实现线程安全没有使用到锁,内部采用循环 CAS 来解决冲突。且内部结构较为复杂,一般往链表后加入节点,tail 指针会马上移动指向最后加入的节点,但是 ConcurrentLinkedQueue 是间隔一个节点以上才设置一次尾节点,减少并发 CAS 设置的冲突,最大限度的提高插入效率。这点和 入队列相似。

先看内部结构

public class ConcurrentLinkedQueue
extends AbstractQueue
implements Queue
, java.io.Serializable {
private static class Node
{
volatile E item; volatile Node
next; ... } private transient volatile Node
head; private transient volatile Node
tail; /** * Creates a {@code ConcurrentLinkedQueue} that is initially empty. */ public ConcurrentLinkedQueue() { //头和尾都初始化 head = tail = new Node
(null); }}

插入流程草图

在这里插入图片描述

加入一个元素,offer 方法。注意 for 循环的退出条件,一定能保证新节点入队列。

/** * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. * * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */public boolean offer(E e) {
//检查元素不能为空 checkNotNull(e); final Node
newNode = new Node
(e); //从尾部开始遍历,找最终合适的节点 for (Node
t = tail, p = t;;) {
//尾节点一定不为空,获取到当前尾节点的后续节点情况做判断 Node
q = p.next; //如果q为空,说明找到了尾节点,准备插入到之后 if (q == null) {
// p is last node //插入到尾节点之后 if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". //每两次才会修改 tail 尾节点 if (p != t) // hop two nodes at a time //设置尾节点,失败了无所谓 //就算tail和最后一个节点间隔两个以上,之后新节点会重新设置尾节点 casTail(t, newNode); // Failure is OK. //offer方法一定会执行到这里才退出 return true; } //若插入失败,说明存在竞争,p之后有节点了 //再循环一次,q指向p的后驱节点 // Lost CAS race to another thread; re-read next } //如果p=q,说明p是哨兵节点,p被取出了,需要重新遍历 //什么时候会设置哨兵,在后面的 poll 方法中 else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. //判断尾部是否变化,若尾部没有变化,那只能从头再来一遍 //变化了说明有其他线程修改了尾节点,这里可以再次将p指向尾节点后遍历 p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. //因每两次调整尾节点,需要寻找真正的尾节点 //若尾节点变化,还是直接指向尾节点,没变化p指向p.next p = (p != t && t != (t = tail)) ? t : q; }}

看看 offer 方法里面这一行代码

p = (t != (t = tail)) ? t : head;

t != (t = tail) 按照虚拟机的执行顺序:

  • 1.先获取到t的值放入到栈帧中的局部变量表;
  • 2.然后把尾节点的值重新赋值给t变量;
  • 3.再取出局部变量表中的数据做 != 判断。

在高并发的情况下,在步骤1和步骤2中可能会有其他的线程加入新节点调整了tail尾节点,导致步骤3判断返回false。

从队列中取出一个节点,poll 方法

public E poll() {
restartFromHead: for (;;) {
for (Node
h = head, p = h, q;;) {
E item = p.item; //找第一个节点,且第一个节点内元素不为空,CAS取出元素 if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point // for item to be removed from this queue. //和设置尾节点类似,每两次设置一次头节点 if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); //返回 return item; } //上面的步骤失败,寻找p的下一个节点 else if ((q = p.next) == null) {
//p后没有节点了,设置p为头节点,并将h设置为哨兵节点 updateHead(h, p); return null; } else if (p == q) //遇到哨兵,重头再来一遍 continue restartFromHead; else //p后还有节点,p指向p.next,向后找 p = q; } }}/** * Tries to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */final void updateHead(Node
h, Node
p) {
if (h != p && casHead(h, p)) //cas成功之后,就会将头节点的next指针指向自己,设置哨兵 h.lazySetNext(h);}

size 方法获取队列的总长度需要遍历队列

public int size() {
int count = 0; //开始前获取到队列的第一个元素 for (Node
p = first(); p != null; p = succ(p)) //判断第一个元素有没有被取出 if (p.item != null) // Collection.size() spec says to max out //计数加一 if (++count == Integer.MAX_VALUE) break; //取出了执行 p = succ(p) 找下一个元素 return count;}//找第一个元素Node
first() {
restartFromHead: for (;;) {
for (Node
h = head, p = h, q;;) {
//找到的节点是否包含元素 boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) {
updateHead(h, p); //有元素返回p,没有元素且p的后驱也为空返回null不存在手节点 return hasItem ? p : null; } else if (p == q) //遇到哨兵,重头遍历 continue restartFromHead; else //p没有元素,p有后驱节点,p向后移 p = q; } }}/** * Returns the successor of p, or the head node if p.next has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. *///返回p的后驱,若p为哨兵节点则返回头节点final Node
succ(Node
p) {
Node
next = p.next; return (p == next) ? head : next;}

转载地址:http://yqrai.baihongyu.com/

你可能感兴趣的文章
微信状态视频、图片素材来啦!
查看>>
再见了!锤子!!!
查看>>
LeetCode 全站第一,牛逼!
查看>>
为什么全网都在劝你学Java、Python,而不是C++?
查看>>
卧槽!阿里巴巴《算法笔记》开源了,完整版PDF开放下载!
查看>>
百度的骚操作。。。
查看>>
蔚来,牛X!
查看>>
微信悄悄新出了个野心很大的App
查看>>
微信红包封面制作小程序开放,人人都可免费制作了!!!
查看>>
13000亿!目瞪口呆!
查看>>
腾讯,搞了一个大事!
查看>>
入职腾讯第九年,我辞职了
查看>>
17 张程序员壁纸(使用频率很高)
查看>>
送一台全新手机,手慢无~
查看>>
全球顶级的14位程序员!膜拜!
查看>>
太赛博朋克!华为天才少年自制B站百大Up奖杯,网友:技术难度不高,侮辱性极强...
查看>>
华为正式宣布养猪,网友沸腾:支持华为自救!
查看>>
真的有必要读研究生吗?
查看>>
一个员工的离职成本到底有多恐怖!
查看>>
微软骂人:请勿TM关闭......
查看>>