前言
上一节看了基于数据的有界阻塞队列 ArrayBlockingQueue 的源码,通过阅读源码了解到在 ArrayBlockingQueue 中入队列和出队列操作都是用了 ReentrantLock 来保证线程安全。下面咱们看另一种有界阻塞队列:LinkedBlockingQueue。
介绍
一个基于链接节点的,可选绑定的 BlockingQueue 阻塞队列。
对元素 FIFO(先进先出)进行排序。队列的头部是已在队列中停留最长时间的元素。队列的尾部是最短时间出现在队列中的元素。将新元素插入队列的尾部,并检索队列操作获取队列开头的元素。
基于连表的队列通常具有比基于数组的队列有更高的吞吐量,但是大多数并发应用程序中的可预测性较差。
基本使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class LinkedBlockingQueueTest {
private static final LinkedBlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(10);
public static void main(String[] args) throws InterruptedException {
QUEUE.put("put 入队列, 队列满则会阻塞等待");
QUEUE.add("add 入队列, 队列满则会抛出异常");
QUEUE.offer("offer 入队列, 队列满会返回 false");
String poll = QUEUE.poll();
String take = QUEUE.take();
String peek = QUEUE.peek();
}
}
|
问题疑问
- LinkedBlockingQueue 的实现原理是什么?
- LinkedBlockingQueue 和 ArrayBlockingQueue 的区别是什么?
源码分析
基本结构

参数介绍
1 2 3 4 5 6 7 8 9 10 11 12 13
| static class Node<E> { E item;
Node<E> next;
Node(E x) { item = x; } }
|
首先在 LinkedBlockingQueue 中有一个静态内部类 Node 支持泛型,下面看下其他字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
|
构造函数
1 2 3 4 5 6 7 8 9 10 11
| public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
|
通过构造函数可以看出,在初始化 LinkedBlockingQueue 时,如果不传入容量则会默认指定 Integer.MAX_VALUE。
添加元素
add 方法是直接调用的父类 AbstractQueue 的方法,内部调用的 LinkedBlockingQueue 自己实现的 offer 方法
1 2 3 4 5 6
| public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
|
主要阅读的还是 LinkedBlockingQueue 的 put 和 offer 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
|
通过上面两段代码可以看出 put 和 offer 的最大区别在于是否阻塞。 put 方法当队列达到指定容量时,会阻塞,等待有元素出队列。而 offer 方法会直接返回 false。
同时两个方法操作元素入队列都是调用的 enqueue(node) 方法,下面一起看下 enqueue 方法。
1 2 3 4 5
| private void enqueue(Node<E> node) { last = last.next = node; }
|
在 enqueue 方法中,直接指定当前尾节点的 next 为传入的元素即可。
获取元素
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
|
通过上面代码可以看出 poll 和 take 方法逻辑大致相同。区别就是在当前队列为空时的处理逻辑。poll 在当前队列为空时返回 null,take 会阻塞等待,知道当前队列中有元素。
poll 和 take 都试用 dequeue() 方法从队列中获取元素。
1 2 3 4 5 6 7 8 9 10 11
| private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null; return x; }
|
dequeue() 方法逻辑就是获取头节点,并将 head 指向下一个节点。
查看元素
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
|
peek() 方法比较简单,直接获取 head 的元素值即可。
总结
Q&A
Q: LinkedBlockingQueue 的实现原理?
A: LinkedBlockingQueue 是基于链表实现的,内部使用 ReentrantLock 互斥锁,防止并发放置元素或者取出元素的冲突问题。
- take、poll、peek 等从队列中获取元素的操作共用 takeLock 锁。
- add、put、offer 等向队列中添加元素的操作共同 putLock 锁。
- notEmpty 和 notFull 是 Condition 类型,在 take 和 put 操作时,如果如果队列为空或者队列已满,会调用相应的 await 将线程放入条件队列。
Q: 入队列和出队列方法之间的区别是什么?
方法 |
作用 |
add |
添加元素,队列满了,添加失败抛出遗产 |
offer |
添加元素, 队列满了,添加失败,返回 false |
put |
添加元素,队列满了,阻塞等待 |
|
|
poll |
弹出元素,队列为空则返回 null |
take |
弹出元素,队列为空则等待队列中有元素 |
|
|
peek |
查看队列中放入最早的一个元素 |
结束语
LinkedBlockingQueue 使用和 ArrayBlockingQueue 并没有什么区别,内部实现都是使用的 ReentrantLock,可以对照着阅读。同时 Condition 这块也需要着重了解一下。