前言


在阅读完和 AQS 相关的锁以及同步辅助器之后,来一起阅读 JUC 下的和队列相关的源码。先从第一个开始:ArrayBlockingQueue。

介绍

由数组支持的有界BlockingQueue阻塞队列。

这个队列的命令元素FIFO(先入先出)。 队列的头是元素一直在队列中时间最长。 队列的尾部是该元素已经在队列中的时间最短。 新元素插入到队列的尾部,并且队列检索操作获取在队列的头部元素。

这是一个典型的“有界缓冲区”,在其中一个固定大小的数组保持由生产者插入并受到消费者的提取的元素。 一旦创建,容量不能改变。 试图put 一个元素到一个满的队列将导致操作阻塞; 试图 take 从空队列一个元素将类似地阻塞。

此类支持订购等待生产者和消费者线程可选的公平政策。 默认情况下,这个顺序不能保证。 然而,队列公平设置为构建 true 保证线程以FIFO的顺序进行访问。 公平性通常会降低吞吐量,但减少了可变性和避免饥饿。

基本使用


public class ArrayBlockingQueueTest {

private static final ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(10);

private static final CountDownLatch LATCH = new CountDownLatch(2);

public static void main(String[] args) {

ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());


pool.submit(() -> {
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(1000L);

QUEUE.put("鸡蛋" + Thread.currentThread().getName());
System.out.println("put 放入元素");
} catch (InterruptedException ignored) {
}
}
LATCH.countDown();
});


pool.submit(() -> {

for (int i = 0; i < 100; i++) {
try {
Thread.sleep(500L);

String take = QUEUE.take();

System.out.println("take = " + take);
} catch (InterruptedException ignored) {
}
}
LATCH.countDown();

});
try {
LATCH.await();
} catch (InterruptedException ignored) {

}
pool.shutdown();
}

}

demo 只是临时写的一个,很简单的版本。

问题疑问

  1. ArrayBlockingQueue 的实现原理是什么?
  2. 入队列和出队列方法之间的区别是什么?

源码分析

基本结构

ArrayBlockingQueue-uml-37BHBp

参数介绍


/** 数组 - 存储队列中的元素 */
final Object[] items;

/** 下一个 take, poll, peek or remove 的索引 */
int takeIndex;

/** 下一个 put, offer, or add 的索引 */
int putIndex;

/** 队列中的元素数 */
int count;


/** Main lock guarding all access */
final ReentrantLock lock;

/** take 操作时是否等待 */
private final Condition notEmpty;

/** put 操作时是否等待 */
private final Condition notFull;

构造函数

public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

// 指定容量,及是否公平
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
// 初始化的时候放入元素
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}

添加元素

public boolean add(E e) {
return super.add(e);
}

// 父类的方法,其实调用的也是 offer
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
// 使用锁
public boolean offer(E e) {
checkNotNull(e);
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
// 放入元素, 如果队列满了,则等待
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
  1. add 方法:调用的是父类 AbstractQueue 的 add 方法,内部调用的是 offer 方法,如果 offer 返回 false,则抛出异常。
  2. offer 方法:校验元素非空,加互斥锁,如果队列满了,则返回 false,如果队列未满,则调用 enqueue 方法,添加元素。
  3. put 方法:校验元素非空,加互斥锁,如果队列满了,则一直自旋等待,队列未满则调用 enqueue 方法,添加元素。

所以下面还是需要看一下 enqueue 方法:

// 只有在获取锁的时候才可以调用
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// putIndex 下一个 put, offer, or add 的索引
// 对其进行赋值,然后进行 ++putIndex 操作
items[putIndex] = x;
// 如果等于长度,则指定为开始
if (++putIndex == items.length)
putIndex = 0;
// 对元素数进行 ++
count++;
// 有元素入队列,唤醒在等待获取元素的线程
notEmpty.signal();
}

获取元素

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

通过源码可以看出:

  1. pool 和 take 都是从队列中获取元素,二者不同的是,当队列中没有元素时,poll 方法返回 null,而 take 方法会一直阻塞等待,直到从队列中获取到元素。
  2. poll 和 take 方法获取元素都是调用的 dequeue 方法。
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 获取元素并将元素置为 null
E x = (E) items[takeIndex];
items[takeIndex] = null;
// takeIndex 下一个 take, poll, peek or remove 的索引
// 指向下一个元素,并且 元素数减少
if (++takeIndex == items.length)
takeIndex = 0;
count--;
// 更新迭代器状态
if (itrs != null)
itrs.elementDequeued();
// 唤醒等待放入元素的线程
notFull.signal();
return x;
}

查看元素

public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}

总结

Q&A

Q: ArrayBlockingQueue 的实现原理?

A: ArrayBlockingQueue 是基于数组实现的,内部使用 ReentrantLock 互斥锁,防止并发放置元素或者取出元素的冲突问题。

Q: 入队列和出队列方法之间的区别是什么?

方法 作用
add 添加元素,队列满了,添加失败抛出遗产
offer 添加元素, 队列满了,添加失败,返回 false
put 添加元素,队列满了,阻塞等待
poll 弹出元素,队列为空则返回 null
take 弹出元素,队列为空则等待队列中有元素
peek 查看队列中放入最早的一个元素

结束语

ArrayBlockingQueue 中使用了 ReentrantLock 互斥锁,在元素入队列和出队列的时候都进行了加锁,所以同时只会有一个线程进行入队列或者出队列,从而保证线程安全。