CAS 详解
2024-11-21 09:25:53 # Technical # JavaConcurrency

CAS(Compare-And-Swap)比较和交换,其作用是让 CPU 先比较两个值是否相等,然后原子地更新某个位置的值,其实现方式是基于硬件平台的汇编指令,JVM 只是封装了汇编的调用

CAS 的操作是原子的,所以多线程使用 CAS 更新数据时可以不用锁,JDK 中大量使用了 CAS 来更新数据而避免加锁(Atomic…)

底层原理

通过一个简单的 AtomicInteger 的示例说明

1
2
3
4
5
6
7
static AtomicInteger ai = new AtomicInteger();

public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
new Thread(() -> System.out.println(ai.incrementAndGet())).start();
}
}

AtomicInteger 中的 incrementAndGet 方法

1
2
3
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

Unsafe 中的 getAndAddInt 方法

1
2
3
4
5
6
7
8
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

所以核心为 Unsafe 中的 3 个 native 方法

1
2
3
4
5
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

unsafe.cpp

1
2
3
4
5
6
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

可以看到它通过 Atomic::cmpxchg 来实现比较和替换操作。其中参数 x 是即将更新的值,参数 e 是原内存的值

cmpxchg 是一种低级原子操作通常作为硬件指令实现(在 x86 体系结构中,它是 CMPXCHG 指令)。该操作用于以原子方式将内存地址(addr)处的值与预期值(e)进行比较,如果匹配,则使用新值(x)更新内存位置

Atomic::cmpxchg 的实现与平台相关

Linux x86 下的实现:

1
2
3
4
5
6
7
8
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}

Windows x86 下的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::isMP(); //判断是否是多处理器
_asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}

// Adding a lock prefix to an instruction on MP machine
// VC++ doesn't like the lock prefix to be on a single line
// so we can't insert a label after the lock prefix.
// By emitting a lock prefix, we can define a label after it.
#define LOCK_IF_MP(mp) __asm cmp mp, 0 \
__asm je L0 \
__asm _emit 0xF0 \
__asm L0:

如果是多处理器,为 cmpxchg 指令添加 lock 前缀。反之,就省略 lock 前缀(单核处理器不需要 lock 前缀提供的内存屏障效果)。这里的 lock 前缀就是使用了处理器的总线锁(最新的处理器都使用缓存锁代替总线锁来提高性能)

cmpxchg(void* ptr, int old, int new),如果 ptr 和 old 的值一样,则把 new 写到 ptr 内存,否则返回 ptr 的值,整个操作是原子的。在 Intel 平台下,会用 lock cmpxchg 来实现,使用 lock 触发缓存锁,这样另一个线程想访问 ptr 的内存,就会被 block 住

cmpxchg 指令是一条原子指令。在 CPU 执行 cmpxchg 指令时,处理器会自动锁定总线,防止其他 CPU 访问共享变量,然后执行比较和交换操作,最后释放总线

cmpxchg 指令在执行期间,CPU 会自动禁止中断。这样可以确保 CAS 操作的原子性,避免中断或其他干扰对操作的影响

cmpxchg 指令是硬件实现的,可以保证其原子性和正确性。CPU 中的硬件电路确保了 cmpxchg 指令的正确执行,以及对共享变量的访问是原子的

ABA 问题

CAS 的原理是通过检查值是否发生变化来判断是否存在其他线程的篡改,如果一个值有 A 变为 B,再右 B 变为 A,那么仅通过检查值来判断的方式就失效了

ABA 的危害

ABA 问题引发严重后果的通常发生在 无锁算法 中,例如并发堆栈、队列和链表等,这些数据结构依赖 CAS 来实现安全的无锁更新

通过一个简陋的用 CAS 实现无锁堆栈的例子说明下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Node head;
head = B;
A.next = head;
head = A;

Thread thread1 = new Thread(()->{
oldValue = head;
sleep(3秒);
compareAndSet(oldValue, B);
}
);

Thread thread2 = new Thread(() -> {
// 弹出A
newHead = head.next;
head.next = null; //即A.next = null;
head = newHead;
// 弹出B
newHead = head.next;
head.next = null; // 即B.next = null;
head = newHead; // 此时head为null

// 压入C
head = C;
// 压入D
D.next = head;
head = D;
// 压入A
A.next = D;
head = A;
}
);
thread1.start();
thread2.start();

在某个时刻,线程1 试图将栈顶换成 B,但它获取栈顶的 oldValue(为A)后,被 线程2 中断了。线程2 依次将A、B 弹出,然后压入 C、D、A。然后换 线程1 继续运行,线程1 执行 compareAndSet 发现 head 指向的元素确实与 oldValue 一致,都是 A,所以就将 head 指向 B 了。但是,注意第 19 行代码,线程2 在弹出 B 的时候,将 B 的 next 置为 null 了,因此在 线程1 将 head 指向 B 后,栈中只剩了一个孤零零的元素 B。但按预期来说,栈中应该放的是 B → A → D → C

GC 对 ABA 的影响

ConcurrentLinkedQueue 是一个基于 CAS 实现无锁、线程安全的队列,内部使用 CAS 来管理队列的头和尾指针。在 ConcurrentLinkedQueue 中大神道格·李注释到:

1
2
3
4
5
6
Note that like most non-blocking algorithms in this package,
this implementation relies on the fact that in garbage
collected systems, there is no possibility of ABA problems due
to recycled nodes, so there is no need to use "counted
pointers" or related techniques seen in versions used in
non-GC'ed settings.

ConcurrentLinkedQueue 的实现很大程度上依赖于 Java 的垃圾收集(GC)系统来防止 ABA 问题

在没有垃圾收集的环境中(例如 C 或 C++),当删除节点时(例如调用free()delete ),可以 回收 该内存位置以供将来分配。因此,相同的内存地址可以重复用于不同的节点或对象,这可能会导致 ABA 问题

而在像 Java 这样的垃圾收集环境中,当从队列中删除节点时,它的内存不会立即重用。 GC 确保被丢弃节点的内存在足够长的时间内保持不可用状态

在非垃圾收集环境中,程序员通常需要使用 计数指针版本标记 来跟踪节点已被重用的次数。计数指针不仅携带内存地址,还携带 版本号计数器,每次重用内存时都会递增。这确保了即使内存位置(例如,节点)被回收,指向它的指针也可以区分同一内存块的不同实例

由于 Java 的垃圾收集器确保内存不会过早回收,因此 无需使用计数指针 或版本控制来跟踪内存重用。 Java 开发者不需要手动管理内存,因此逻辑上从队列中移除的节点最终都会被 GC 收集,确保不会出现 由于内存复用而导致的 ABA 问题

解决

对于有 GC 的语言来说,确保不重用同一内存地址的对象即可,即 push 时即刻 new node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class LockFreeStack<T> {

private final AtomicReference<Node<T>> head = new AtomicReference<>();

private final AtomicInteger size = new AtomicInteger(0);

public void push(T value) {
// 每 push 一个元素都会 new 一个 Node 对象,即使 value 相同,也不会出现结构紊乱
Node<T> newNode = new Node<>(value);
Node<T> oldHead;
do {
oldHead = head.get();
newNode.next = oldHead;
} while (!head.compareAndSet(oldHead, newNode));
size.incrementAndGet();
}

public T pop() {
Node<T> oldHead;
Node<T> newHead;
do {
oldHead = head.get();
if (oldHead == null) return null;
newHead = oldHead.next;
} while (!head.compareAndSet(oldHead, newHead));
size.decrementAndGet();
return oldHead.value;
}

public int size() {
return size.get();
}

private static class Node<T> {
final T value;
Node<T> next;

Node(T value) {
this.value = value;
}

@Override
public String toString() {
return value == null ? "null" : value.toString();
}
}
}

较通用的方式是引入一个版本号,每更新一次就更新新的版本号,如此一来 A -> B -> A 就变为了 A1 -> B2 -> A3,这样就解决了 ABA 的问题。JDK 中的 AtomicStampedReference 提供了解决 ABA 问题的方法,它的 compareAndSet 方法首先会检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以 CAS 更新

下面是用 AtomicStampedReference 实现类似 AtomicInteger 功能的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private final AtomicStampedReference<Integer> asr;

public CASTest(int initValue) {
asr = new AtomicStampedReference<>(initValue, 0);
}

public void increment() {
boolean updated = false;
while (!updated) {
int[] stampHolder = new int[1];
Integer currValue = asr.get(stampHolder);
int currStamp = stampHolder[0];

updated = asr.compareAndSet(currValue, currValue + 1, currStamp, currStamp + 1);
}
}

public int getValue() {
return asr.getReference();
}

public int getStamp() {
int[] stampHolder = new int[1];
asr.get(stampHolder);
return stampHolder[0];
}

public static void main(String[] args) throws InterruptedException {
CASTest test = new CASTest(0);
for (int i = 0; i < 100; i++) {
new Thread(test::increment).start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println("Final counter value: " + test.getValue());
System.out.println("Final counter stamp: " + test.getStamp());
}

AtomicStampedReference.get 方法的参数是一个大小至少为 1 的数组,乍一看似乎很奇怪,这里如此设计出于一下原因:

  1. 多个返回值:AtomicStampedReference.get 会返回 value 与 stamp,value 直接返回,stamp 通过数组返回
  2. 原子性:如果将 value 和 stamp 分开返回可能会导致高并发场景下的不一致
  3. 传递类型:如果是 int 这样的基本数据类型,方法内部的更改不会反应到外部,通过数组可以对调用者可见

循环开销

如果 CAS 一直不成功,会给 CPU 带来很多无谓的开销。JVM 在某些情况会采用 退避策略(Backoff Strategies),在重试 CAS 之前引入一个小的随机延迟,通过允许其他线程完成其更新来减少争用,从而防止线程出现紧密无限期的自旋

指数退避(Exponential backoff):每次 CAS 尝试失败后,线程等待的时间间隔逐渐变长

随机退避(Randomized backoff):线程等待随机时间,这有助于更公平地分配访问并减少多个线程再次冲突的机会

还有就是像在 Synchronized 中,如果轻量级锁 CAS 几次失败后,就会采用重量级锁来处理