源码笔记JDKJDK源码笔记【JDK源码笔记】- 基于数组的有界阻塞队列 —— ArrayBlockingQueue
liuzhihang
前言
在阅读完和 AQS 相关的锁以及同步辅助器之后,来一起阅读 JUC 下的和队列相关的源码。先从第一个开始:ArrayBlockingQueue。
介绍
由数组支持的有界BlockingQueue阻塞队列。
这个队列的命令元素FIFO(先入先出)。 队列的头是元素一直在队列中时间最长。 队列的尾部是该元素已经在队列中的时间最短。 新元素插入到队列的尾部,并且队列检索操作获取在队列的头部元素。
这是一个典型的“有界缓冲区”,在其中一个固定大小的数组保持由生产者插入并受到消费者的提取的元素。 一旦创建,容量不能改变。 试图put 一个元素到一个满的队列将导致操作阻塞; 试图 take 从空队列一个元素将类似地阻塞。
此类支持订购等待生产者和消费者线程可选的公平政策。 默认情况下,这个顺序不能保证。 然而,队列公平设置为构建 true 保证线程以FIFO的顺序进行访问。 公平性通常会降低吞吐量,但减少了可变性和避免饥饿。
基本使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public class ArrayBlockingQueueTest {
private static final ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(10);
private static final CountDownLatch LATCH = new CountDownLatch(2);
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
pool.submit(() -> { for (int i = 0; i < 100; i++) { try { Thread.sleep(1000L);
QUEUE.put("鸡蛋" + Thread.currentThread().getName()); System.out.println("put 放入元素"); } catch (InterruptedException ignored) { } } LATCH.countDown(); });
pool.submit(() -> {
for (int i = 0; i < 100; i++) { try { Thread.sleep(500L);
String take = QUEUE.take();
System.out.println("take = " + take); } catch (InterruptedException ignored) { } } LATCH.countDown();
}); try { LATCH.await(); } catch (InterruptedException ignored) {
} pool.shutdown(); }
}
|
demo 只是临时写的一个,很简单的版本。
问题疑问
- ArrayBlockingQueue 的实现原理是什么?
- 入队列和出队列方法之间的区别是什么?
源码分析
基本结构

参数介绍
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
|
构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public ArrayBlockingQueue(int capacity) { this(capacity, false); }
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair);
final ReentrantLock lock = this.lock; lock.lock(); try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
|
添加元素
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public boolean add(E e) { return super.add(e); }
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
|
- add 方法:调用的是父类 AbstractQueue 的 add 方法,内部调用的是 offer 方法,如果 offer 返回 false,则抛出异常。
- offer 方法:校验元素非空,加互斥锁,如果队列满了,则返回 false,如果队列未满,则调用 enqueue 方法,添加元素。
- put 方法:校验元素非空,加互斥锁,如果队列满了,则一直自旋等待,队列未满则调用 enqueue 方法,添加元素。
所以下面还是需要看一下 enqueue 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
|
获取元素
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
|
通过源码可以看出:
- pool 和 take 都是从队列中获取元素,二者不同的是,当队列中没有元素时,poll 方法返回 null,而 take 方法会一直阻塞等待,直到从队列中获取到元素。
- poll 和 take 方法获取元素都是调用的 dequeue 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
|
查看元素
1 2 3 4 5 6 7 8 9
| public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } }
|
总结
Q&A
Q: ArrayBlockingQueue 的实现原理?
A: ArrayBlockingQueue 是基于数组实现的,内部使用 ReentrantLock 互斥锁,防止并发放置元素或者取出元素的冲突问题。
Q: 入队列和出队列方法之间的区别是什么?
方法 |
作用 |
add |
添加元素,队列满了,添加失败抛出遗产 |
offer |
添加元素, 队列满了,添加失败,返回 false |
put |
添加元素,队列满了,阻塞等待 |
|
|
poll |
弹出元素,队列为空则返回 null |
take |
弹出元素,队列为空则等待队列中有元素 |
|
|
peek |
查看队列中放入最早的一个元素 |
结束语
ArrayBlockingQueue 中使用了 ReentrantLock 互斥锁,在元素入队列和出队列的时候都进行了加锁,所以同时只会有一个线程进行入队列或者出队列,从而保证线程安全。