正常不支持并发的队列,这里的实现是头尾节点初始状态都是指向一个哑结点,方便处理队列为空这一边界情况
public class Queue<T> {
private Node<T> head;
private Node<T> tail;
public Queue() {
head = tail = new Node<>(null);
}
public void enq(T e) {
Node<T> newNode = new Node<>(e);
tail.next = newNode;
tail = newNode;
}
public T deq() {
Node<T> first = head.next;
T value = first.value;
first.value = null;
head = first;
return value;
}
private static class Node<T> {
public Node<T> next;
public T value;
Node(T value) {
this.value = value;
}
}
}
首先图解一下出入队
下图为初始队列为空的入队情况
图中可以分为三个状态,初始状态->中间状态->完成状态
- 初始状态:head与tail都指向哑节点D
- 中间状态:tail.next = newNode 将尾指针的next指向新节点
- 完成状态:tail = newNode 将尾指针指向新节点
下图为队列不为空的入队情况,与上图区别仅在于初始状态的不同
下图为队列不为空的出队情况
同样分为三个状态,初始状态->中间状态->完成状态
- 初始状态:
- 中间状态:head = first 将哑结点的next指向头结点,并将头结点设为哑节点
- 完成状态:将头结点设为哑节点,return 返回出队元素的值
了解了普通队列,下面开始构造无锁队列,这里的实现主要基于John D. Valois 的 Implementing Lock-Free Queues 这篇论文[2]
public class LockFreeQueue<T> {
private AtomicReference<Node<T>> head;
private AtomicReference<Node<T>> tail;
public LockFreeQueue() {
Node<T> dummy = new Node<>(null);
head = new AtomicReference<>(dummy);
tail = new AtomicReference<>(dummy);
}
public void enq(T e) {}
public T deq() throws Exception {}
private static class Node<T> {
public AtomicReference<Node<T>> next = new AtomicReference<>(null);
public T value;
Node(T value) {
this.value = value;
}
}
首先按照普通的队列,先把头尾指针和Node定义好,为了能够使用CAS,使用了AtomicReference。然后考虑无锁的出队入队实现。
先来进行入队的无锁实现,首先我们确认入队操作的步骤有2个:
- 将尾节点的next指向新节点
- 将尾节点指向新节点
那么我们先回顾入队的步骤(如图),写出如下代码
public void enq(T e) {
Node<T> newNode = new Node<>(e);
for(;;) {
// 按照CAS实现无锁的一般写法
// 我们首先获取尾节点及其next指针的快照
Node<T> last = tail.get();
Node<T> next = last.next.get();
// CAS设置新节点置尾节点的next域
if (last.next.compareAndSet(null, newNode)) {
// 更新成功后,此时队列变成入队的中间状态
// 并且我们注意到中间状态下,其他入队线程执行CAS都将失败
// 本线程可以直接将尾指针指向新节点并返回
tail.set(newNode);
return;
}
}
}
上述实现看起来挺简单,并且并发下的正确性是能够保证的,那么这样实现的入队是否是无锁的呢
首先无锁算法必须是无干扰的,无干扰意味着如果某个方法调用在多个线程之间存在竞争,将这些线程挂起,然后随意唤醒一个线程,该线程能够不受其他线程状态的干扰而完成方法调用。
显然,在上述实现中,如果CAS成功的线程因为某种原因(比如挂了)并没有将尾节点指向新节点,那么队列将一直处于中间状态,其他入队线程将一直CAS失败,从而被阻塞。
如何将上述实现改进成无锁的?那么需要引入其他入队线程的”帮助”
public void enq(T e) {
Node<T> newNode = new Node<>(e);
for(;;) {
Node<T> last = tail.get();
Node<T> next = last.next.get();
// next != null 意味着队列已经处于中间状态
if (next == null) {
if (last.next.compareAndSet(null, newNode)) {
// CAS设置尾节点指向新节点
tail.compareAndSet(last, newNode);
return;
}
} else {
// "帮助"CAS成功的线程将尾节点指向新节点
tail.compareAndSet(last, next);
}
}
}
我们可以看到处于中间状态的队列,所有入队线程,都会尝试将尾节点指向新节点,即使失败也无所谓,因为能够保证总有一个线程能成功
这样一来,我们就真正实现了无锁的入队
现在再来实现无锁的出队
public T deq() throws Exception {
for(;;) {
// 取得头指针及首节点(哑节点的next)的快照
Node<T> dummy = head.get();
Node<T> first = dummy.next.get();
// 判断队列空的情况
if (first == null) {
// 队列空,返回null或者抛出异常
return null;
}
T value = first.value;
// CAS 设置头指针指向首节点
if (head.compareAndSet(dummy, first)) {
// 将首节点置为哑节点,并返回值
first.value = null;
return value;
}
}
}
可以看出这样的入队操作是无锁的。
同时我们能够注意到出队与入队操作分别仅依赖于尾指针与头指针(得益于基于哑结点的实现方式),因此入队与出队操作并不会互相影响。
至于这样实现是否会有ABA问题,这里我照搬JUC中ConcurrentLinkedQueue中的注释
Note that like most non-blocking algorithms in this package,this implementation relies on the fact that in garbage collected systems, there is no possibility of ABA problems due to recycled nodes, so there is no need to use “counted pointers” or related techniques seen in versions used in non-GC’ed settings.
这里有2个要点:
- ABA problems due to recycled nodes 因为复用节点导致的ABA问题
- this implementation relies on the fact that in garbage collected systems 该无锁算法是在垃圾回收机制下实现的
首先我们不会存在复用节点(Node)的情况,其次在java的垃圾回收机制下,两个引用如果不为null且相等,其必然指向同一个对象,因此在这个无锁队列中,一个新创建的节点(的引用)是不会与已有的节点(的引用)具有相同地址的
References
- [1] Herlihy, M. and Shavit, N. (n.d.). The art of multiprocessor programming.
- [2] J. D. Valois. Implementing Lock-Free Queues. In Proceedings of the SeventhInternational Conference on Parallel and Distributed Computing Systems, Las Vegas, NV, 1994.