ReentrantReadWriteLock 详解
2024-11-21 09:25:53 # Technical # JavaConcurrency

为什么有了 ReentrantLock 还需要 ReentrantReadWriteLock?

ReentrantReadWriteLock 底层实现原理?

ReentrantReadWriteLock 底层读写状态如何设计的?

读锁和写锁的最大数量是多少?

本地线程计数器 ThreadLocalHoldCounter 是用来做什么的?

缓存计数器 HoldCounter 是用来做什么的?

写锁的获取与释放是怎么实现的?

读锁的获取与释放是怎么实现的?

什么是锁的升降级?RentrantReadWriteLock 为什么不支持锁升级?

读写锁的使用

通过一个简单的示例快速了解读写锁的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static class Cache<K, V> {
private final Map<K, V> map = new HashMap<>();
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

private V get(K key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}

private void put(K key, V value) {
writeLock.lock();
try {
map.put(key, value);
} finally {
writeLock.unlock();
}
}
}

public static void main(String[] args) {
Cache<Integer, String> cache = new Cache<>();
new Thread(() -> {
cache.put(1, "one");
System.out.println("t1 put 1-one");
}).start();
new Thread(() -> {
String val = cache.get(1);
System.out.println("t2 get 1-" + val);
}).start();
}

源码分析

与 ReentrantLock 类似的是,ReentrantReadWriteLock 中也有 SyncFairSyncNonfairSync 这个三个内部类,除此之外,多了 ReadLockWriteLock 这两个内部类

整体结构

ReentrantReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
// 内部类 ReadLock 属性的变量
private final ReentrantReadWriteLock.ReadLock readerLock;
// 内部类 WriteLock 属性的变量
private final ReentrantReadWriteLock.WriteLock writerLock;
// 内部类 Sync 属性的变量
final Sync sync;

// 无参构造会请求下面的含参构造
public ReentrantReadWriteLock() {
this(false);
}

public ReentrantReadWriteLock(boolean fair) {
// 公平与非公平锁
sync = fair ? new FairSync() : new NonfairSync();
// 关键:这里默认初始化出来 ReadLock 与 WriteLock
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
......
}

初始化一个 ReentrantReadWriteLock 会默认带着初始化出 ReadLock 与 WriteLock

Sync

1
2
3
abstract static class Sync extends AbstractQueuedSynchronizer {
......
}

Sync 依旧是继承了 AQS 的一个核心内部类

FairSync

1
2
3
4
5
6
7
8
9
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}

FairSync 十分简单,继承了 Sync 并提供两个方法,毕竟 ReentrantReadWriteLock 的关键不在此

NonfairSync

1
2
3
4
5
6
7
8
9
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false;
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}

同 FairSync 类似

ReadLock

1
2
3
4
5
6
7
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
......
}

ReadLock 的内部还有一个 Sync 属性的变量,并且这个变量是通过构造方法中的 ReentrantReadWriteLock 获取的

WriteLock

1
2
3
4
5
6
7
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
......
}

WriteLock 与 ReadLock 类似

读写对比

对比 ReadLock 与 WriteLock 获取锁的差别

ReadLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static class ReadLock implements Lock, java.io.Serializable {

private final Sync sync;

protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

public void lock() {
// ①
sync.acquireShared(1);
}

public void lockInterruptibly() throws InterruptedException {
// ②
sync.acquireSharedInterruptibly(1);
}

public boolean tryLock() {
return sync.tryReadLock();
}

public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void unlock() {
// ③
sync.releaseShared(1);
}

public Condition newCondition() {
throw new UnsupportedOperationException();
}
}

WriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public static class WriteLock implements Lock, java.io.Serializable {

private final Sync sync;

protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

public void lock() {
// ①
sync.acquire(1);
}

public void lockInterruptibly() throws InterruptedException {
// ②
sync.acquireInterruptibly(1);
}

public boolean tryLock( ) {
return sync.tryWriteLock();
}

public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public void unlock() {
// ③
sync.release(1);
}

public Condition newCondition() {
return sync.newCondition();
}

public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}

public int getHoldCount() {
return sync.getWriteHoldCount();
}
}

分别对比 ReadLock 与 WriteLock 的 ① ② ③ 处

可以发现其本质是,ReadLock 使用了 AQS 的共享模式,WriteLock 使用了 AQS 独占模式

AQS 共享模式与独占模式

详细分析

Sync

还是先从 AQS 的子类 Sync 开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;

// 将 state 一分为二,高 16 位用于共享模式,低16位用于独占模式
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 取 c 的高 16 位值,代表读锁的获取次数(包括重入)
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 取 c 的低 16 位值,代表写锁的重入次数,因为写锁是独占模式
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

// 记录每个线程持有的读锁数量(读锁重入)
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

// ThreadLocal 的子类配合 HoldCounter 使用
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

// 组合上面两个类,记录当前线程的读锁数量
private transient ThreadLocalHoldCounter readHolds;

// 用于缓存,记录「最后一个获取读锁的线程」的读锁重入次数,
// 所以不管哪个线程获取到读锁后,就把这个值占为已用,这样就不用到 ThreadLocal 中查询 map 了
// 算不上理论的依据:通常读锁的获取很快就会伴随着释放,
// 显然,在 获取->释放 读锁这段时间,如果没有其他线程获取读锁的话,此缓存就能帮助提高性能
private transient HoldCounter cachedHoldCounter;

// 第一个获取读锁的线程(并且其未释放读锁),以及它持有的读锁数量
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

Sync() {
// 初始化 readHolds
readHolds = new ThreadLocalHoldCounter();
// 确保 readHolds 的内存可见性
setState(getState());
}

// 抽象方法,在获取读锁时用来判断当前线程是否需要被阻塞
abstract boolean readerShouldBlock();

// 抽象方法,在获取写锁时用来判断当前线程是否需要被阻塞
abstract boolean writerShouldBlock();


// 关键方法:释放写锁
protected final boolean tryRelease(int releases) {
// 判断是否当前线程持有
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 计算锁的次数
int nextc = getState() - releases;
// 如果次数为 0 了就该释放了
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

// 关键方法:获取写锁
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 计算写锁次数
int w = exclusiveCount(c);
// 判断是否有线程持有锁,不管的读锁还是写锁
if (c != 0) {
// 如果 w = 0 说明有线程持有读锁(可能是自己也可能是别人)
// 不管是谁,有读锁被持有,就不能获取写锁
// 如果 w != 0 说明有线程持有写锁,这时判断这个写锁是自己持有还是别人
// 如果别人持有了写锁,自己就不能再获取写锁了
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 到这里说明没有人持有读锁,并且自己持有着写锁,本次是重入写锁
// 所以这里判断下次数会不会溢出
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 更新状态
setState(c + acquires);
return true;
}
// 到这里说明是写锁读锁都为空的情况
// 先判断下需不需要被阻塞
if (writerShouldBlock() ||
// 再用 CAS 更新状态
!compareAndSetState(c, c + acquires))
return false;
// 设置当前线程持有写锁
setExclusiveOwnerThread(current);
return true;
}

// 关键方法:释放读锁
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 判断当前线程是否是第一个获取读锁的线程
if (firstReader == current) {
// 判断读锁次数
if (firstReaderHoldCount == 1)
// 如果只有一次,就将 firstRead 设置为 null,当前线程不再持有读锁
firstReader = null;
else
// 如果不止一次获取读锁,锁的次数 -1
firstReaderHoldCount--;
} else {
// 先从缓存中获取
HoldCounter rh = cachedHoldCounter;
// 判断缓存中的线程是否是当前线程
if (rh == null || rh.tid != getThreadId(current))
// 如果缓存的不是,就到 ThreadLocal 中获取
rh = readHolds.get();

int count = rh.count;
// 如果次数小于等于 1 就先从 ThreadLocal 中移除掉,代表解锁
if (count <= 1) {
readHolds.remove();
// 如果小于等于 0,那就说明 unlock 次数大于 lock 的次数,抛出异常
if (count <= 0)
throw unmatchedUnlockException();
}
// 更新次数
--rh.count;
}
// 循环,确保成功释放
for (;;) {
int c = getState();
// 计算释放一个读锁后的新的状态
int nextc = c - SHARED_UNIT;
// CAS 更新状态
if (compareAndSetState(c, nextc))
// 如果 nextc 为 0 说明读锁都释放了
// 可以提醒后面,可以唤醒因为获取写锁而被阻塞住的线程了
return nextc == 0;
}
}

// 关键方法:获取共享锁
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 获取状态(通过状态可以计算出读锁和写锁的次数)
int c = getState();

// 如果独占锁次数 != 0 说明有线程持有写锁
if (exclusiveCount(c) != 0 &&
// 判断是否是当前线程持有写锁
getExclusiveOwnerThread() != current)
// 如果其他线程持有写锁就直接返回失败(当前线程持有写锁是可以再次获取读锁的)
return -1;

// 计算读锁的次数
int r = sharedCount(c);

// 判断读锁是否要被阻塞
if (!readerShouldBlock() &&
// 判断是否会次数溢出(2^16-1)
r < MAX_COUNT &&
// 通过 CAS 将 state 的高 16 位 +1,低 16 位不变
// CAS 成功,代表获取读锁成功
compareAndSetState(c, c + SHARED_UNIT)) {
// 判断当前线程是否是第一个获取读锁的
if (r == 0) {
// 当前线程是第一个获取读锁的,所以为 firstReader 和 firstReaderHoldCount 赋值
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 说明当前线程是重复获取读锁的
firstReaderHoldCount++;
} else {
// cachedHoldCounter 是记录最后一个获取读锁的线程
HoldCounter rh = cachedHoldCounter;
// 如果缓存的最后一个获取读锁的线程不是当前线程,就更新为当前线程
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 到这里,就是 cachedHoldCounter 记录的是当前线程,不过记录的次数为 0
//
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 到这里有三种情况:
// 1.readerShouldBlock 返回为 true
// 2.MAX_COUNT 溢出
// 3.CAS 更新状态失败
return fullTryAcquireShared(current);
}


final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
// 获取状态
int c = getState();
// 还是来判断下有没有其他线程持有写锁
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
// 其他线程持有写锁,返回 -1 让当前线程进入阻塞队列等待
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// 说明上面遇到的是第一种情况
// 阻塞队列中有其他线程在等待
// 如果当前线程是 firstReader 说明可重入,直接到下面的 CAS
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
// cachedHoldCounter 缓存的不是当前线程
// 获取当前线程的 HoldCounter
rh = readHolds.get();
// count 为 0,说明就是在刚刚初始化出来的,可以直接 remove
if (rh.count == 0)
readHolds.remove();
}
}
// 如果 count 为 0,那这个线程就没获取过读锁,这种就到阻塞队列中排队
if (rh.count == 0)
return -1;
}
}
// 这是对应上面的第二种情况,直接抛出异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 这对应着第三种情况(还有上面的 firstReader == current)
// 再次进行 CAS 尝试,如果失败了不要紧,这里是 for(;;),如果条件不变,还会回来的
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 还是判断下读锁次数,为 0 就设置 firstReader 为当前线程
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 重入直接次数 +1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 将 cachedHoldCounter 设置为当前线程
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

// tryWriteLock 与 tryAcquire 类似的是都是尝试获取读锁
// 区别在于 tryWriteLock 是 tryLock 的实现,是提供给用户的更激进的获取锁策略
// tryAcquire 是对 AQS 中的抽象方法的实现,是 lock 的核心实现
// tryWriteLock 每次增加 1 个锁计数,tryAcquire 增加指定个数的锁计数
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
// 获取当前锁的状态
int c = getState();
// 检查锁的状态是否已经被占用
if (c != 0) {
// 获取当前的写锁重入次数
int w = exclusiveCount(c);
// 如果写锁的重入次数为0(即没有写锁),或者当前线程不是持有写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
// ReentrantReadWriteLock 允许多读单写的访问控制,不允许写锁在有读锁时获取
// 无法获取写锁,返回false
return false;
// 如果写锁的重入次数已经达到了最大值,则抛出异常
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
// 更新写锁次数
if (!compareAndSetState(c, c + 1))
return false;
// 设置当前线程为独占锁的持有者
setExclusiveOwnerThread(current);
return true;
}

// 区别同上述的 tryWriteLock
final boolean tryReadLock() {
Thread current = Thread.currentThread();
// 无限循环,直到成功获取读锁或返回失败
for (;;) {
// 获取当前锁的状态
int c = getState();
// 如果存在了写锁就无法在获取读锁
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
// 获取当前读锁的持有计数
int r = sharedCount(c);
// 判断锁的次数是否达到了最大值
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 尝试通过 CAS 增加读锁的持有计数
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 判单当前线程是否是第一个获取读锁的线程
if (r == 0) {
// 如果是第一个获取读锁的线程,就设置当前线程为 firstReader
firstReader = current;
// 并设置第一个读锁持有者的持有次数为 1
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 如果当前线程已经是第一个持有读锁的线程
// 增加第一个读锁持有者的持有次数
firstReaderHoldCount++;
} else { // 处理其他持有读锁的线程
// 获取当前线程的读锁持有计数
HoldCounter rh = cachedHoldCounter;
// 如果缓存的持有计数为空,或线程 ID 不匹配
if (rh == null || rh.tid != getThreadId(current))
// 从 ThreadLocal 的 readHolds 中获取当前线程的持有计数
cachedHoldCounter = rh = readHolds.get();
// 如果计数为 0,重置 ThreadLocal 中的持有计数
else if (rh.count == 0)
readHolds.set(rh);
// 增加当前线程的读锁持有计数
rh.count++;
}
return true;
}
}
}

......
}

WriteLock

然后是另一个内部类 WriteLock

WriteLock 是面向用户的一个内部类,所以里面的方法基本都是引用的 Sync 中的方法,并没有很复杂的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
// 用于处理核心逻辑的 Sync
private final Sync sync;

// protected 的构造函数
// 在构造 ReentrantReadWriteLock 的时候会默认调用这个构造函数构造 WirteLock
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

// 获取写锁
public void lock() {
sync.acquire(1);
}

// 可被中断地获取写锁
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

// 「闯入式」获取写锁
public boolean tryLock( ) {
return sync.tryWriteLock();
}

// 带超时限制的获取写锁
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

// 释放锁
public void unlock() {
sync.release(1);
}

public Condition newCondition() {
return sync.newCondition();
}

public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}

public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}

public int getHoldCount() {
return sync.getWriteHoldCount();
}
}

ReadLock

和上面的 WriteLock 类似,核心的逻辑都在 Sync 中,这里只是面向用户提供方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;

protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

// 获取锁
public void lock() {
sync.acquireShared(1);
}

// 可被中断地获取锁
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// 「闯入式」获取锁
public boolean tryLock() {
return sync.tryReadLock();
}

// 带超时限制的获取锁
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 释放锁
public void unlock() {
sync.releaseShared(1);
}

public Condition newCondition() {
throw new UnsupportedOperationException();
}

public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}