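Why is ReentrantReadWriteLock needed when ReentrantLock already exists?
How is ReentrantReadWriteLock implemented under the hood?
How is the read/write state of ReentrantReadWriteLock designed?
What is the maximum number of read locks and write locks?
What is the thread-local counter ThreadLocalHoldCounter used for?
What is the cached counter HoldCounter used for?
How are acquiring and releasing the write lock implemented?
How are acquiring and releasing the read lock implemented?
What are lock upgrading and downgrading? Why does ReentrantReadWriteLock not support lock upgrading?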
Using the read-write lock
A simple example gives a quick overview of how the read-write lock is used.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {

    private static class Cache<K, V> {
        private final Map<K, V> map = new HashMap<>();
        private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
        private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

        // Reads take the shared read lock, so several readers may run at once.
        private V get(K key) {
            readLock.lock();
            try {
                return map.get(key);
            } finally {
                readLock.unlock();
            }
        }

        // Writes take the exclusive write lock.
        private void put(K key, V value) {
            writeLock.lock();
            try {
                map.put(key, value);
            } finally {
                writeLock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        Cache<Integer, String> cache = new Cache<>();
        // The two threads are not ordered, so t2 may read null if it runs first.
        new Thread(() -> {
            cache.put(1, "one");
            System.out.println("t1 put 1-one");
        }).start();
        new Thread(() -> {
            String val = cache.get(1);
            System.out.println("t2 get 1-" + val);
        }).start();
    }
}
```
Source code analysis
Like ReentrantLock, ReentrantReadWriteLock has the three inner classes Sync, FairSync, and NonfairSync. In addition, it has two more inner classes: ReadLock and WriteLock.
Overall structure
ReentrantReadWriteLock

```java
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    final Sync sync;

    public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    ......
}
```

Constructing a ReentrantReadWriteLock also constructs its ReadLock and WriteLock; the no-argument constructor selects the nonfair policy.
Sync
```java
abstract static class Sync extends AbstractQueuedSynchronizer {
    ......
}
```

Sync is again the core inner class, and it extends AQS.
FairSync
```java
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;

    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }

    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}
```

FairSync is very simple: it extends Sync and supplies two methods. The key logic of ReentrantReadWriteLock lies elsewhere.
NonfairSync
```java
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;

    final boolean writerShouldBlock() {
        return false;
    }

    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
}
```

NonfairSync is similar to FairSync: it only supplies the same two methods, with different answers.
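Which of the two classes is used depends only on the boolean passed to the constructor. The following is a minimal sketch that relies on the public JDK method `isFair()`; the class name `FairnessDemo` and its helper methods are made up for illustration.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; only ReentrantReadWriteLock comes from the JDK.
class FairnessDemo {

    // The no-arg constructor delegates to this(false), which selects NonfairSync.
    static boolean defaultLockIsFair() {
        return new ReentrantReadWriteLock().isFair();
    }

    // Passing true selects FairSync.
    static boolean explicitFairLockIsFair() {
        return new ReentrantReadWriteLock(true).isFair();
    }

    public static void main(String[] args) {
        System.out.println("default fair?  " + defaultLockIsFair());
        System.out.println("explicit fair? " + explicitFairLockIsFair());
    }
}
```

With FairSync, readers and writers both queue whenever hasQueuedPredecessors() returns true. With NonfairSync, a writer may always barge, while a reader backs off only when the first queued thread appears to be a waiting writer.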
ReadLock
```java
public static class ReadLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    ......
}
```

ReadLock also holds a Sync field, which it takes from the ReentrantReadWriteLock passed to its constructor.
WriteLock
```java
public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    ......
}
```

WriteLock is built the same way as ReadLock, so both locks share the same Sync instance.
Read/write comparison
Compare how ReadLock and WriteLock acquire the lock.
ReadLock
```java
public static class ReadLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquireShared(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryReadLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}
```
WriteLock
```java
public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquire(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryWriteLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    public int getHoldCount() {
        return sync.getWriteHoldCount();
    }
}
```
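Compare the corresponding methods of ReadLock and WriteLock, such as lock, tryLock, and unlock.
The essential difference is that ReadLock uses the shared mode of AQS (acquireShared, releaseShared), while WriteLock uses the exclusive mode (acquire, release).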
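The practical effect of shared versus exclusive mode can be observed from outside the class. The sketch below is only an illustration built on the public `tryLock()` methods; the class `SharedVsExclusiveDemo` and its helpers are hypothetical names. One thread holds a lock while a second thread makes a single non-blocking attempt.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; only the java.util.concurrent types come from the JDK.
class SharedVsExclusiveDemo {

    // Makes one non-blocking attempt from a fresh thread and releases again on success.
    static boolean tryFromOtherThread(Lock lock) {
        boolean[] acquired = new boolean[1];
        Thread t = new Thread(() -> {
            if (lock.tryLock()) {
                acquired[0] = true;
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join(); // join() also makes acquired[0] visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired[0];
    }

    // Returns {reader vs. held read lock, writer vs. held read lock, reader vs. held write lock}.
    static boolean[] run() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[3];

        rw.readLock().lock();
        try {
            result[0] = tryFromOtherThread(rw.readLock());  // shared: a second reader gets in
            result[1] = tryFromOtherThread(rw.writeLock()); // a writer is refused while readers exist
        } finally {
            rw.readLock().unlock();
        }

        rw.writeLock().lock();
        try {
            result[2] = tryFromOtherThread(rw.readLock());  // exclusive: other readers are refused
        } finally {
            rw.writeLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("second reader while read lock held: " + r[0]);
        System.out.println("writer while read lock held:        " + r[1]);
        System.out.println("reader while write lock held:       " + r[2]);
    }
}
```

The sketch uses the untimed tryLock() so that a refused attempt returns immediately instead of parking the second thread.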
Detailed analysis
Sync
Start again with Sync, the subclass of AQS.

```java
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;

    // The 32-bit AQS state is split in two: the high 16 bits count read holds,
    // the low 16 bits count (reentrant) write holds.
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // Number of read holds encoded in state c
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    // Number of write holds encoded in state c
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    // Read hold count of a single thread
    static final class HoldCounter {
        int count = 0;
        final long tid = getThreadId(Thread.currentThread());
    }

    static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

    private transient ThreadLocalHoldCounter readHolds;
    private transient HoldCounter cachedHoldCounter;
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState());
    }

    abstract boolean readerShouldBlock();
    abstract boolean writerShouldBlock();

    // Releases the write lock
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }

    // Acquires the write lock
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // c != 0 with w == 0 means read locks are currently held
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire by the owning thread
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

    // Releases the read lock
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

    // Acquires the read lock (fast path)
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        // Fail if another thread holds the write lock
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }

    // Full version of the read acquire: retries in a loop after a failed CAS
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            } else if (readerShouldBlock()) {
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh;
                }
                return 1;
            }
        }
    }

    // Backs WriteLock.tryLock(): same as tryAcquire but without the writerShouldBlock() check
    final boolean tryWriteLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c != 0) {
            int w = exclusiveCount(c);
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
        }
        if (!compareAndSetState(c, c + 1))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

    // Backs ReadLock.tryLock(): same as tryAcquireShared but without the readerShouldBlock() check
    final boolean tryReadLock() {
        Thread current = Thread.currentThread();
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return false;
            int r = sharedCount(c);
            if (r == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return true;
            }
        }
    }
    ......
}
```
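The constants at the top of Sync pack both counters into the single int state of AQS: read holds in the high 16 bits, write holds in the low 16 bits. As a rough illustration, the sketch below copies those constants and the two decoding helpers into a standalone class (`StateLayoutDemo` is a made-up name, and the sample state value is chosen arbitrarily) and decodes one state by hand.

```java
// Hypothetical standalone class; the constants and helpers are copied from Sync
// so that the arithmetic can be run without access to the JDK internals.
class StateLayoutDemo {
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    public static void main(String[] args) {
        // Arbitrary sample: 3 read holds plus 2 reentrant write holds.
        int state = 3 * SHARED_UNIT + 2;
        System.out.println(state);                 // 196610
        System.out.println(sharedCount(state));    // 3
        System.out.println(exclusiveCount(state)); // 2
        System.out.println(MAX_COUNT);             // 65535
    }
}
```

Because each half is 16 bits wide, both the read count and the write count are capped at MAX_COUNT = 65535, which is why the acquire methods throw Error("Maximum lock count exceeded") once that limit is reached.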
WriteLock
Next comes another inner class, WriteLock.
WriteLock is a user-facing inner class, so its methods mostly delegate to methods in Sync and contain no complex logic of their own.

```java
public static class WriteLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -4992448646407690164L;
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquire(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryWriteLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public String toString() {
        Thread o = sync.getOwner();
        return super.toString() + ((o == null) ?
                                   "[Unlocked]" :
                                   "[Locked by thread " + o.getName() + "]");
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    public int getHoldCount() {
        return sync.getWriteHoldCount();
    }
}
```
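The reentrant counting performed by tryAcquire and tryRelease is visible through getHoldCount(). The following sketch, in which the class `WriteHoldCountDemo` and its method names are invented for illustration, locks twice from the same thread and watches the count drop as the lock is released. It also checks that unlocking without holding the lock is rejected.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; only ReentrantReadWriteLock comes from the JDK.
class WriteHoldCountDemo {

    // Returns the hold counts observed after: two locks, one unlock, two unlocks.
    static int[] holdCounts() {
        ReentrantReadWriteLock.WriteLock w = new ReentrantReadWriteLock().writeLock();
        int[] counts = new int[3];
        w.lock();
        w.lock();                       // reentrant acquire by the owning thread
        counts[0] = w.getHoldCount();
        w.unlock();                     // still held: the exclusive count is not yet 0
        counts[1] = w.getHoldCount();
        w.unlock();                     // fully released
        counts[2] = w.getHoldCount();
        return counts;
    }

    // tryRelease throws when the caller is not the exclusive owner.
    static boolean unlockWithoutHoldingThrows() {
        try {
            new ReentrantReadWriteLock().writeLock().unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        int[] c = holdCounts();
        System.out.println(c[0] + " " + c[1] + " " + c[2]);
        System.out.println("unlock without holding throws: " + unlockWithoutHoldingThrows());
    }
}
```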
ReadLock
As with WriteLock above, the core logic lives in Sync; this class only exposes the user-facing methods.

```java
public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquireShared(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryReadLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    public String toString() {
        int r = sync.getReadLockCount();
        return super.toString() +
            "[Read locks = " + r + "]";
    }
}
```
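Taken together, the Sync methods explain the downgrade and upgrade behavior raised in the opening questions. tryAcquireShared lets the thread that owns the write lock also take the read lock, whereas tryAcquire and tryWriteLock refuse whenever the state is non-zero and the write count is 0, that is, whenever any read lock is held. The sketch below is an illustration using only public API; `DowngradeDemo` and its method names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; only ReentrantReadWriteLock comes from the JDK.
class DowngradeDemo {

    // Downgrade: write -> read -> release write. Returns true if only the read lock remains.
    static boolean downgrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();
        rw.readLock().lock();     // allowed: the write owner is the current thread
        rw.writeLock().unlock();  // from here on, only the read lock is held
        boolean onlyReadHeld = !rw.isWriteLocked() && rw.getReadHoldCount() == 1;
        rw.readLock().unlock();
        return onlyReadHeld;
    }

    // Upgrade attempt: read -> try write. Returns whether the write lock was obtained.
    static boolean upgradeAttempt() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            boolean acquired = rw.writeLock().tryLock(); // refused while a read lock is held
            if (acquired)
                rw.writeLock().unlock();
            return acquired;
        } finally {
            rw.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("downgrade succeeded: " + downgrade());
        System.out.println("upgrade succeeded:   " + upgradeAttempt());
    }
}
```

The upgrade attempt deliberately uses tryLock(): calling writeLock().lock() while still holding the read lock would block the thread forever, because the write lock cannot be granted until the reader, which is the same thread, releases.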