ReentrantLock 详解
2024-11-21 09:25:53 # Technical # JavaConcurrency

什么是可重入?什么是可重入锁?它用来解决什么问题?

ReentrantLock 的核心是 AQS,那么它怎么来实现的,继承吗?说说其类内部结构关系

ReentrantLock 是如何实现公平锁的?

ReentrantLock 是如何实现非公平锁的?

ReentrantLock 默认实现的是公平还是非公平锁?

使用 ReentrantLock 实现公平和非公平锁的示例?

ReentrantLock 和Synchronized 的对比?

ReentrantLock 的特点

显式锁定与解锁

ReentrantLock 需要显式地调用 lock() 方法进行锁定,而在不再需要锁时,需要调用 unlock() 方法解锁。这与 synchronized 关键字的隐式锁定机制不同,后者由JVM自动管理锁定和解锁的过程

1
2
3
4
5
6
7
8
9
10
11
ReentrantLock lock = new ReentrantLock();

// 锁定
lock.lock();
try {
// 受保护的临界区
// 执行需要同步的代码
} finally {
// 确保最终解锁,避免锁泄露
lock.unlock();
}

可重入性

ReentrantLock 是一个 可重入锁,这意味着如果一个线程已经持有了该锁,它可以多次获取锁而不会造成死锁。例如,如果某个线程调用了 lock() 多次,那么它必须调用相同次数的 unlock() 才能真正释放锁

公平锁与非公平锁

ReentrantLock 可以配置为 公平锁非公平锁。公平锁保证了线程获取锁的顺序是按照请求锁的顺序来进行的(即 FIFO 队列),而非公平锁则是默认的方式,可能会打破请求顺序,导致某些线程可能会「插队」

1
2
ReentrantLock lock = new ReentrantLock(true); // 公平锁
ReentrantLock lock = new ReentrantLock(false); // 非公平锁(默认)

尝试加锁

synchronized 不同,ReentrantLock 提供了 tryLock() 方法,允许线程在尝试获取锁时不阻塞。如果锁没有被其他线程持有,则 tryLock() 会立即返回 true,否则返回 false。这个特性可以用于避免长时间的等待

1
2
3
4
5
6
7
8
9
if (lock.tryLock()) {
try {
// 执行需要同步的代码
} finally {
lock.unlock();
}
} else {
// 锁未能获得,执行其他操作
}

还有一个带超时时间的版本 tryLock(long time, TimeUnit unit),它允许线程在指定的时间内等待锁

条件变量

ReentrantLock 提供了 newCondition() 方法来创建一个或多个 Condition 对象,用于线程间通信。Condition 类似于 synchronized 中的 wait()notify()/notifyAll(),但其功能更为灵活,可以创建多个条件变量以精细控制线程的等待和唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

// 线程A
lock.lock();
try {
condition.await(); // 当前线程进入等待状态
} finally {
lock.unlock();
}

// 线程B
lock.lock();
try {
condition.signal(); // 唤醒等待的线程
} finally {
lock.unlock();
}

中断响应

ReentrantLock 支持中断。线程在等待获取锁时可以被中断,这在 synchronized 中是不支持的。可以通过 lockInterruptibly() 方法来实现中断响应

1
2
3
4
5
6
7
8
try {
lock.lockInterruptibly(); // 支持中断的锁定操作
// 执行需要同步的代码
} catch (InterruptedException e) {
// 响应中断
} finally {
lock.unlock();
}

ReentrantLock 的核心原理

ReentrantLock 的核心原理基于底层的 CAS(Compare-And-Swap)机制AQS(AbstractQueuedSynchronizer)框架,这是它实现线程安全、重入性、锁竞争管理等特性的基础

锁的获取

ReentrantLock 的锁获取操作是通过 AQS 提供的 acquire() 方法来实现的。在锁的获取过程中,可能涉及到以下几种情况:

  • 初次获取锁

    • 如果锁未被任何线程占用(state == 0),线程通过 CAS 操作将 state 设置为 1,表示成功获取锁
  • 锁的重入

    • 如果锁已经被当前线程占用,线程可以再次调用 lock(),这时 state 的值会递增,记录锁的重入次数
    • 当线程多次重入锁时,每调用一次 unlock()state 值会递减一次,直到 state == 0 时锁才真正被释放
  • 锁竞争

    • 如果锁已经被其他线程持有,当前线程将会被挂起,进入 AQS 维护的 FIFO 队列,等待被唤醒
    • 等待队列中的线程将按照先后顺序依次被唤醒,尝试获取锁

公平锁与非公平锁

公平锁:在公平锁中,锁的获取顺序按照线程请求锁的顺序来进行,避免「插队」。公平锁通过检查等待队列是否有其他线程,如果有,当前线程必须进入队列等待。

非公平锁:非公平锁允许线程「插队」,即当线程请求锁时,它会直接尝试用 CAS 获取锁,而不是检查等待队列中的线程。非公平锁可能会导致某些线程饥饿(长时间无法获取锁),但通常性能更好,因为减少了对队列的检查

锁的释放

锁的释放通过 AQS 的 release() 方法来实现:

  1. 当前线程调用 unlock() 时,会将 state 值减 1
  2. 如果 state 值变为 0,表示锁已完全释放,AQS 会将等待队列中的第一个线程唤醒,使其尝试获取锁
  3. 唤醒的线程再次尝试通过 CAS 更新 state,如果成功,则获取锁

条件变量

ReentrantLock 支持条件变量(Condition),这也是由 AQS 提供的。条件变量通过 await()signal() 等方法实现线程间通信。

  • **await()**:当线程调用 condition.await(),它会释放当前锁并进入条件等待队列,同时阻塞线程
  • **signal()**:当其他线程调用 condition.signal(),会从条件等待队列中选择一个线程唤醒,使其重新尝试获取锁

中断响应

ReentrantLock 提供了对线程中断的支持,特别是在使用 lockInterruptibly() 时,当线程在获取锁的过程中被中断,AQS 会将其从等待队列中移除,并抛出 InterruptedException,让线程响应中断

ReentrantLock 简单示例

可重入性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private final ReentrantLock lock = new ReentrantLock();

public void methodA() {
lock.lock();
try {
System.out.println("methodA is executing...");
methodB();
} finally {
lock.unlock();
}
}

public void methodB() {
lock.lock();
try {
System.out.println("methodB is executing...");
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
ReentrantLockTest test = new ReentrantLockTest();
test.methodA();
}

Out:

1
2
methodA is executing...
methodB is executing...

公平锁与非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private static class Worker implements Runnable {

private final ReentrantLock lock;

public Worker(ReentrantLock lock) {
this.lock = lock;
}

@Override
public void run() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + ": Worker is running");
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
if (!Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
}
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

public static void main(String[] args) throws InterruptedException {
ReentrantLock fairLock = new ReentrantLock(true);
ReentrantLock nonFairLock = new ReentrantLock(false);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread(new Worker(fairLock), "T-" + i));
}
for (int i = 0; i < 10; i++) {
Thread currT = threads.get(i);
if (i == 0) {
currT.start();
} else {
// 确保阻塞队列的顺序
Thread preT = threads.get(i - 1);
Thread.State preTState;
while (true) {
preTState = preT.getState();
if (preTState == Thread.State.WAITING || preTState == Thread.State.TIMED_WAITING) {
currT.start();
break;
}
}
}
System.out.println("T-" + i + " start...");
}
}

Out(公平锁):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
T-0 start...
T-0: Worker is running
T-1 start...
T-2 start...
T-3 start...
T-4 start...
T-5 start...
T-6 start...
T-7 start...
T-8 start...
T-9 start...
T-1: Worker is running
T-2: Worker is running
T-3: Worker is running
T-4: Worker is running
T-5: Worker is running
T-6: Worker is running
T-7: Worker is running
T-8: Worker is running
T-9: Worker is running

非公平锁的情景我运行了许多次,始终没能出现相应的打印….

条件变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity = 5;

void produce(int value) throws InterruptedException {
lock.lock();
try {
// 队列满了时,阻塞等待
while (queue.size() == capacity) {
System.out.println("Queue is full, producer is waiting...");
// 阻塞等待 notFull 条件
notFull.await();
}
queue.offer(value);
System.out.println("Producer >>> " + value);
// 通知消费者,队列不为空,继续消费
notEmpty.signal();
} finally {
lock.unlock();
}
}

void consume() throws InterruptedException {
lock.lock();
try {
// 队列为空时,阻塞等待
while (queue.isEmpty()) {
System.out.println("Queue is empty, consumer is waiting...");
// 等待 notEmpty 条件
notEmpty.await();
}
Integer value = queue.poll();
System.out.println("Consumer <<< " + value);
// 通知生产者,队列没满,继续生产
notFull.signal();
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
ReentrantLockTest3 test3 = new ReentrantLockTest3();

// 创建生产者
Thread producerThread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
test3.produce(i);
// 模拟生产时间
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

// 创建消费者
Thread consumerThread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
test3.consume();
// 模拟消费时间
TimeUnit.MILLISECONDS.sleep(250);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

producerThread.start();
consumerThread.start();
}
  1. 在使用 Condition 时,必须先持有对应的锁,这个和 Object 类的 wait()、notify()、notifyAll() 方法类似,必须先持有某个对象的监视器锁,才能执行
  2. ArrayBlockingQueue 采用了上面例子的方式实现了生产者-消费者,实际生产中可以直接使用 ArrayBlockQueue

Out:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Producer >>> 0
Consumer <<< 0
Producer >>> 1
Producer >>> 2
Consumer <<< 1
Producer >>> 3
Producer >>> 4
Consumer <<< 2
Producer >>> 5
Producer >>> 6
Producer >>> 7
Consumer <<< 3
Producer >>> 8
Queue is full, producer is waiting...
Consumer <<< 4
Producer >>> 9
Consumer <<< 5
Consumer <<< 6
Consumer <<< 7
Consumer <<< 8
Consumer <<< 9

中断响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private final ReentrantLock lock = new ReentrantLock();

private void performTask() {
try {
lock.lockInterruptibly();
try {
System.out.println(Thread.currentThread().getName() + " acquired lock");
// 模拟工作时间
TimeUnit.SECONDS.sleep(5);
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(Thread.currentThread().getName() + " interrupted");
}
}

public static void main(String[] args) throws InterruptedException {
ReentrantLockTest4 test4 = new ReentrantLockTest4();
Thread t1 = new Thread(test4::performTask);
Thread t2 = new Thread(test4::performTask);
t1.start();
t2.start();

TimeUnit.SECONDS.sleep(2);
if (t1.getState() == Thread.State.WAITING) {
t1.interrupt();
} else {
t2.interrupt();
}
}

Out:

1
2
Thread-0 acquired lock
Thread-1 interrupted

ReentrantLock 源码解析

ReentrantLock 实现了 Lock 接口,虽然 ReentrantLock 继承了很多 AQS 的方法,但是它自身并没有直接继承 AQS,而是通过它的内部类继承 AQS 的

关键属性

ReentrantLock 只有两个属性

1
2
3
4
5
6
public class ReentrantLock implements Lock, java.io.Serializable {
// 序列号
private static final long serialVersionUID = 7373984872572414699L;
// 同步队列
private final Sync sync;
}

sync 是 ReentrantLock 中的关键属性

ReentrantLock 的大部分操作其本质是对 sync 属性的操作

而 sync 的对象是 ReentrantLock 的一个内部类

关键内部类

抽象内部类 Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

// 获取锁,抽象方法,留给子类去实现
abstract void lock();

// 非公平的方式获取
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 这里用的是 AQS 的方法,获取节点状态
int c = getState();
// 判断节点状态是否正常,未进行任何等待或取消
if (c == 0) {
// 通过 CAS 尝试修改节点状态
if (compareAndSetState(0, acquires)) {
// 设置当前线程独占
setExclusiveOwnerThread(current);
return true;
}
}
// 判断当前线程是否拥有锁
else if (current == getExclusiveOwnerThread()) {
// 增加重入次数
int nextc = c + acquires;
if (nextc < 0) // 次数小于 0 则抛出异常
throw new Error("Maximum lock count exceeded");
// 设置节点状态,这里同样调用的是 AQS 的方法
setState(nextc);
return true;
}
return false;
}

// 尝试释放锁
protected final boolean tryRelease(int releases) {
// 与加锁相反,这里减少重入次数
int c = getState() - releases;
// 如果当前线程没有持有独占锁,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 标志:是否释放完全
boolean free = false;
// 判断状态是否为 0,是否完全释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 更新节点状态
setState(c);
return free;
}

// 判断当前线程是否持有锁
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

final ConditionObject newCondition() {
return new ConditionObject();
}

// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

final boolean isLocked() {
return getState() != 0;
}

/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

公平锁内部类 FairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 继承 Sync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

// 实现了 Sync 的抽象方法
final void lock() {
// 本质也还是 AQS
acquire(1);
}

// 公平获取锁,实现了 AQS 的抽象方法
// 上面 lock 会调用到 AQS 的acquire 然后调用到这里
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 这里与非公平相比,在 CAS 设置状态之前多了一个判断
// 依然是 AQS 的方法,用来判断当前线程前面是否还有其他已排队的线程
// true:当前线程前面有其他已排队的线程,false:当前线程位于头部或队列为空
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

非公平内部类 NonfairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 与公平锁相比,这里会先进行 CAS 尝试修改节点状态,如果成功就直接上锁了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

// 这里还是实现的 AQS 抽象方法
protected final boolean tryAcquire(int acquires) {
// 直接调用的 Sync 中的非公平上锁方法
return nonfairTryAcquire(acquires);
}
}

关键方法

构造方法

1
2
3
4
5
6
7
8
9
// 无参构造,sync 设置为非公平内部类
public ReentrantLock() {
sync = new NonfairSync();
}

// 含参构造,true:公平内部类,false:非公平内部类
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public void lock() {
sync.lock();
}

// tryLock 这里调用的是 Sync 类的非公平获取锁的方法
// 这里如果锁未被其他线程持有,则直接尝试获取,获取成功:true 失败:false
// 如果自己持有,状态 + 1,返回 true
// 如果被其他线程持有,直接返回 false
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

// 指定等待时间的 tryLock
// 这里调用的是 AQS 的方法
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

// =======AbstractQueuedSynchronizer#tryAcquireNanos=======
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 这里会先调用子类的实现 FairSync#tryAcquie 或者 NonFiarSync#tryAcquie
return tryAcquire(arg) ||
// 尝试失败后,指定时间内自旋获取
doAcquireNanos(arg, nanosTimeout);
}

// =======AbstractQueuedSynchronizer#doAcquireNanos=======
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 为当前线程创建一个 Node,并加入到同步队列的尾部
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
// 下面的过程和 AQS 中 acquireQueued 方法过程基本一致,只是多了时间的判断
try {
for (;;) {
// 获取当前节点的前节点
final Node p = node.predecessor();
// 判断前节点是不是队列头,如果是就再尝试获取一下锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 前节点不是队列头或者再次尝试获取锁失败了
// 计算下剩余时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 又到了这个关键方法了
// 判断下当前线程要不要暂停下,这里主要是判断前节点的状态
// 前节点 waitStatus true/false
// -1 true 前节点状态正常,暂停当前线程等待前节点的唤醒
// > 0 false 前节点取消了排队,当前节点指向前前节点,再进来判断
// 0/-2/-3 false 前节点状态未更新,设置前节点状态 -1,再进来判断
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
// 最后从队列中移除
if (failed)
cancelAcquire(node);
}
}

条件变量相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
// 注意:这个是内部类 Sync 的方法
final ConditionObject newCondition() {
return new ConditionObject();
}

// 而 Sync 中实例化的 ConditionObject 是 AQS 中的内部类
// ======AbstractQueuedSynchronizer.ConditionObject======
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;

// 条件队列的头节点
private transient Node firstWaiter;
// 条件队列的尾节点
private transient Node lastWaiter;

public ConditionObject() {}

// 可被中断的条件等待,不可中断的是另一个方法 awaitUninterruptibly()
// 正常情况下,会一直阻塞等待,直到 signal 方法的调用
public final void await() throws InterruptedException {
// 首先就是判断当前线程是否被中断
if (Thread.interrupted())
throw new InterruptedException();

// 添加到条件队列中
Node node = addConditionWaiter();

// 这个 fullyRelease 方法是 AQS 的这里暂时先了解下作用,后面再分析
// 这里释放了节点的所有独占锁,并返回了锁的个数
int savedState = fullyRelease(node);
// 中断状态:唤醒前中断/唤醒后中断
int interruptMode = 0;

// 这里仍然是 AQS 的方法,用来判断节点是否在阻塞队列中
// 一直等到节点被移动到阻塞队列中
while (!isOnSyncQueue(node)) {
// 暂停当前线程
LockSupport.park(this);
// 唤醒之后,首先就是判断是否有中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 先将节点加入到阻塞队列,acquireQueue 返回的是是否被中断
// 如果线程被中断,并且是在唤醒前被中断的,就将中断状态设置为 REINTERRUPT,稍后重新中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 正常来说,到这里了的话,节点已经被移动到了阻塞队列中,node.nextWaiter 是为 null 的
// 不为 null 的情况是在唤醒前发生了中断
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 重新中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

// 创建一个当前线程的条件节点,并加入到条件队列的尾部
private Node addConditionWaiter() {
// 获取条件队列的尾节点
Node t = lastWaiter;

// 如果尾节点不为 null,并且状态不再为 CONDITION(-2)就将所有已取消的节点清除出条件队列
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建条件节点,并将当前节点加入条件队列中
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

// 清除队列中已取消的节点
private void unlinkCancelledWaiters() {
// 获取头节点
Node t = firstWaiter;
// 上一个正常等待的节点
Node trail = null;
// 从头节点开始往下遍历
while (t != null) {
// 获取下一个节点
Node next = t.nextWaiter;
// 判断当前节点的 waitStatus
if (t.waitStatus != Node.CONDITION) {
// 断开指向下个节点
t.nextWaiter = null;
if (trail == null)
// 在此之前还没出现「正常」的节点
// 说明之前的节点取消了等待,所以直接将下个节点设置为头节点
firstWaiter = next;
else
// 将上个正常的节点指向下个可能正常的节点
trail.nextWaiter = next;
if (next == null)
// 如果当前遍历到尾节点,将尾节点设置为上个正常节点
lastWaiter = trail;
}
else
// 记录正常的节点
trail = t;
t = next;
}
}

// 中断状态:唤醒后中断
private static final int REINTERRUPT = 1;
// 中断状态:唤醒前中断
private static final int THROW_IE = -1;

// 检查中断,如果没有中断返回 0,如果在 signal 之前中断返回 THROW_IE(-1)
// 如果在 signal 之后中断返回 REINTERRUPT(1)
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

// 唤醒等待了最久的线程
public final void signal() {
// 调用 signal 方法的线程必须要持有当前的独占锁
// 这个方法的实现由需要用到条件控制的子类来实现,如:ReentrantLock、ReentrantReadWriteLock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// FIFO,唤醒头节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

// 从头往后遍历,找出第一个需要唤醒的节点
// 有的线程可能会取消等待,但它可能依然在条件队列中
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
// 如果下一个节点为 null,将尾节点置为 null
lastWaiter = null;
// 断开指向下个节点
first.nextWaiter = null;
// transferForSignal 将节点从条件队列移动到同步队列,成功-true 失败-false
} while (!transferForSignal(first) &&
// 如果节点移动失败了,尝试移动下一个节点
(first = firstWaiter) != null);
}

// 唤醒所有的节点
public final void signalAll() {
// 依然先看看当前线程是否持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

// 类似 doSignal,从头节点往后遍历
// 只不过这里不像 doSignal 去判断节点的移动是否成功,这里每个节点都会去移动
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

// 不中断的条件等待
// 核心过程类似 await,不同的是,即使线程 interrupted,依然不会跳出循环
public final void awaitUninterruptibly() {
// 先将当前节点加入到条件队列中
Node node = addConditionWaiter();
// 释放节点所有的独占锁
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 唤醒之后判断是否被中断,设置中断状态,但并不跳出循环
if (Thread.interrupted())
interrupted = true;
}
// 如果需要被终止,再终止自己
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

// 处理等待后的中断
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
// 如果唤醒前中断,抛出异常
if (interruptMode == THROW_IE)
throw new InterruptedException();
// 如果唤醒后中断,中断当前线程
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

// 等待指定纳秒
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}

// 等待直到指定时间
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

// 等待指定单位时间
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
// 计算要等待的时长(纳秒)
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
// 下面核心步骤与 await 类似
// 新建节点并加入条件队列
Node node = addConditionWaiter();
// 释放所有锁
int savedState = fullyRelease(node);
// 计算过期时间
final long deadline = System.nanoTime() + nanosTimeout;
// 过期标识
boolean timedout = false;
// 中断标识
int interruptMode = 0;
// 循环判断是否移动到阻塞队列
while (!isOnSyncQueue(node)) {
// 时间到了
if (nanosTimeout <= 0L) {
// 转移节点到阻塞队列
// transferAfterCancelledWait 返回的值表示是否在唤醒前取消等待
// true-唤醒前取消 false-唤醒后取消(唤醒之后取消等待,意味着没有到等待时间点就被唤醒了,也就是没有超时)
timedout = transferAfterCancelledWait(node);
break;
}
// spinForTimeoutThreshold 是一个阈值 1000 纳秒(1 毫秒)
// 如果剩余等待的时间小于 1 毫秒就不 park 了,自旋检查
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 判断是否被中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 更新剩余时间
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

// 判断此条件是否属于指定 AQS
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}

// 判断是否还有等待条件的节点
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}

// 获取条件队列长度
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}

// 获取等待条件中的线程集合
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}

AQS 中提供相关条件控制的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// await 方法中,在进入条件队列后调用
// 释放节点所有持有的独占锁,并返回持有锁的数量
final int fullyRelease(Node node) {
// 标识是否失败
boolean failed = true;
try {
// 获取状态(锁的数量)
int savedState = getState();
// 释放所有的锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 如果失败了,将节点状态设置为 CANCELLED(1),这样这个节点就会被后面的节点给「清理」掉
node.waitStatus = Node.CANCELLED;
}
}

// 释放锁
public final boolean release(int arg) {
// 这里调用的 Sync 的实现
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// await 中判断是否被唤醒的关键方法
// 判断节点是否被移动到了阻塞队列中
final boolean isOnSyncQueue(Node node) {
// 首先判断节点的状态,如果节点状态为 CONDITION(-2)就说明还在条件队列中
// 然后就是判断前节点的指针是否为空,因为条件队列的单向链表,如果前节点的指针为 null,也说明不在阻塞队列
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果节点有了后继节点,那么这个节点肯定是在阻塞队列里了
if (node.next != null) // If has successor, it must be on queue
return true;

// node.prev != null 是否说明节点在阻塞队列?
//

// 直接遍历阻塞队列来判断
return findNodeFromTail(node);
}

// 从头节点往后遍历阻塞队列,如果有找到指定节点,说明节点在阻塞队列
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

// 唤醒等待节点的关键方法
final boolean transferForSignal(Node node) {

// 首先 CAS 尝试将节点状态改成正常状态,失败直接返回
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

// 将节点加入阻塞队列中
// 这里返回的 p 是 node 在阻塞队列的前节点
Node p = enq(node);
// 获取前节点的状态
int ws = p.waitStatus;
// 如果 ws > 0 说明前节点取消了等待,这时直接唤醒 node 对应的线程
// 如果 ws <= 0 说明之前没有对前节点的状态进行更新,这里通过 CAS 尝试将前节点状态设置为 -1
// 这里的逻辑类似 shouldParkAfterFailedAcquire
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果前节点取消,或者 CAS 失败,唤醒当前节点的线程
LockSupport.unpark(node.thread);
return true;
}

// await 中 LockSupport 被唤醒后,如果线程处于中断状态,则调用此方法
// await(...) 等待指定时间方法中,时间到期也会调用该方法
// 用于将取消等待的节点从条件队列移动到阻塞队列
// 返回 true:在 signal 之前被中断(取消)
// 返回 false:在 signal 之后被中断(取消)
final boolean transferAfterCancelledWait(Node node) {
// 用 CAS 将节点的状态设置为 0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// CAS 成功,说明中断是在 signal 之前的,如果是在 signal 之后中断的话,signal 会将状态设置为 0
// 即使线程被中断,依然将节点移动到阻塞队列
enq(node);
return true;
}

// 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
// signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

核心流程

ReentrantLock 加锁与释放锁的过程就是上篇 AQS 中的过程

这里主要分析 ReentrantLock 的条件变量处理过程

ReentranLock-Condition-await

其实 await 中已经涵盖了 signal 了,signal 的关键在于 transferForSignal 方法,而 transferForSignal 的关键又在于其对 waitStatus 的判断以及线程的唤醒

ReentranLock-Condition-signal

等待与唤醒过程中同步队列与条件队列的变化

ReentrantLock-Queue

虚假唤醒

不管是 ReentrantLock 中的 await,还是 Synchronized 中的 wait,它们都会出现虚假唤醒的情况。虚假唤醒是并发编程中的一个普遍现象,不仅存在于 Java 中,在其他编程语言和并发系统中也可能发生。这是因为虚假唤醒通常源于底层系统的实现细节,而不是语言特性。

通常,需要使用 while 来检查条件是否满足,而不是 if

上面生产者-消费者的例子中,如果将 while 改为 if,可以观察到虚假唤醒的情况发生