ConcurrentHashMap 详解
2024-11-21 09:25:53 # Technical # JavaConcurrency

为什么 HashTable 慢? 它的并发度是什么? 那么 ConcurrentHashMap 并发度是什么?

ConcurrentHashMap 在 Java7 和 Java8 中实现有什么差别?Java8 解決了 Java7 中什么问题?

ConcurrentHashMap JDK1.7 实现的原理是什么?

ConcurrentHashMap JDK1.8 实现的原理是什么?

ConcurrentHashMap JDK1.7 中 Segment 数 (concurrencyLevel) 默认值是多少?为何一旦初始化就不可再扩容?

ConcurrentHashMap JDK1.7 说说其 put 的机制?

ConcurrentHashMap JDK1.7 是如何扩容的?

ConcurrentHashMap JDK1.8 是如何扩容的?

ConcurrentHashMap JDK1.8 链表转红黑树的时机是什么? 临界值为什么是 8?

ConcurrentHashMap JDK1.8 是如何进行数据迁移的?

JDK 1.7

在 JDK 1.5 ~ 1.7 版本中,Java 使用 分段锁机制 实现 ConcurrentHashMap

ConcurrentHashMap 在对象中保存了一个 Segment 数组,即将整个 Hash 表划分为多个分段。而每个 Segment 元素,即每个分段则类似于一个 Hashtable。这样,在执行 put 操作时首先根据 hash 算法定位到元素属于哪个 Segment,然后对该 Segment 加锁即可。因此,ConcurrentHashMap 在多线程并发编程中可是实现线程安全的 put 操作

数据结构

关键属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 默认初始容量,可通过构造函数指定
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;

/**
* 默认负载因子,可通过构造函数指定
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
* 默认并发级别,可通过构造函数指定
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
* 最大容量,可通过构造函数指定 <= 1073741824
*/
static final int MAXIMUM_CAPACITY = 1 << 30;

/**
* 每个 Segment 的最小容量,必须是 2 的幂 >= 2
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

/**
* 最大 Segment 数量
*/
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

/**
* 用于 Size() 和 containsValue() 中,表示尝试无锁获取的最多次数
* 尝试无锁获取结果的次数大于该值,就会转为加锁操作
*/
static final int RETRIES_BEFORE_LOCK = 2;

/**
* Segment 数组
*/
final Segment<K,V>[] segments;

关键内部类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static final class Segment<K,V> extends ReentrantLock implements Serializable {

/**
* 获取锁之前的最大重试次数,通过有限次数的重试来维持缓存的一致性,同时避免过多的自旋等待
* 这个值的设置取决于系统的处理器数量:
* 如果系统是多处理器(CPU核心数大于1),则设置为64
* 如果是单处理器系统,则设置为1(单处理器系统上,直接尝试获取锁,不进行额外的重试)
*/
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

/**
* 每个 Segment 中的「HashMap」
*/
transient volatile HashEntry<K,V>[] table;

/**
* 元素数量
*/
transient int count;

/**
* 修改计数
*/
transient int modCount;

/**
* 每个 Segment 中 HashEntry 的大小
* 大于该值会进行 rehashed
* capacity * loadFactor
*/
transient int threshold;

/**
* 每个 Segment 中的负载因子
*/
final float loadFactor;

Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}

// ......
}

HashEntry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;

HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}

// 无锁更新 next
final void setNext(HashEntry<K,V> n) {
// 确保内存写入的顺序,保证可见性与有序性
UNSAFE.putOrderedObject(this, nextOffset, n);
}

// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
// next 字段的内存偏移量
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
// 通过反射获取next的内存偏移量,后续可直接使用这个偏移量来访问,避免每次调用时反射的开销
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

ConcurrentHashMap 是一个 Segment 数组,Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全

java7-ConcurrentHashMap

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;

// 计算出 concurrencyLevel 最大的 2 的 n 次方
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 默认情况下,sshift 为 4,segmentShift 为 28,segmentMask 为 15
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;

// initialCapacity 是整个 map 初始的大小
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 根据 initialCapaticty 计算出每个 Segment 的容量
int c = initialCapacity / ssize;
// 如果没分完,补上
if (c * ssize < initialCapacity)
++c;
// segment 最小容量,默认为 2
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;

// 创建一个 Segment 作为 Segment[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
// 根据并发度计算出来的最小 2 的 n 次方 ssize 初始化一个 segment 数组
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// 往数组里写入 s0
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}

初始化过程主要是为了计算出 Segment 数组的大小

其中需要注意的是,初始化只会创建一个 Segment s0 放入 Segment 数组中

Segment 默认的容量为 2,负载因子 0.75,这样插入第一个元素不会扩容,插入第二个才会进行扩容

segmentShiftsegmentMask 这两个属性仅赋值,还未知其作用

PUT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 1. 计算 key 的 hash 值
int hash = hash(key);
// 2. 根据 key 的 hash 值找到 Segment 数组中的位置 j
// hash 是 32 位,无符号右移 segmentShift(28)位,剩下高 4 位
// 然后和 segmentMask(15)做一次与操作,也就是说 j 是 hash 值的高 4 位,就是 Segment 数组的下标
int j = (hash >>> segmentShift) & segmentMask;
// 判断待插入的 segment 是不是还未初始化,如果未初始化则通过 ensureSegment 初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 将 key-value 插入 segment s 中
return s.put(key, hash, value, false);
}

s = (Segment<K,V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE) 这里是直接通过计算 segment 数组中元素的内存偏移量来获取元素

具体看下初始化指定位置的 segment

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
// 计算 segments 在 k 处的内存地址偏移量
long u = (k << SSHIFT) + SBASE;
// 声明一个 segment 类型的变量,用于存储获取到的对象
Segment<K,V> seg;
// 通过内存地址偏移量来获取对象,判断是不是 null
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 第一个在前面构造函数中已经初始化了,这里拿来做原型
Segment<K,V> proto = ss[0];
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
// 这里再判断一遍,防止其他的线程初始化了
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 初始化 segment 对象
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 这里循环获取 seg,直到 seg 不为 null
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 通过 CAS 尝试赋值
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

接下来就到了 Segment 的内部方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 调用 ReentrantLock 的 tryLock 尝试获取该 segment 的独自锁
// 尝试加锁失败会通过 scanAndLockForPut 获取锁
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
// segment 内部的「hashMap」
HashEntry<K,V>[] tab = table;
// 利用 hash 值,求得合适的数组下标
int index = (tab.length - 1) & hash;
// 找到数组在 index 处的链表的表头
HashEntry<K,V> first = entryAt(tab, index);

// 从链表头往下循环
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) { // 判断 key 是否冲突
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break; // 冲突了就直接可以结束循环
}
e = e.next; // 继续往下遍历
}
// 走到尾部
else {
if (node != null)
// 节点以及创建过了,直接插入链表头
node.setNext(first);
else
// 创建节点,然后插入链表头
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 是否需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 扩容
else
// 将新节点放到链表头
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解锁
unlock();
}
return oldValue;
}

put 过程中涉及两个关键方法:scanAndLockForPutrehash

scanAndLockForPut 是在 tryLock 尝试获取独占锁失败后执行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
// 通过 segment 和 hash 获取指定的链表对象
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // 记录重试次数

// 使用 tryLock 非阻塞获取锁
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) { // 小于 0,说明还未找到目标节点
if (e == null) { // 遍历到尾节点
if (node == null)
// 进到这里说明数组该位置的链表上没有相关节点,这里创建一个节点
// 另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
// 有在链表中找到目标节点的位置
retries = 0;
else
// 顺着链表接着往下走
e = e.next;
}
// 判断重试次数是否超过 MAX_SCAN_RETRIES(单核1多核64)
// 如果没超过重试次数,接着用 tryLock 非阻塞式获取锁
// 如果超过重试次数,那么不抢了,进入到阻塞队列等待锁
else if (++retries > MAX_SCAN_RETRIES) {
// lock() 是阻塞方法,直到获取锁后才返回
lock();
break;
}
// 如果重试次数为偶数
else if ((retries & 1) == 0 &&
// 如果通过 entryForHash 获取的链表头与当前的 first 不同,说明在此期间有新元素进入到了链表,链表发生了变化
(f = entryForHash(this, hash)) != first) {
// 重新走一遍这个 scanAndLockForPut 方法
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

简单理解下来就如方法名的意思一样,为 put 遍历相关节点以及关键的获取独占锁

另一个关键方法就是 rehash 扩容方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 原先的容量上 * 2
int newCapacity = oldCapacity << 1;
// * 负载因子
threshold = (int)(newCapacity * loadFactor);
// 创建新的数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// newCapacity 为 2 的 n 次方,这里 -1 得到「00...11111」
int sizeMask = newCapacity - 1;
// 遍历原数组,将原数组 i 处的链表拆分到新数组的 i 和 i+oldCap 两个位置
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 计算链表在新数组中的位置
int idx = e.hash & sizeMask;
if (next == null)
// 链表只有一个元素的话就直接放入
newTable[idx] = e;
else {
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
// 遍历链表,找出 rehash 后仍然在同一位置上的连续一段链表
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 先放入 lastRun 及其后续节点
newTable[lastIdx] = lastRun;
// 遍历 lastRun 之前的节点
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
// 计算新数组中的位置
int k = h & sizeMask;
// 获取指定位置上的链表
HashEntry<K,V> n = newTable[k];
// 插入链表
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 最后将新节点放入 newTable 中完成扩容
// 先计算 node 在新数组中的位置
int nodeIndex = node.hash & sizeMask;
// 然后将 node 插入指定位置上的链表头
node.setNext(newTable[nodeIndex]);
// 然后将新的链表放回新数组
newTable[nodeIndex] = node;
// 最后将新数组赋值给 table
table = newTable;
}

扩容过程中的整体逻辑还是好理解的,不过中的两次循环还是耐人寻味的

如果抛开第一次循环,仅靠第二次循环,整体的逻辑也是能走通的

这里第一次循环用来确定链表中的一段连续部分,这些节点在扩容后依然会落到新表的同一个索引位置上。这部分链表可以直接搬迁,不需要对每个节点重新构造,这样做可以减少对象的创建和复制,提升性能

虽然有可能 lastRun 会是最后一个元素,这样第一次循环会显得多余,不过大神 Doug Lea 也说到,根据统计,如果使用默认的阈值,大概只有 1/6 的节点需要克隆

这里需要稍微注意下,扩容的大小始终是 2 的幂次(oldCapacity << 1)这样可以通过位操作(&)来计算索引,比使用取模操作(%)更高效

而 threshold 是扩容阈值,用来决定何时出发扩容,它等于 capacity * loadFactor,确保当数组填充到一定程度时开始扩容,避免哈希冲突导致性能下降

GET

相较于 put 来说,get 就很简单了

  • 计算 hash 值,找到对应的 segment 数组中的具体位置
  • 再找出 segment 中 HashEntry 数组的具体位置
  • 然后就是顺着链表查找即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
// 计算 hash 值
int h = hash(key);
// 计算内存地址偏移量
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 找对应 segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 找对应链表
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

REMOVE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public V remove(Object key) {
// 计算 hash
int hash = hash(key);
// 通过 hash 找 segment 数组对应位置上的 segment(依然通过内存地址偏移量)
Segment<K,V> s = segmentForHash(hash);
// 没找到对应 segment 就直接返回 null,找到进行 remove
return s == null ? null : s.remove(key, hash, null);
}

// 又是 Segment 的内部方法
final V remove(Object key, int hash, Object value) {
// 先重试获取下锁
if (!tryLock())
// 这个方法和 scanAndLock 有点眼熟 下面单独分析
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
// 找数组位置
int index = (tab.length - 1) & hash;
// 拿到指定位置上的链表
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null; // 声明目标移除节点的前节点,用于连接目标节点的下一个节点
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
// 判断是否是目标 key
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
// 前节点为空,说明移除的是头节点
setEntryAt(tab, index, next);
else
// 将前节点与 next 节点连接起来
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
// 不是目标,存下作为前节点
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}

与 put 类似,remove 在一开始尝试非阻塞获取锁失败后,进入到一个获取锁的方法 scanAndLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void scanAndLock(Object key, int hash) {
// 还是先找到链表头节点
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1; // 记录重试次数
// 循环重试
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
retries = 0; // 找到了目标节点,修改重试次数
else
e = e.next;
}
// 判断重试次数,重试过久直接阻塞式获取锁
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
// 重试过程中链表发生变更,重来
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}

这里即使没有用到找出的目标节点(或者没找出),通过延迟锁重试,直到有更多信息表明节点可能需要操作,减少不必要的锁争用

并发问题分析

添加节点的操作 put 和删除节点的操作 remove 都是要加 segment 上的独占锁的,所以它们之间自然不会有问题,我们需要考虑的问题就是 get 的时候在同一个 segment 中发生了 put 或 remove 操作

  • put 操作的线程安全性
    • 初始化 segment,通过 CAS 来初始化 Segment
    • get 遍历链表的过程中发生了 put,put 采用头插法,所以并不影响 get 操作
    • put 对 get 的可见性:不管是 put 修改 HashEntry 还是扩容,通过 volatile 和 UNSAFE 方法保证了可见性
  • remove 操作的线程安全性
    • 如果 remove 的是头节点,那么需要将头节点的 next 设置为数组该位置的元素,table 虽然使用了 volatile 修饰,但是 volatile 并不能提供数组内部操作的可见性保证,所以源码中使用了 UNSAFE 来操作数组
    • 如果 remove 的不是头节点,它会将要删除节点的后继节点接到前驱节点中,这里的并发保证就是 next 属性是 volatile 的

JDK 1.8

在 JDK1.8 之前,ConcurrentHashMap 是通过分段锁机制来实现的,所以其最大并发度受 Segment 的个数限制。因此,在 JDK1.8 中,ConcurrentHashMap 的实现原理摒弃了这种设计,而是选择了与 HashMap 类似的数组 + 链表 + 红黑树的方式实现,而加锁则采用 CAS 和 synchronized 实现

数据结构

关键属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
* 最大容量,可通过构造函数指定 <= 1073741824
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;

/**
* 默认初始容量,可通过构造函数指定,2 的幂,至少为 1
*/
private static final int DEFAULT_CAPACITY = 16;

/**
* 最大可能的数组大小,非 2 的幂
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* 默认并发级别,这里并没用,只是单纯的保留
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
* 默认负载因子,通常使用「n - (n >>> 2)」表达式计算
*/
private static final float LOAD_FACTOR = 0.75f;

/**
* 链表转红黑树的阈值,必须大于 2,默认 8
*/
static final int TREEIFY_THRESHOLD = 8;

/**
* 红黑树转链表的阈值,要小于上面的 TREEIFY_THRESHOLD,
*/
static final int UNTREEIFY_THRESHOLD = 6;

/**
* 链表转红黑树的最大数组长度,配合 TREEIFY_THRESHOLD 使用
* 只有数组长度小于该值,并且链表长度大于 TREEIFY_THRESHOLD 才会转红黑树
*/
static final int MIN_TREEIFY_CAPACITY = 64;

/**
* 红黑树转链表的最小容量,也是配合 UNTREEIFY_THRESHOLD 使用
*/
private static final int MIN_TRANSFER_STRIDE = 16;

/**
* 正数:CHM 的当前容量
* 负数:表示 CHM 正在进行扩容,负数有两部分:高位存储扩容状态,地位存储参与扩容的线程数量
*/
private transient volatile int sizeCtl;

/**
* 决定了上面 sizeCtl 中存储扩容状态的位数
* 默认 16,意味 sizeCtl 可以存储 2^16 = 65535 个不同的扩容状态
*/
private static int RESIZE_STAMP_BITS = 16;

/**
* 参与扩容操作的线程数量上限
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

/**
* sizeCtl 中存储扩容状态的偏移量
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

关键内部类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 链表节点
static class Node<K,V> implements Map.Entry<K,V> {
......
}

// 扩容过程中的标记节点
// 当一个线程处理完一个桶的转移后,会将这个 ForwardingNode 放在旧表的对应位置
// 它告诉其他线程该桶已经被处理过,数据已经被转移到新表中
// hash 值固定为 MOVED (-1)
// 包含对新表的引用
static final class ForwardingNode<K,V> extends Node<K,V> {
......
}

// 在 computeIfAbsent 和 compute 方法中作为一个占位符使用
// 表示有一个线程正在处理该桶,阻止其他线程同时修改
// hash 值固定为 RESERVED (-3)
// 支持并发的 compute 操作
// 防止多个线程同时计算同一个 key 的值
static final class ReservationNode<K,V> extends Node<K,V> {
......
}

// 作为红黑树结构的根节点
// 管理红黑树的并发访问和修改
// 包含对实际的 TreeNode 根节点的引用
// 实现了复杂的锁机制,支持读写分离
// 允许在树形结构上进行高效的并发操作
// 维护树的平衡和结构,而不暴露具体的实现细节
static final class TreeBin<K,V> extends Node<K,V> {
......
}

// 表示红黑树中的一个节点
static final class TreeNode<K,V> extends Node<K,V> {
......
}

java8-ConcurrentHashMap

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 无参构造,啥都没有
* 默认容量 16
*/
public ConcurrentHashMap() {
}

/**
* 指定初始容量
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 确保容量不超过最大容量,计算初始容量
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

/**
* 创建一个相同的 map
*/
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

/**
* 指定初始容量和负载因子
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

/**
* 指定初始容量、负载因子还有并发级别
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 确保容量是要大于「并发度」的
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

在计算初始容量时,有一个很经典的方法 tableSizeFor

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 返回最接近 c 的最大 2 的幂
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

这里是参考的 Hacker’s Delight 中的算法,类似的还有通过位移计算最小的 2 的幂

1
2
3
4
5
6
7
8
static int ceilingPowerOf2(int x) {
x = x | (x >> 1);
x = x | (x >> 2);
x = x | (x >> 4);
x = x | (x >> 8);
x = x | (x >> 16);
return x - (x >> 1);
}

PUT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算 hash
int hash = spread(key.hashCode());
// 记录链表长度
int binCount = 0;

for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 第一次 put,初始化数组
tab = initTable();
// 根据 hash 找数组上对应的位置(这里与 jdk7 一致)判断位置上是不是空的
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 位置是空的,用 CAS 放入新元素
// 如果 CAS 失败,继续循环重试
// 如果 CAS 成功就可以直接结束循环
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 这里 MOVED 是一个静态常量 -1
// 如果 hash 等于 -1 说明在扩容
else if ((fh = f.hash) == MOVED)
// 帮助数据迁移,后面再详看
tab = helpTransfer(tab, f);
else {
// 到这里说明数组在这个位置上已经有元素了
V oldVal = null;
// 获取数组该位置的头节点的监视器锁
synchronized (f) {
// 再校验下,看头有没有变
if (tabAt(tab, i) == f) {
// 如果头节点的 hash 值是大于等 0 的,说明是链表
if (fh >= 0) {
// 记录链表的长度
binCount = 1;

for (Node<K,V> e = f;; ++binCount) {
K ek;
// 判断是否是重复的 key
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 是否需要覆盖
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 判断是不是到了链表尾
if ((e = e.next) == null) {
// 插入链表尾部(不同于 jdk7 的头插法)
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 这里判断是否需要将链表转为红黑树
if (binCount != 0) {
// 判断链表长度是否大于 TREEIFY_THRESHOLD
if (binCount >= TREEIFY_THRESHOLD)
// 进入 teeifyBin 也不一定会转为红黑树,可能仅仅扩容
// 还需要判断数组长度是不是小于 MIN_TREEIFY_CAPACITY(64)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

接下来看数组为空时的初始化方法 initTable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 看到 while 就可以猜想到这里大概率是用 CAS 来实现的
while ((tab = table) == null || tab.length == 0) {
// sizeCtl 这是个十分关键的属性
// 这里 sizeCtl 小于 0,说明有其他线程正在进行初始化
if ((sc = sizeCtl) < 0)
// 礼貌性地让一下
Thread.yield(); // lost initialization race; just spin
// 还未初始化,这里通过 CAS 尝试将 sizeCtl 赋值 -1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// CAS 成功...
try {
// 这里还是要继续判断下,万一恰好其他线程初始化完,才进来
if ((tab = table) == null || tab.length == 0) {
// 这里 sc > 0 可能为 true 吗?
// 默认数组大小 DEFAULT_CAPACITY 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 其实就是 0.75 * n
sc = n - (n >>> 2);
}
} finally {
// 设置 sizeCtl,标志初始化完成
sizeCtl = sc;
}
break;
}
}
return tab;
}

链表转红黑树 treeifyBin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// MIN_TREEIFY_CAPACITY 为 64
// 所以,如果数组长度小于 64 的时候,其实也就是 32 或者 16 或者更小的时候,会进行数组扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 扩容
tryPresize(n << 1);
// 找到头节点 b
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 获取头节点的监视器锁
synchronized (b) {
// 再校验下,确认 b 是头节点
if (tabAt(tab, index) == b) {
// 接下来就是遍历链表,转为红黑树
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

扩容

接下来就是核心方法 扩容 tryPresize

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private final void tryPresize(int size) {
// 这里调用前面提到的算法,计算 size 的 1.5倍加 1 的最大 2 的幂
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// 通过 sizeCtl 来判断当前 CHM 是否处于扩容状态
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 这里看起来是不是眼熟,和前面初始化数组的逻辑一样
// 因为 tryPresize 方法会在 putAll 中调用,考虑数组可能未初始化的情况
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
// 如果新的容量小于 sizeCtl,或者已经是最大容量了,就退出循环
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// 确保在这个过程中数组没有被其他线程更新
else if (tab == table) {
// 这里生成一个与当前数组大小相关的标记
// 这个标记用于识别特定的扩容操作,确保所有线程协作用于同一次扩容
int rs = resizeStamp(n);
// sizeCtl 小于 0,说明有其他的线程正在进行扩容
if (sc < 0) {
Node<K,V>[] nt;
// 检查扩容标记是否匹配
if ((sc >>> RESIZE_STAMP_SHIFT) != rs ||
// 检查是否达到最大扩容线程数
sc == rs + 1 || sc == rs + MAX_RESIZERS ||
// 新的数组已经被创建
(nt = nextTable) == null ||
// 是否还有工作要做
transferIndex <= 0)
// 满足上述任一条件就直接退出
break;
// 不满足上述所有条件,通过 CAS 尝试将 sizeCtl + 1(表示加入一个扩容线程)
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// CAS 成功则进行数据迁移
transfer(tab, nt);
}
// 到这里说明现在还没其他线程开始扩容
// 通过 CAS 尝试将 szieCtl 设置为一个很大的负数
// 这个「很大的负数」编码了扩容标记和当前参与扩容的线程数(初始为 1)
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// CAS 成功,开始数据迁移
transfer(tab, null);
}
}
}

终于到了真正的 Boss,数据迁移方法 transfer

此方法支持多线程执行,外围调用此方法的时候,会保证第一个发起数据迁移的线程,nextTab 参数为 null,之后再调用此方法的时候,nextTab 不会为 null。

阅读源码之前,先要理解并发操作的机制。原数组长度为 n,所以我们有 n 个迁移任务,让每个线程每次负责一个小任务是最简单的,每做完一个任务再检测是否有其他没做完的任务,帮助迁移就可以了,而 Doug Lea 使用了一个 stride,简单理解就是步长,每个线程每次负责迁移其中的一部分,如每次迁移 16 个小任务。所以,我们就需要一个全局的调度者来安排哪个线程执行哪几个任务,这个就是属性 transferIndex 的作用。

第一个发起数据迁移的线程会将 transferIndex 指向原数组最后的位置,然后从后往前的 stride 个任务属于第一个线程,然后将 transferIndex 指向新的位置,再往前的 stride 个任务属于第二个线程,依此类推。当然,这里说的第二个线程不是真的一定指代了第二个线程,也可以是同一个线程,这个读者应该能理解吧。其实就是将一个大的迁移任务分为了一个个任务包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride 可以理解为「步长」,单核时等于数组长度 n,多核时为 (n >>> 3) / NCPU
// NCPU 是处理器的核数
// MIN_TRANSFER_STRIDE 默认最小步长 16
// 这里判断 stride 是否小于默认步长
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range

// 如果带迁移的数组为 null,需要先初始化
// 调用 transfer 的外围方法会确保第一个发起迁移的线程的 nextTab 为 null
// 之后参与迁移的线程调用时 nextTab 不会为 null
if (nextTab == null) { // initiating
try {
// 新数组容量 * 2
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;

// fwd:正在被迁移的 Node
// ForwardingNode 继承了 Node,可以理解成一个 key、val、next 都为 null,hash 为 MOVE 的 Node
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 标记某个位置的迁移是否完成
boolean advance = true;
// 整个迁移过程是否结束
boolean finishing = false; // to ensure sweep before committing nextTab

// 开始循环迁移....
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 循环为当前线程分配一段数组索引范围来处理
while (advance) {
int nextIndex, nextBound;
// i 代表当前正在处理的索引
// bound 代表当前批次的下界
// 如果 --i 仍然大于等于 bound 或者整个扩容过程即将结束,就停止当前的任务分配
if (--i >= bound || finishing)
advance = false;
// transferIndex 记录着全局下个可用的索引
// 如果 transferIndex 小于等于 0,说明桶都被分配了,没有更多的工作要做了
else if ((nextIndex = transferIndex) <= 0) {
// 设置结束标志
i = -1;
advance = false;
}
// 核心的任务分配逻辑
// 通过 CAS 尝试更新 transferIndex
// 新的 TransferIndex = nextIndex > stride ? nextIndex - stride : 0
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 如果 CAS 成功
// 更新 bound
bound = nextBound;
// 设置新的处理索引
i = nextIndex - 1;
// 设置 advance 来结束循环
advance = false;
}
}
// i 是当前处理索引,n 是原数组的大小,next 是扩容后的数组大小
// i < 0,防止出现负数索引(可能由于某些异常情况或错误导致)
// i >= 0,当前处理索引大于原数组大小,说明已经处理完成了所有的旧数组
// i + n >= nextn,在 CHM 扩容过程中,元素从旧数组移动到新数组时,可能移动到两种位置,一种是原位置,一种是 i + n 的位置,这里判断是为了确定是否已处理到了新表的末尾
// 如果满足任一条件说明可能已经完成了自己的任务
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 如果 finishing 为 true 说明扩容结束
nextTable = null;
table = nextTab;
// 更新 sizeCtl 为新容量的 0.75 倍(用于下次扩容的阈值)
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 通过 CAS 尝试将 sizeCtl - 1,表示一个扩容线程完成了工作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 检查是否为最后一个完成的线程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 说明有其他线程还在迁移,此线程就先结束掉了
return;
// 到这里说明这是此次迁移过程中,最后一个完成工作的线程
// 设置迁移结束状态
finishing = advance = true;
// 准备进行最后一个检查
i = n; // recheck before commit
}
}
// 获取 i 处理的节点 f
// 判断 i 处有没有元素
else if ((f = tabAt(tab, i)) == null)
// 没有元素就直接将前面的空节点 fwd 放进去
advance = casTabAt(tab, i, null, fwd);
// 判断 i 处节点的 hash 是不是 MOVED
else if ((fh = f.hash) == MOVED)
// hash 为 MOVED 说明已经迁移过了
advance = true; // already processed
else {
// 获取 f 的对象监视器锁
synchronized (f) {
// 判断下,节点有没有被别的线程更新过
if (tabAt(tab, i) == f) {
// 声明两个变量:低位节点,高位节点
Node<K,V> ln, hn;
// 和前面 putVal 方法一样,通过节点的 hash 判断是链表还是红黑树
if (fh >= 0) {
// n 为原数组的大小(2 的幂)
// 这里 runBit 要么为 0 要么为 n
// 为 0,节点将保持在索引 i
// 为 n,节点将移动到索引 i + n
int runBit = fh & n;
// 看到 lastRun 这个变量,是否勾起往昔回忆?
Node<K,V> lastRun = f;
// 和 jdk7 的 CHM 一样,还是先遍历一遍,找出一段连续的节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 根据前面计算的 runBit,决定将最后这段连续的节点放到低位(ln)还是高位(hn)
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 对于不在最后连续段的节点,根据它们的 hash & n,判断在低位还是高位
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 低位节点(ln)放入新数组的 i 处
setTabAt(nextTab, i, ln);
// 高位节点(hn)放入新数组 i + n 处
setTabAt(nextTab, i + n, hn);
// 在原数组的 i 处放入标志迁移的节点 fwd,这样其他线程看到该位置的 hash 为 MOVED 就不会进行迁移了
setTabAt(tab, i, fwd);
// 设置迁移状态
advance = true;
}
// 到这里说明 f 的结构是红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 区分高低位节点后,如果节点数量小于等于 UNTREEIFY_THRESHOLD(默认 6),则将红黑树转回链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

还有一个类似的方法 helpTransfer 前面在 put 中见过

当 put 时,发现正在 transfer 的话,会调用 helpTransfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 这里需要传入原数组 tab 和待插入(修改)位置上的节点 f
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 判断下 f 还是不是 fwd 节点,然后从 fwd 中拿到其他线程创建的新数组并判断下
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 根据数组大小计算出一个扩容标识,确保与其他线程进行的是同一个扩容操作
int rs = resizeStamp(tab.length);
// 循环 CAS 重试
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 这里的逻辑和 tryPresize 中的一样,主要是用来判断还是否需要迁移
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 通过 CAS 尝试设置新的 sc,并开始迁移
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 这里不同于前面第一次迁移,这里新的数组已经存在
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
// 到这里说明其他的线程已经扩容完成了,这里就直接返回全局的数组
return table;
}

到此,整个 put 方法算是解读完了,其中的细节还需多多揣摩

GET

相较于 put 方法,get 方法要简单许多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算 hash
int h = spread(key.hashCode());
// 判断数组是否为空,hash 对应数组上的位置是不是为空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 判断头节点是否就是要找的节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果头节点 hash 小于 0,说明正在扩容,或者该位置节点结构是红黑树
else if (eh < 0)
// 这个 find 方法是 Node 的,但是 ForwardingNode 和 TreeBin 会进行重写
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

REMOVE

有了前面 put 方法的基础,remove 方法可以快速过下就了解了

1
2
3
public V remove(Object key) {
return replaceNode(key, null, null);
}

这里 remove 的实现是通过 replaceNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组为空,或者对应数组位置为空,就直接结束了
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
// 和 put 类似的,如果发现正在迁移就协助迁移
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
// 上锁
synchronized (f) {
// 进来还是先判断下,节点有没有更新过
if (tabAt(tab, i) == f) {
// 判断节点的结构,链表还是红黑树
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
// 找目标节点
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
// 进行替换或删除
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null) // 替换
e.val = value;
else if (pred != null) // 删除中间节点
pred.next = e.next;
else
// 删除头节点
setTabAt(tab, i, e.next);
}
break;
}
// 不匹配,往下遍历
pred = e;
if ((e = e.next) == null)
break;
}
}
// 红黑树
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
// 判断是否已执行过替换或删除
if (validated) {
// 如果是删除操作就修改下计数
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}

总结

通过回答开头的问题,来对 ConcurrentHashMap 做一个总结

  • ConcurrentHashMap 在 Java7 和 Java8 中实现有什么差别?Java8 解決了 Java7 中什么问题?

    Java7 采用 分段锁 机制,整个 Map 被分为 16(默认) 个 Segment,每个 Segment 都是一个小的 HashMap。每个 Segment 都有自己的锁,默认配置下,最理想的情况允许最多 16 个线程同时写入

    Java8 移除了 Segment 分段锁,采用数组 + 链表/红黑树,当链表长度超过 8 时,转为红黑树,当红黑树节点少于 6 时,转为链表。通过 CAS 和 synchronized 来保证并发安全,锁的颗粒度更细,锁定单个桶,支持并发扩容,多线程可以同时协助扩容

    相较于 Java7,Java8 提高了并发度,降低了锁的颗粒度,提高了检索效率,提高了扩容效率

  • ConcurrentHashMap 在 Java7 中 Segment 数 (concurrencyLevel) 默认值是多少? 为何一旦初始化就不可再扩容?

    默认并发度为 16;之所以初始化后不再进行对 Segment 数组的扩容出于以下几点考虑:

    1. 锁的复杂性:每个 Segment 本身就是一个独立的 HashMap,扩容需要重新分配 Segment 数组,为了防止在扩容过程中发现线程安全问题,就需要引入更复杂的全局锁机制
    2. 影响性能:接上述的,引入了复杂的全局锁必定就会带来性能的损耗,这与 ConcurrentHashMap 的设计初衷所违背

Thanks