前言


在介绍 AQS 时,其中有一个内部类叫做 ConditionObject,当时并没有进行介绍,并且在后续阅读源码时,会发现很多地方用到了 Condition ,这时就会很诧异,这个 Condition 到底有什么作用?那今天就通过阅读 Condition 源码,从而弄清楚 Condition 到底是做什么的?当然阅读这篇文章的时候希望你已经阅读了 AQS、ReentrantLock 以及 LockSupport 的相关文章或者有一定的了解(当然小伙伴也可以直接跳到文末看总结)。

介绍

Object 的监视器方法:wait、notify、notifyAll 应该都不陌生,在多线程使用场景下,必须先使用 synchronized 获取到锁,然后才可以调用 Object 的 wait、notify。

Condition 的使用,相当于用 Lock 替换了 synchronized,然后用 Condition 替换 Object 的监视器方法。

Conditions(也称为条件队列或条件变量)为一种线程提供了一种暂停执行(等待),直到另一线程通知被阻塞的线程,某些状态条件现在可能为真。

因为访问到此共享状态信息发生在不同的线程中,因此必须对其进行保护,所以会使用某种形式的锁。等待条件提供的关键属性是它以原子地释放了关联的锁,并且挂起当前线程,就像 Object.wait 一样。

Condition 实例本质上要绑定到锁。 为了获得 Condition 实例,一般使用 Lock 实例的 newCondition() 方法。

1
2
Lock lock = new ReentrantLock();
Condition con = lock.newCondition();

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class BoundedBuffer {

final Lock lock = new ReentrantLock();
// condition 实例依赖于 lock 实例
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];

int putPtr, takePtr, count;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
// put 时判断是否已经满了
// 则线程在 notFull 条件上排队阻塞
while (count == items.length) {
notFull.await();
}
items[putPtr] = x;
if (++putPtr == items.length) {
putPtr = 0;
}
++count;
// put 成功之后,队列中有元素
// 唤醒在 notEmpty 条件上排队阻塞的线程
notEmpty.signal();
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
// take 时,发现为空
// 则线程在 notEmpty 的条件上排队阻塞
while (count == 0) {
notEmpty.await();
}
Object x = items[takePtr];
if (++takePtr == items.length) {
takePtr = 0;
}
--count;
// take 成功,队列不可能是满的
// 唤醒在 notFull 条件上排队阻塞的线程
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}

上面是官方文档的一个例子,实现了一个简单的 BlockingQueue ,看懂这里,会发现在同步队列中很多地方都是用的这个逻辑。必要的代码说明都已经在代码中进行注释。

问题疑问

  1. Condition 和 AQS 有什么关系?
  2. Condition 的实现原理是什么?
  3. Condition 的等待队列和 AQS 的同步队列有什么区别和联系?

源码分析

基本结构

condition-uml-rMKuf3

通过 UML 可以看出,Condition 只是一个抽象类,它的主要实现逻辑是在 AQS 的内部类 ConditionObject 实现的。下面主要从 await 和 signal 两个方法入手,从源码了解 ConditionObject。

创建 Condition

1
2
Lock lock = new ReentrantLock();
Condition con = lock.newCondition();

一般使用 lock.newCondition() 创建条件变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ReentrantLock implements Lock, java.io.Serializable {

private final Sync sync;

public Condition newCondition() {
return sync.newCondition();
}
// Sync 集成 AQS
abstract static class Sync extends AbstractQueuedSynchronizer {

final ConditionObject newCondition() {
return new ConditionObject();
}
}
}

这里使用的是 ReentrantLock 的源码,里面调用的 sync.newCondition(),Sync 继承 AQS,其实就是创建了一个 AQS 内部类的 ConditionObject 的实例。

这里需要注意的是 lock 每调用一次 lock.newCondition() 都会有一个新的 ConditionObject 实例生成,就是说一个 lock 可以创建多个 Condition 实例。

Condition 参数

1
2
3
4
/** 条件队列的第一个节点 */
private transient Node firstWaiter;
/** 条件队列的最后一个节点 */
private transient Node lastWaiter;

await 方法

await 方法,会造成当前线程在等待,直到收到信号或被中断。

与此 Condition 相关联的锁被原子释放,并且出于线程调度目的,当前线程被禁用,并且处于休眠状态,直到发生以下四种情况之一:

  1. 其他一些线程调用此 Condition 的 signal 方法,而当前线程恰好被选择为要唤醒的线程;
  2. 其他一些线程调用此 Condition 的 signalAll 方法;
  3. 其他一些线程中断当前线程,并支持中断线程挂起;
  4. 发生虚假唤醒。

在所有情况下,在此方法可以返回之前,当前线程必须重新获取与此条件关联的锁。当线程返回时,可以保证保持此锁。

现在来看 AQS 内部的实现逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void await() throws InterruptedException {
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
// 添加到条件队列尾部(等待队列)
// 内部会创建 Node.CONDITION 类型的 Node
Node node = addConditionWaiter();
// 释放当前线程获取的锁(通过操作 state 的值)
// 释放了锁就会被阻塞挂起
int savedState = fullyRelease(node);
int interruptMode = 0;
// 节点已经不在同步队列中,则调用 park 让其在等待队列中挂着
while (!isOnSyncQueue(node)) {
// 调用 park 阻塞挂起当前线程
LockSupport.park(this);
// 说明 signal 被调用了或者线程被中断,校验下唤醒原因
// 如果因为终端被唤醒,则跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// while 循环结束, 线程开始抢锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 统一处理中断的
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

await 方法步骤如下:

  1. 创建 Node.CONDITION 类型的 Node 并添加到条件队列(ConditionQueue)的尾部;
  2. 释放当前线程获取的锁(通过操作 state 的值)
  3. 判断当前线程是否在同步队列(SyncQueue)中,不在的话会使用 park 挂起。
  4. 循环结束之后,说明已经已经在同步队列(SyncQueue)中了,后面等待获取到锁,继续执行即可。

在这里一定要把条件队列和同步队列进行区分清楚!!

条件队列/等待队列:即 Condition 的队列
同步队列:AQS 的队列。

下面对 await 里面重要方法进行阅读:

  • addConditionWaiter() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 判断尾节点状态,如果被取消,则清除所有被取消的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建新节点,类型为 Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 将新节点放到等待队列尾部
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

addConditionWaiter 方法可以看出,只是创建一个类型为 Node.CONDITION 的节点并放到条件队列尾部。同时通过这段代码还可以得出其他结论:

  1. 条件队列内部的 Node,只用到了 thread、waitStatus、nextWaiter 属性;
  2. 条件队列是单向队列。

作为对比,这里把条件队列和同步队列做出对比:

condition-node-7yUQjE

AQS 同步队列如下:

condition-aqs-n5Fs85

再来看下 Condition 的条件队列

condition-condition-A97bUS

waitStatus 在 AQS 中已经进行了介绍:

  1. 默认状态为 0;
  2. waitStatus > 0 (CANCELLED 1) 说明该节点超时或者中断了,需要从队列中移除;
  3. waitStatus = -1 SIGNAL 当前线程的前一个节点的状态为 SIGNAL,则当前线程需要阻塞(unpark);
  4. waitStatus = -2 CONDITION -2 :该节点目前在条件队列;
  5. waitStatus = -3 PROPAGATE -3 :releaseShared 应该被传播到其他节点,在共享锁模式下使用。
  • fullyRelease 方法 (AQS)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取当前节点的 state
int savedState = getState();
// 释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

fullyRelease 方法是由 AQS 提供的,首先获取当前的 state,然后调用 release 方法进行释放锁。

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

release 方法在 AQS 中做了详细的介绍。它的主要作用就是释放锁,并且需要注意的是:

  1. fullyRelease 会一次性释放所有的锁,所以说不管重入多少次,在这里都会全部释放的。
  2. 这里会抛出异常,主要是在释放锁失败时,这时就会在 finally 里面将节点状态置为 Node.CANCELLED。
  • isOnSyncQueue(node)

通过上面的流程,节点已经放到了条件队列并且释放了持有的,而后就会挂起阻塞,直到 signal 唤醒。但是在挂起时要保证节点已经不在同步队列(SyncQueue)中了才可以挂起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final boolean isOnSyncQueue(Node node) {
// 当前节点是条件队列节点,或者上一个节点是空
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;

return findNodeFromTail(node);
}
// 从尾部开始遍历
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

如果一个节点(总是一个最初放置在条件队列中的节点)现在正等待在同步队列上重新获取,则返回true。

这段代码的主要作用判断节点是不是在同步队列中,如果不在同步队列中,后面才会调用 park 进行阻塞当前线程。这里就会有一个疑问:AQS 的同步队列和 Condition 的条件队列应该是无关的,这里为什么会要保证节点不在同步队列之后才可以进行阻塞?因为 signal 或者 signalAll 唤醒节点之后,节点就会被放到同步队列中。

线程到这里已经被阻塞了,当有其他线程调用 signal 或者 signalAll 时,会唤醒当前线程。

而后会验证是否因中断唤醒当前线程,这里假设没有发生中断。那 while 循环的 isOnSyncQueue(Node node) 必然会返回 true ,表示当前节点已经在同步队列中了。

后续会调用 acquireQueued(node, savedState) 进行获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final boolean acquireQueued(final Node node, int arg) {
// 是否拿到资源
boolean failed = true;
try {
// 中断状态
boolean interrupted = false;
// 无限循环
for (;;) {
// 当前节点之前的节点
final Node p = node.predecessor();
// 前一个节点是头节点, 说明当前节点是 头节点的 next 即真实的第一个数据节点 (因为 head 是虚拟节点)
// 然后再尝试获取资源
if (p == head && tryAcquire(arg)) {
// 获取成功之后 将头指针指向当前节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// p 不是头节点, 或者 头节点未能获取到资源 (非公平情况下被别的节点抢占)
// 判断 node 是否要被阻塞,获取不到锁就会一直阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

这里就是 AQS 的逻辑了,同样可以阅读 AQS 的相关介绍。

  1. 不断获取本节点的上一个节点是否为 head,因为 head 是虚拟节点,如果当前节点的上一个节点是 head 节点,则当前节点为 第一个数据节点>
  2. 第一个数据节点不断的去获取资源,获取成功,则将 head 指向当前节点;
  3. 当前节点不是头节点,或者 tryAcquire(arg) 失败(失败可能是非公平锁)。这时候需要判断前一个节点状态决定当前节点是否要被阻塞(前一个节点状态是否为 SIGNAL)。

值得注意的是,当节点放到 AQS 的同步队列时,也是进行争抢资源,同时设置 savedState 的值,这个值则是代表当初释放锁的时候释放了多少重入次数。

总体流程画图如下:

condition-await-q4EBQx

signal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final void signal() {
// 是否为当前持有线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
// firstWaiter 头节点指向条件队列头的下一个节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 将原来的头节点和同步队列断开
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {

// 判断节点是否已经在之前被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

// 调用 enq 添加到 同步队列的尾部
Node p = enq(node);
int ws = p.waitStatus;
// node 的上一个节点 修改为 SIGNAL 这样后续就可以唤醒自己了
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

enq 同样可以阅读 AQS 的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 尾节点为空 需要初始化头节点,此时头尾节点是一个
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 不为空 循环赋值
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

通过 enq 方法将节点放到 AQS 的同步队列之后,要将 node 的前一个节点的 waitStatus 设置为 Node.SIGNAL。signalAll 的代码也是类似。

总结

Q&A

Q: Condition 和 AQS 有什么关系?

A: Condition 是基于 AQS 实现的,Condition 的实现类 ConditionObject 是 AQS 的一个内部类,在里面共用了一部分 AQS 的逻辑。

Q: Condition 的实现原理是什么?

A: Condition 内部维护一个条件队列,在获取锁的情况下,线程调用 await,线程会被放置在条件队列中并被阻塞。直到调用 signal、signalAll 唤醒线程,此后线程唤醒,会放入到 AQS 的同步队列,参与争抢锁资源。

Q: Condition 的等待队列和 AQS 的同步队列有什么区别和联系?
A: Condition 的等待队列是单向链表,AQS 的是双向链表。二者之间并没有什么明确的联系。仅仅在节点从阻塞状态被唤醒后,会从等待队列挪到同步队列中。

结束语

本文主要是阅读 Condition 的相关代码,不过省略了线程中断等逻辑。有兴趣的小伙伴。可以更深入的研究相关的源码。