Synchronized 详解
2025-01-19 09:46:49 # Technical # JavaConcurrency

Synchronized 关键字对于管理多个线程对资源的并发访问至关重要。它确保一次只有一个线程可以执行一段代码或方法,从而防止竞争条件并保持数据一致性。当方法或块同步时,它会获取与其关联的对象(或类)的锁。该机制通过强制互斥来提供线程安全。通过使用 synchronized,开发人员可以保护代码的关键部分,确保以受控方式访问共享资源,从而避免多线程应用程序中的潜在问题

使用

Synchronized 的使用方式整体上可分为两种,对象锁与类锁

对象锁

对象锁指 Synchronized 锁定的是实例对象

代码块的形式,可指定锁的对象,可以是自身 this,也可以是自定义的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
new Thread(new SyncObj(), "t1").start();
new Thread(new SyncObj(), "t2").start();
}

private static class SyncObj implements Runnable {
@Override
public void run() {
synchronized (this) {
System.out.println(Thread.currentThread().getName() + " is running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

Out:

1
2
t1 is running...
t2 is running...

对象锁为 this 时,要注意 this 的实例是否为同一个,否则无法起到锁的目的

方法的形式,Synchronized 修饰普通方法,此时锁的对象是 this

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) {
new Thread(() -> new SyncObj().method(), "t1").start();
new Thread(() -> new SyncObj().method(), "t2").start();
new Thread(() -> SyncObj.getInstance().method(), "t3").start();
new Thread(() -> SyncObj.getInstance().method(), "t4").start();
}

private static class SyncObj {
private volatile static SyncObj instance = null;

private SyncObj() {}

public static SyncObj getInstance() {
if (instance == null) {
synchronized (SyncObj.class) {
if (instance == null) {
instance = new SyncObj();
}
}
}
return instance;
}

public synchronized void method() {
System.out.println(Thread.currentThread().getName() + " is running method");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Out:

1
2
3
4
t3 is running method
t1 is running method
t2 is running method
t4 is running method

使用方法锁的时候仍然要特别注意锁的对象,上面例子中只有 t4 被阻塞

类锁

类锁指的是 Synchronized 锁定的是 Class 对象

代码块的形式,可以指定 Class 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws InterruptedException {
new Thread(new SyncObj(), "t1").start();
new Thread(new SyncObj(), "t2").start();
}

private static class SyncObj implements Runnable {
@Override
public void run() {
synchronized (SyncObj.class) {
System.out.println(Thread.currentThread().getName() + " is running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

Out:

1
2
t1 is running...
t2 is running...

写这个例子的时候,我犯了一个很搞笑的错,困扰许久

new Thread(SyncObj::new, "t3").start();

这里我改了许多地方,测好多遍,就是没有正常的输出

最后才恍然大悟

1
2
3
new Thread(() -> {
new SyncObj();
}, "t3").start();

这样是不是就很清晰呢。。。

静态方法的形式,锁定的是当前类的 Class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
new Thread(SynchronizedTest4::method1, "t1").start();
new Thread(SynchronizedTest4::method2, "t2").start();
}

private static synchronized void method1() {
System.out.println(Thread.currentThread().getName() + " is running method1");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private static synchronized void method2() {
System.out.println(Thread.currentThread().getName() + " is running method2");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

Out:

1
2
t1 is running method1
t2 is running method2

原理分析

加锁和释放锁

因为 Synchronized 底层是 JVM 实现的,所以需要深入 JVM 看字节码

1
2
3
4
5
public static void main(String[] args) {
synchronized (SynchronizedTest5.class) {
System.out.println("running...");
}
}

javap -c 查看字节码文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(java.lang.String[]);
Code:
0: ldc #2 // 从运行时常量池中提取 SynchronizedTest5.class 放到操作数栈栈顶
2: dup // 复制操作数栈顶的值并将其放到操作数栈顶
3: astore_1 // 将栈顶的对象存储到局部变量表的第 1 个位置
4: monitorenter // 用于进入同步代码块。它会对之前存储在局部变量1中的对象(锁对象)加锁
5: getstatic #3 // 从类的静态字段中获取值 (System.out)
8: ldc #4 // 再次从常量池加载常量 (running...)
10: invokevirtual #5 // 调用实例方法 (System.out.println)
13: aload_1 // 从局部变量表中加载第 1 个位置的对象 (锁对象)
14: monitorexit // 退出同步代码块,释放锁对象
15: goto 23 // 无条件跳转到指定行号(此处为 23)。这个指令意味着正常执行路径结束,直接跳转到方法的结束处
18: astore_2 // 将栈顶的异常对象存储到局部变量 2。这段代码是为了处理在同步块中抛出的异常
19: aload_1 // 再次从局部变量表中加载第1个位置的对象(即锁对象),准备释放同步块中的锁
20: monitorexit // 确保在异常情况下也能够正确释放同步块的锁
21: aload_2 // 从局部变量表中加载刚刚捕获的异常对象
22: athrow // 将异常重新抛出,使得异常传播到调用栈的上层
23: return // 方法执行完毕,返回到调用方法的地方。对于 void 方法(如 main),这是一个空的返回
  • monitorenter:每个对象都是一个监视器(monitor),当 monitor 被占用时就会处于锁定状态,线程执行 monitorenter 指令时会尝试获取 monitor 的所有权,过程如下:
    1. 如果 monitor 的进入数为 0,则该线程进入 monitor,然后将进入数设置为 1,该线程成为 monitor 的所有者;
    2. 如果线程已经占有该 monitor,只是重新进入,则 monitor 的进入数 +1
    3. 如果其他线程已经进入了 monitor,则该线程进入阻塞状态,直到 monitor 的进入数为 0,再重新尝试成为 monitor 的所有者
  • monitorexit:执行 monitorexit 的线程必须是 objectref 所对应的 monitor 的所有者。指令执行时,monitor 的进入数 -1,如果 减1 后进入数为 0,那线程退出 monitor,不再是这个 monitor 的所有者。其他被这个 monitor 阻塞的线程可以尝试去获取这个 monitor 的所有权

monitorexit 指令出现了两次,第 1 次为同步正常退出释放锁;第 2 次为发生异步退出释放锁

由上可知:Synchronized 的底层是通过一个 monitor 的对象来完成,其实 wait/notify 等方法也依赖于 monitor 对象,这就是为什么只有在同步的块或者方法中才能调用 wait/notify 等方法,否则会抛出 java.lang.IllegalMonitorStateException 的异常的原因

可重入与可重入锁

可重入:若一个程序或子程序可以在任意时刻被中断然后被操作系统调度执行另外一段代码,这段代码又调用了该子程序而不会出错,则称其为可重入(Reentrant 或 re-entrant)的。 即当该子程序正在运行时,执行线程可以再次进入并执行它,仍然获得符合设计时预期的结果。与多线程并发执行的线程安全不同,可重入强调 对单个线程执行时重新进入同一个子程序仍然是安全的

可重入锁:又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者 class),不会因为之前已经获取过还没释放而阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) {
SynchronizedTest6 test6 = new SynchronizedTest6();
new Thread(test6::m1, "t1").start();
new Thread(test6::m1, "t2").start();
new Thread(test6::m1, "t3").start();
}

private synchronized void m1() {
System.out.println(Thread.currentThread().getName() + " is running m1...");
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
m2();
}

private synchronized void m2() {
System.out.println(Thread.currentThread().getName() + " is running m2...");
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
m3();
}

private synchronized void m3() {
System.out.println(Thread.currentThread().getName() + " is running m3...");
}

Out:

1
2
3
4
5
6
7
8
9
t1 is running m1...
t1 is running m2...
t1 is running m3...
t3 is running m1...
t3 is running m2...
t3 is running m3...
t2 is running m1...
t2 is running m2...
t2 is running m3...

如果将 test6 改为 new SynchronizedTest6() 会发生什么呢?

可以看出,不管是 t1,t2 还是 t3 执行的顺序都会是 m1 -> m2 -> m3

大致执行过程:

  • t1/t2/t3 monitorenter
  • 执行 m1 方法,monitor _count + 1 = 1
  • 执行 m2 方法,monitor _count + 1 = 2
  • 执行 m3 方法,monitor _count + 1 = 3
  • monitorexit
  • m3 执行完毕,monitor _count - 1 = 2
  • m2 执行完毕,monitor _count - 1 = 1
  • m1 执行完毕,monitor _count - 1 = 0

有序性与可见性

Java 内存模型(JMM)中,内存屏障(内存栅栏)被用来保证有序性和可见性。Synchronized (volatile)关键字通过隐式插入内存屏障来保证不同线程间的正确同步行为

synchronized 在进入同步块时,插入 LoadLoadLoadStore 屏障,在退出同步块时,插入 StoreStoreStoreLoad 屏障

JVM 针对 volatile 变量在写入时会插入 StoreStoreStoreLoad 屏障,读取时插入 LoadLoadLoadStore 屏障

这些屏障保证线程对共享变量的读写在进入和退出同步块时保持一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static int i = 0;

public static void main(String[] args) {
Thread t1 = new Thread(() -> {
long start = System.currentTimeMillis();
synchronized (Synchronized7.class) {
while (i == 0) {
}
}
long end = System.currentTimeMillis();
System.out.println("t1 has ran " + (end - start) + "ms");
});
Thread t2 = new Thread(() -> {
synchronized (Synchronized7.class) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2 set i = 1");
i = 1;
}
});
t2.start();
t1.start();
}

Out:

1
2
t2 set i = 1
t1 has ran 2001ms

LoadLoad 屏障

  • 特点:保证读最新值,防止 读-读重排序

  • 功能:保证从 主内存 中读取的值时最新的,确保在此屏障之前的所有 读操作(Load 指令)都在此屏障之前完成,即在这个屏障之后的读操作不能被重排到屏障之前

  • 例子

    1
    2
    3
    int a = sharedVariable1;  // 读取共享变量1
    LoadLoad; // LoadLoad屏障
    int b = sharedVariable2; // 读取共享变量2

    LoadLoad 屏障保证读取 sharedVariable2 之前,sharedVariable1 的读取已经完成,确保数据从主内存中加载

LoadStore 屏障

  • 特点:保证读取最新值,防止 读-写重排序

  • 功能:保证从 主内存 中读取的值时最新的,确保在此屏障之前的所有读操作(Load指令)都已完成,并且后续的写操作(Store指令)不能被重排到屏障之前

  • 例子

    1
    2
    3
    int a = sharedVariable;   // 读取共享变量
    LoadStore; // LoadStore屏障
    sharedVariable2 = a + 1; // 写操作,基于读取的值修改共享变量

    LoadStore 屏障确保读取 sharedVariable 之后,写入 sharedVariable2 时,sharedVariable 的值已经是最新的

StoreStore 屏障

  • 特点:保证写入 主内存,防止 写-写重排序

  • 功能:确保多个连续的写操作按预期顺序写入到 主内存,确保第一个写操作的结果已经对其他线程可见,然后再进行第二个写操作

  • 例子

    1
    2
    3
    sharedVariable1 = 10;    // 写入共享变量1
    StoreStore; // StoreStore屏障
    sharedVariable2 = 20; // 写入共享变量2

    StoreStore 屏障确保 sharedVariable1 的写操作已经完成并对其他线程可见,然后才能写入sharedVariable2

StoreLoad 屏障

  • 特点:最严格的屏障,通常开销较大,保证写入 主内存,防止 写-读重排序

  • 功能:确保一个线程的写操作完成后,能够立即在 主内存 中被其他线程看到,同时阻止在写操作完成之前读取任何共享变量

  • 例子

    1
    2
    3
    sharedVariable = 10;    // 写入共享变量
    StoreLoad; // StoreLoad屏障
    int a = sharedVariable2; // 读取共享变量2

    StoreLoad 屏障确保 sharedVariable 的写入已经完成并对其他线程可见,然后才能读取 sharedVariable2 的最新值。此屏障在高性能并发场景下尤其重要

对象头

JVM 中,对象在内存中的存储分为三个部分:对象头、实例数据和对齐填充

对象头

  • 对象头:Java 对象头一般占有 2 个机器码(在 32 位虚拟机中,1 个机器码等于 4 字节,也就是 32bit,在 64 位虚拟机中,1 个机器码是 8 个字节,也就是 64bit),但是 如果对象是数组类型,则需要 3 个机器码,因为 JVM 虚拟机可以通过 Java 对象的元数据信息确定 Java 对象的大小,但是无法从数组的元数据来确认数组的大小,所以用一块来记录数组长度
  • 实例数据:存放类的属性数据信息,包括父类的属性信息
  • 对齐填充:由于虚拟机要求 对象起始地址必须是 8 字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐

Synchronized 用的锁就是存在 Java 对象头里的,那么什么是 Java 对象头呢?Hotspot 虚拟机的对象头主要包括两部分数据:Mark Word(标记字段)、Class Pointer(类型指针)。其中 Class Pointer 是对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例,Mark Word 用于存储对象自身的运行时数据,它是 实现轻量级锁和偏向锁的关键。 Java对象头具体结构描述如下:

长度 内容 说明
32/64bit Mark Word 存储对象的 HashCode 或锁信息等
32/64bit Class Metadata Address 存储对象类型数据的指针
32/64bit Array Length 数组长度

Mark Word用于存储对象自身的运行时数据,如:哈希码(HashCode)、GC分代年龄、锁状态标志、线程持有的锁、偏向线程 ID、偏向时间戳等。下图是Java对象头 无锁状态下Mark Word部分的存储结构(32位虚拟机):

25Bit 4Bit 1Bit 2Bit
无所状态 hashCode 分代年龄 是否偏向锁 是否获取锁

对象头信息是与对象自身定义的数据无关的额外存储成本,但是考虑到虚拟机的空间效率,Mark Word 被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据,它会根据对象的状态复用自己的存储空间,也就是说,Mark Word 会随着程序的运行发生变化,可能变化为存储以下 4 种数据

锁状态 25 bit 4 bit 1 bit 2 bit
23 bit 2 bit 是否是偏向锁 锁标志位
轻量级锁 指向栈中锁记录的指针 00
重量级锁 指向互斥量(重量级锁)的指针 10
GC标记 11
偏向锁 线程ID Epoch 对象分代年龄 1 01

在 64 位下,Mark Word 为 64bit,其存储结构如下:

锁状态 25 bit 31 bit 1 bit 4 bit 1 bit 2 bit
cms_free 分代年龄 偏向锁 锁标志位
无锁 unused hashCode 0 01
偏向锁 ThreadID(54bit)
Epoch(2bit)
1 01

对象头的最后两位存储了锁的标志位,01 是初始状态,未加锁,其对象头里存储的是对象本身的哈希码,随着锁级别的不同,对象头里会存储不同的内容。偏向锁存储的是当前占用此对象的线程 ID而轻量级则存储指向线程栈中锁记录的指针。从这里我们可以看到,“锁”这个东西,可能是个锁记录+对象头里的引用指针(判断线程是否拥有锁时将线程的锁记录地址和对象头里的指针地址比较),也可能是对象头里的线程 ID(判断线程是否拥有锁时将线程的ID和对象头里存储的线程ID比较)

HotSport 中的 Mark Word

Lock Record

在线程进入同步代码块的时候,如果此同步对象没有被锁定,即它的锁标志位是 01,则虚拟机首先在当前线程的栈中创建一个「锁记录」(Lock Record)的空间,用于存储锁对象的 Mark Word 的拷贝,官方把这个拷贝称为 Displaced Mark Word。整个Mark Word 及其拷贝至关重要

Lock Record 是线程私有的数据结构,每一个线程都有一个可用 Lock Record 列表,同时还有一个全局的可用列表。每一个被锁住的对象 Mark Word 都会和一个 Lock Record 关联(对象头的 MarkWord 中的 Lock Word 指向 Lock Record 的起始地址),同时 Lock Record 中有一个 Owner 字段存放拥有该锁的线程的唯一标识(或者 object mark word),表示该锁被这个线程占用

Lock Record 描述
Owner 初始时为 NULL 表示当前没有任何线程拥有该 monitor record,当线程成功拥有该锁后保存线程唯一标识,当锁被释放时又设置为 NULL
EntryQ 关联一个系统互斥锁(semaphore),阻塞所有试图锁住 monitor record 失败的线程
RcThis 表示 blocked 或 waiting 在该 monitor record 上的所有线程的个数
Nest 用来实现 重入锁的计数
HashCode 保存从对象头拷贝过来的HashCode值(可能还包含GC age)
Candidate 用来避免不必要的阻塞或等待线程唤醒,因为每一次只有一个线程能够成功拥有锁,如果每次前一个释放锁的线程唤醒所有正在阻塞或等待的线程,会引起不必要的上下文切换(从阻塞到就绪然后因为竞争锁失败又被阻塞)从而导致性能严重下降。Candidate只有两种可能的值 0 表示没有需要唤醒的线程 1 表示要唤醒一个继任线程来竞争锁

锁的优化

自 JDK5 引入了现代操作系统新增加的 CAS 后( JDK5 中并没有对 synchronized 关键字做优化,而是体现在 J.U.C 中,所以在该版本 concurrent 包有更好的性能 ),从 JDK6 开始,就对 synchronized 的实现机制进行了较大调整,包括使用 JDK5 引进的 CAS 之外,还增加了适应性自旋、锁消除、锁粗化、偏向锁、轻量级锁这些优化策略

  • 适应性自旋(Adaptive Spinning):当线程在获取轻量级锁的过程中执行 CAS 操作失败后,在进入与 monitor 相关联的操作系统重量级锁(mutex semaphore)前会进入自旋(Spinning)然后再次尝试,当尝试一定的次数后如果仍然没有成功则调用与该 monitor 关联的 semaphore(即互斥锁)进入到阻塞状态
  • 锁消除(Lock Elimination):通过运行时 JIT 编译器的逃逸分析来消除一些没有在当前同步块以外被其他线程共享的数据的锁保护,通过逃逸分析也可以在线程本的 Stack 上进行对象空间的分配(同时还可以减少Heap上的垃圾收集开销)
  • 锁粗化(Lock Coarsening):减少不必要的紧连在一起的 unlock,lock 操作,将多个连续的锁扩展成一个范围更大的锁
  • 偏向锁(Biased Locking):避免在锁获取过程中执行不必要的CAS原子指令(因为CAS原子指令虽然相对于重量级锁来说开销比较小但还是存在非常可观的本地延迟)
  • 轻量级锁(Lightweight Locking):这种锁实现的背后基于这样一种假设,即在真实的情况下我们程序中的大部分同步代码一般都处于无锁竞争状态(即单线程执行环境),在无锁竞争的情况下完全可以避免调用操作系统层面的重量级互斥锁,取而代之的是在 monitorenter 和 monitorexit 中只需要依靠一条 CAS 原子指令就可以完成锁的获取及释放。当存在锁竞争的情况下,执行 CAS 指令失败的线程将调用操作系统互斥锁进入到阻塞状态,当锁被释放的时候被唤醒

锁主要存在四种状态,依次是:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态,锁可以从偏向锁升级到轻量级锁,再升级的重量级锁。但是 锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级

在 JDK6 中默认是开启偏向锁和轻量级锁的,可以通过 -XX:-UseBiasedLocking 来禁用偏向锁

自旋锁

所谓自旋锁,就是指当一个线程尝试获取某个锁时,如果该锁已被其他线程占用,就 一直循环检测锁是否被释放,而不是进入线程挂起或睡眠状态

自旋锁 适用于锁保护的临界区很小 的情况,临界区很小的话,锁占用的时间就很短。自旋等待不能替代阻塞,虽然它可以避免线程切换带来的开销,但是它占用了 CPU 处理器的时间。如果持有锁的线程很快就释放了锁,那么自旋的效率就非常好,反之,自旋的线程就会白白消耗掉处理的资源,它不会做任何有意义的工作,典型的占着茅坑不拉屎,这样反而会带来性能上的浪费。所以说,自旋等待的时间(自旋的次数)必须要有一个限度,如果自旋超过了定义的时间仍然没有获取到锁,则应该被挂起

自旋锁在 JDK1.4.2 中引入,默认关闭,但是可以使用 -XX:+UseSpinning 开启,在 JDK1.6 中默认开启

同时自旋的默认次数为 10 次,可以通过参数 -XX:PreBlockSpin 来调整

适应性自旋锁

如果通过参数 -XX:PreBlockSpin 来调整自旋锁的自旋次数,会带来诸多不便。假如将参数调整为10,但是系统很多线程都是等你刚刚退出的时候就释放了锁(假如多自旋一两次就可以获取锁),是不是很尴尬。于是 JDK1.6 引入自适应的自旋锁,让 JVM 变得越来越聪明

所谓自适应就意味着自旋的次数不再是固定的,它是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。线程如果自旋成功了,那么下次自旋的次数会更加多,因为虚拟机认为既然上次成功了,那么此次自旋也很有可能会再次成功,那么它就会允许自旋等待持续的次数更多。反之,如果对于某个锁,很少有自旋能够成功,那么在以后要或者这个锁的时候自旋的次数会减少甚至省略掉自旋过程,以免浪费处理器资源

锁消除

在有些情况下,JVM 检测到不可能存在共享数据竞争,这是 JVM 会对这些同步锁进行锁消除

锁消除的依据是逃逸分析的数据支持

如果不存在竞争,为什么还需要加锁呢?所以锁消除可以节省毫无意义的请求锁的时间。变量是否逃逸,对于虚拟机来说需要使用数据流分析来确定,但是对于程序员来说这还不清楚么?在明明知道不存在数据竞争的代码块前加上同步吗?但是有时候程序并不是我们所想的那样?虽然没有显示使用锁,但是在使用一些 JDK 的内置 API 时,如 StringBuffer、Vector、HashTable 等,这个时候会存在隐形的加锁操作。比如 StringBuffer 的 append() 方法,Vector 的 add() 方法:

1
2
3
4
5
6
7
8
9
// 在运行这段代码时,JVM 可以明显检测到变量 vector 没有逃逸出方法 vectorTest() 之外
// 所以 JVM 可以大胆地将 vector 内部的加锁操作消除
public void vectorTest() {
Vector<String> vector = new Vector<>();
for(int i = 0 ; i < 10 ; i++) {
vector.add(i + "");
}
System.out.println(vector);
}

锁粗化

在使用同步锁的时候,需要让同步块的作用范围尽可能小—仅在共享数据的实际作用域中才进行同步,这样做的目的是 为了使需要同步的操作数量尽可能缩小,如果存在锁竞争,那么等待锁的线程也能尽快拿到锁

在大多数的情况下,上述观点是正确的。但是如果一系列的连续加锁解锁操作,可能会导致不必要的性能损耗,所以引入锁粗话的概念

锁粗话概念比较好理解,就是将多个连续的加锁、解锁操作连接在一起,扩展成一个范围更大的锁

1
2
3
4
5
6
7
8
9
Vector<String> vector = new Vector<>();
public void vectorTest() {
// 这里 vector 每次循环 add 操作都需要加解锁
// JVM 可能会将加解锁操作移动到 for 循环之外,合并成一个更大范围的加解锁
for(int i = 0 ; i < 10 ; i++) {
vector.add(i + "");
}
System.out.println(vector);
}

偏向锁

偏向锁是 JDK6 中的重要引进,因为 HotSpot 作者经过研究实践发现,在大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低,引进了偏向锁

偏向锁是在单线程执行代码块时使用的机制,如果在多线程并发的环境下(即线程 A 尚未执行完同步代码块,线程 B 发起了申请锁的申请),则 一定会转化为轻量级锁或者重量级锁

在 JDK5 中偏向锁默认是关闭的,而到了 JDK6 中偏向锁已经默认开启。如果并发数较大同时同步代码块执行时间较长,则被多个线程同时访问的概率就很大,就可以使用参数 -XX:-UseBiasedLocking 来禁止偏向锁(但这是个JVM参数,不能针对某个对象锁来单独设置)

引入偏向锁主要目的是:为了在没有多线程竞争的情况下尽量 减少不必要的轻量级锁执行 路径。因为轻量级锁的加锁解锁操作是需要依赖多次 CAS 原子指令的,而偏向锁只需要在置换 ThreadID 的时候依赖一次 CAS 原子指令(由于一旦出现多线程竞争的情况就必须撤销偏向锁,所以偏向锁的撤销操作的性能损耗也必须小于节省下来的 CAS 原子指令的性能消耗)

轻量级锁是为了在线程交替执行同步块时提高性能,而偏向锁则是在只有一个线程执行同步块时进一步提高性能

那么偏向锁是如何来减少不必要的CAS操作呢?

现在几乎所有的锁都是可重入的,即已经获得锁的线程可以多次锁住/解锁监视对象,按照之前的 HotSpot 设计,每次加锁/解锁都会涉及到一些 CAS 操作(比如对等待队列的 CAS 操作),CAS 操作会延迟本地调用,因此偏向锁的想法是 一旦线程第一次获得了监视对象,之后让监视对象「偏向」这个线程,之后的多次调用则可以避免 CAS 操作,说白了就是置个变量,如果发现为 true 则无需再走各种加锁/解锁流程

CAS 为什么会引入本地延迟?这要从 SMP(对称多处理器)架构说起

SMP

所有的 CPU 会共享一条系统总线(BUS),靠此总线连接主存。每个核都有自己的一级缓存,各核相对于 BUS 对称分布,因此这种结构称为「对称多处理器」

而 CAS(Compare-And-Swap),是一条 CPU 的原子指令,其作用是让 CPU 比较然后原子地更新某个位置的值,经过调查发现,其实现方式是 基于硬件平台的汇编指令,就是说 CAS 是靠硬件实现的,JVM 只是封装了汇编调用,那些 AtomicInteger 类便是使用了这些封装后的接口

例如:Core1 和 Core2 可能会同时把主存中某个位置的值 Load 到自己的 L1 Cache 中,当 Core1 在自己的 L1 Cache 中修改这个位置的值时,会通过总线,使 Core2 中 L1 Cache 对应的值「失效」,而 Core2 一旦发现自己L1 Cache 中的值失效(称为 Cache 命中缺失)则会通过总线从内存中加载该地址最新的值,大家通过总线的来回通信称为「Cache 一致性流量」,因为总线被设计为固定的「通信能力」,如果 Cache 一致性流量过大,总线将成为瓶颈。而当 Core1 和 Core2 中的值再次一致时,称为「Cache 一致性」,从这个层面来说,锁设计的终极目标便是减少 Cache 一致性流量

Cache 一致性,其实是有协议支持的,现在通用的协议是 MESI(最早由Intel开始支持)

而 CAS 恰好会导致 Cache 一致性流量,如果有很多线程都共享同一个对象,当某个 Core CAS 成功时必然会引起总线风暴,这就是所谓的本地延迟,本质上偏向锁就是为了消除 CAS,降低 Cache 一致性流量

其实也不是所有的 CAS 都会导致总线风暴,这跟 Cache 一致性协议有关

与 SMP 对应还有 NUMA(非对称多处理器架构),现在主要应用在一些高端处理器上,主要特点是没有总线,没有公用主存,每个Core有自己的内存,针对这种结构此处不做讨论

所以,当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程 ID,以后该线程进入和退出同步块时不需要花费 CAS 操作来争夺锁资源,只需要检查是否为偏向锁、锁标识为以及 ThreadID 即可,处理流程如下:

  1. 检测 Mark Word 是否为可偏向状态,即是否为偏向锁 1,锁标识位为01;
  2. 若为可偏向状态,则测试线程 ID 是否为当前线程 ID,如果是,则执行步骤 ⑤,否则执行步骤 ③;
  3. 如果测试线程 ID 不为当前线程 ID,则通过 CAS 操作竞争锁,竞争成功,则将 Mark Word 的线程 ID 替换为当前线程 ID,否则执行线程 ④;
  4. 通过 CAS 竞争锁失败,证明当前存在多线程竞争情况,当到达全局安全点,获得偏向锁的线程被挂起,偏向锁升级为轻量级锁,然后被阻塞在安全点的线程继续往下执行同步代码块;
  5. 执行同步代码块;

偏向锁的释放采用了一种 只有竞争才会释放锁 的机制,线程是不会主动去释放偏向锁,需要等待其他线程来竞争。偏向锁的撤销需要等待全局安全点(这个时间点是上没有正在执行的代码)。其步骤如下:

  1. 暂停拥有偏向锁的线程;
  2. 判断锁对象是否还处于被锁定状态,否,则恢复到无锁状态(01),以允许其余线程竞争。是,则挂起持有锁的当前线程,并将指向当前线程的锁记录地址的指针放入对象头 Mark Word,升级为轻量级锁状态(00),然后恢复持有锁的当前线程,进入轻量级锁的竞争模式;

注意:此处 当前线程挂起再恢复的过程中并没有发生锁的转移,仍然在当前线程手中,只是穿插了个「将对象头中的线程ID变更为指向锁记录地址的指针」这么个事

偏向锁的获得与撤销

轻量锁

引入轻量级锁的主要目的是 在没有多线程竞争的前提下,减少传统的重量级锁使用操作系统互斥量产生的性能消耗。当关闭偏向锁功能或者多个线程竞争偏向锁导致偏向锁升级为轻量级锁,则会尝试获取轻量级锁,其步骤如下:

  1. 在线程进入同步块时,如果同步对象锁状态为无锁状态(锁标志位为「01」状态,是否为偏向锁为「0」),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的 Mark Word 的拷贝,官方称之为 Displaced Mark Word

    轻量级锁 CAS 操作之前线程堆栈与对象的状态

  2. 拷贝对象头中的 Mark Word 复制到锁记录(Lock Record)中

  3. 拷贝成功后,虚拟机将使用 CAS 操作尝试将对象 Mark Word 中的 Lock Word 更新为指向当前线程 Lock Record 的指针,并将 Lock record 里的 owner 指针指向 object mark word。如果更新成功,则执行步骤 ④,否则执行步骤 ⑤

  4. 如果这个更新动作成功了,那么当前线程就拥有了该对象的锁,并且对象 Mark Word 的锁标志位设置为「00」,即表示此对象处于轻量级锁定状态,此时线程堆栈与对象头的状态如下图所示:

    轻量级锁 CAS 操作之后线程堆栈与对象的状态

  5. 如果这个更新操作失败了,虚拟机首先会检查对象 Mark Word 中的 Lock Word 是否指向当前线程的栈帧,如果是,就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行。否则说明多个线程竞争锁,进入自旋执行 ③,若自旋结束时仍未获得锁,轻量级锁就要膨胀为重量级锁,锁标志的状态值变为「10」,Mark Word 中存储的就是指向重量级锁(互斥量)的指针,当前线程以及后面等待锁的线程也要进入阻塞状态

轻量级锁的释放也是通过 CAS 操作来进行的,主要步骤如下:

  1. 通过 CAS 操作尝试把线程中复制的 Displaced Mark Word 对象替换当前的Mark Word
  2. 如果替换成功,整个同步过程就完成了,恢复到无锁状态(01)
  3. 如果替换失败,说明有其他线程尝试过获取该锁(此时锁已膨胀),那就要在释放锁的同时,唤醒被挂起的线程

对于轻量级锁,其性能提升的依据是「对于绝大部分的锁,在整个生命周期内都是不会存在竞争的」,如果打破这个依据则除了互斥的开销外,还有额外的 CAS 操作,因此在有多线程竞争的情况下,轻量级锁比重量级锁更慢

轻量级锁及膨胀过程

为什么升级为轻量锁时要把对象头里的Mark Word复制到线程栈的锁记录中呢

因为在申请对象锁时 需要以该值作为 CAS 的比较条件,同时在升级到重量级锁的时候,能通过这个比较判定是否在持有锁的过程中此锁被其他线程申请过,如果被其他线程申请了,则在释放锁的时候要唤醒被挂起的线程

为什么会尝试CAS不成功以及什么情况下会不成功

CAS 本身是不带锁机制的,其是通过比较而来。假设如下场景:线程 A 和线程 B 都在对象头里的锁标识为无锁状态进入,那么如线程 A 先更新对象头为其锁记录指针成功之后,线程 B 再用 CAS 去更新,就会发现此时的对象头已经不是其操作前的对象 HashCode 了,所以 CAS 会失败。也就是说,只有两个线程并发申请锁的时候会发生 CAS 失败

然后线程 B 进行 CAS 自旋,等待对象头的锁标识重新变回无锁状态或对象头内容等于对象 HashCode(因为这是线程 B 做 CAS 操作前的值),这也就意味着线程 A 执行结束(参见后面轻量级锁的撤销,只有线程 A 执行完毕撤销锁了才会重置对象头),此时线程 B 的 CAS 操作终于成功了,于是线程 B 获得了锁以及执行同步代码的权限。如果线程 A 的执行时间较长,线程 B 经过若干次 CAS 时钟没有成功,则锁膨胀为重量级锁,即线程 B 被挂起阻塞、等待重新调度

如何理解「轻量级」

「轻量级」是相对于使用操作系统互斥量来实现的传统锁而言的。但是,首先需要强调一点的是,轻量级锁并不是用来代替重量级锁的,它的本意是在没有多线程竞争的前提下,减少传统的重量级锁使用产生的性能消耗。轻量级锁所适应的场景是线程交替执行同步块的情况,如果存在多线程同一时间访问同一锁的情况,必然就会导致轻量级锁膨胀为重量级锁

重量级锁

Synchronized 是通过对象内部的一个叫做 监视器锁(Monitor)来实现的但是监视器锁本质又是依赖于底层的操作系统的 Mutex Lock 来实现的。而操作系统实现线程之间的切换这就需要从用户态转换到核心态,这个成本非常高,状态之间的转换需要相对比较长的时间,这就是为什么 Synchronized 效率低的原因。因此,这种依赖于操作系统 Mutex Lock 所实现的锁我们称之为「重量级锁」

重量级锁、轻量级锁、偏向锁之间的转换

Mark Word 变化

Synchronized 偏向锁、轻量级锁及重量级锁转换流程

锁的优劣

各种锁并不是相互代替的,而是在不同场景下的不同选择,绝对不是说重量级锁就是不合适的。每种锁是只能升级,不能降级,即由偏向锁->轻量级锁->重量级锁,而这个过程就是开销逐渐加大的过程

优点 缺点 适用场景
偏向锁 加解锁不需要额外的消耗,与执行非同步方法仅存在纳秒级的差距 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 适用于只有一个线程访问同步块的场景
轻量级锁 竞争的线程不会阻塞,体高了程序的响应速度 如果始终得不到锁竞争的线程会一直自旋消耗 CPU 追求响应时间,同步块执行速度非常快的场景
重量级锁 线程竞争不使用自旋,不会无效消耗 CPU 线程阻塞,响应时间变慢 最求吞吐量,同步块执行速度较慢的场景

偏向锁被废弃

在 JDK15 中,偏向锁被默认关闭,JDK18 中更被标记为 废弃,并不允许通过命令行手动开启

JEP 374: Deprecate and Disable Biased Locking (openjdk.org)

如今看来偏向锁的性能提升远不及过去那么明显。许多受益于偏向锁的程序都是较旧的遗留程序,它们使用早期的 Java 集合 API,它们在每次访问时进行同步(例如 HashtableVector )。对于较新的程序,针对单线程场景通常使用 Java 1.2 中引入的非同步集合(例如 HashMapArrayList ),针对多线程场景,则使用 Java 5 引入的性能更高的并发结构。这意味着,如果代码升级到这些较新的类,相比偏向锁,会有更大的性能提升。此外,围绕线程池队列和工作线程构建的应用程序通常在禁用偏向锁定的情况下性能更好。(例如,SPECjbb2015 就是这样设计的,而 SPECjvm98 和 SPECjbb2005 则不是)。
偏向锁定的代价是在发生锁争用时需要执行 昂贵的撤销操作。因此,受益于它的程序只是那些无竞争同步操作的程序。偏向锁的高效是假定在执行简单的锁检查加上偶尔昂贵的撤销成本,仍然低于执行 CAS 指令的成本。但 HotSpot 已经发生了很大的变化,原子指令成本的变化也改变了保持该关系所需的无竞争操作的数量。
另一个值得注意的方面是,当同步操作上花费的时间只占程序总工作负载的一小部分时,即使先前的成本关系成立,程序也不会从偏向锁中获得明显的性能改进。
偏向锁定在同步子系统中引入了大量复杂的代码,并且还会侵入其他 HotSpot 组件。这种复杂性是理解代码各个部分的障碍,也是在同步子系统内进行重大设计更改的障碍。为此,我们希望禁用、弃用并最终删除对偏向锁定的支持

总结来说,偏向锁被弃用的原因主要是一下几个方面:

  • 复杂性:偏向锁显著地增加了 HotSport 同步锁组件的复杂性,使得维护和优化变得困难
  • 性能提升的局限性:虽然偏向锁在某些情况下可以提高性能,但也可能导致其他情况下性能的下降(锁撤销)
  • 性能提升的有限性:偏向锁的出现是为了避免 CAS 的长时间自旋,而现代 CPU 在处理原子操作方面性能已经得到了很大的提升
  • 使用场景较少:随着应用程序越来越多地转向并发或并行编程模型(例如,利用多核处理器和使用并发数据结构),使偏向锁定有效的假设不太适用。更多对象在多个线程之间共享,从而降低了偏向锁对单个线程的有效性

缺点

  1. 阻塞时长的控制

    当一个线程获取得了锁,其他线程便只能一直等待,等待获取锁的线程释放锁。如果这个线程由于 IO 或其他原因(sleep)被阻塞了,那么其他得线程只能无限期的等下去

  2. 锁的颗粒度不够细

    如果多个线程同时读一个文件时,不管有没有发生写操作,都会导致阻塞,没法做到读与写的区别处理

  3. 无法指定公平与非公平

  4. 不支持条件变量

  5. 只能锁定单个对象

Monitor

任何一个对象都有一个 Monitor 与之关联,当且一个 Monitor 被持有后,它将处于锁定状态。Synchronized 在 JVM 里的实现都是基于进入和退出 Monitor 对象来实现方法同步和代码块同步,虽然具体实现细节不一样,但是都可以通过成对的 MonitorEnter 和 MonitorExit 指令来实现

在 HotSport 中,Monitor 是由 ObjectMonitor 实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// initialize the monitor, exception the semaphore, all other fields
// are simple integers or pointers
ObjectMonitor() {
_header = NULL;
_count = 0;
_waiters = 0,
_recursions = 0;
_object = NULL;
_owner = NULL;
_WaitSet = NULL; // 等待唤醒的线程集合(WAITING)
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ; // 被阻塞进入 monitor 的线程集合(BLOCKING)
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}

monitorenter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
void TemplateTable::monitorenter() {
//校验当前指令的栈顶缓存类型是否正确
transition(atos, vtos);

//校验rax中值是否为空,栈顶缓存就保存在rax寄存器中,如果为NULL会触发底层操作系统的NULL异常
//此时rax中保存的是用于获取锁的实例oop
__ null_check(rax);

const Address monitor_block_top(
rbp, frame::interpreter_frame_monitor_block_top_offset * wordSize);
const Address monitor_block_bot(
rbp, frame::interpreter_frame_initial_sp_offset * wordSize);
const int entry_size = frame::interpreter_frame_monitor_size() * wordSize;

Label allocated;

//xorl用于按位异或,相同的位置为0,不同的位置为1,此处是将c_rarg1置为NULL
__ xorl(c_rarg1, c_rarg1); // points to free slot or NULL

//找到一个空闲的monitor_block,结果保存在c_rarg1中
{
Label entry, loop, exit;
//将monitor_block_top拷贝到c_rarg3中
__ movptr(c_rarg3, monitor_block_top); // points to current entry,
// starting with top-most entry
//将monitor_block_bot拷贝到c_rarg2
__ lea(c_rarg2, monitor_block_bot); // points to word before bottom
// of monitor block
//跳转到entry标签处执行
__ jmpb(entry);

__ bind(loop);
//判断c_rarg3指向的BasicObjectLock的obj属性是否为空,如果为空表示未使用
__ cmpptr(Address(c_rarg3, BasicObjectLock::obj_offset_in_bytes()), (int32_t) NULL_WORD);
//如果相等,即BasicObjectLock的obj属性为空,则将c_rarg3的值拷贝到c_rarg1
__ cmov(Assembler::equal, c_rarg1, c_rarg3);
// 判断c_rarg3指向的BasicObjectLock的obj属性与rax中实例是否一致
__ cmpptr(rax, Address(c_rarg3, BasicObjectLock::obj_offset_in_bytes()));
// 如果一致则退出,一致说明BasicObjectLock的obj属性不为空,此时c_rarg1为空,就是重新分配一个新的
__ jccb(Assembler::equal, exit);
// 如果不一致则把c_rarg3地址加上entry_size,即开始遍历前面一个monitor_block,即存在空闲的,但是没有obj属性相同的时候会把所有的
//BasicObjectLock都遍历一遍,找到最上面的地址最大一个空闲的BasicObjectLock
__ addptr(c_rarg3, entry_size);
__ bind(entry);
//判断两个寄存器的值是否相等
__ cmpptr(c_rarg3, c_rarg2);
//如果不等于则跳转到loop标签,否则跳转到exit
__ jcc(Assembler::notEqual, loop);
__ bind(exit);
}

//判断c_rarg1是否为空,如果不为空则跳转到allocated处
__ testptr(c_rarg1, c_rarg1); // check if a slot has been found
__ jcc(Assembler::notZero, allocated); // if found, continue with that one

//如果没有找到空闲的monitor_block则分配一个
{
Label entry, loop;
// 将monitor_block_bot拷贝到c_rarg1 // rsp: old expression stack top
__ movptr(c_rarg1, monitor_block_bot); // c_rarg1: old expression stack bottom
//向下(低地址端)移动rsp指针entry_size字节
__ subptr(rsp, entry_size); // move expression stack top
//将c_rarg1减去entry_size
__ subptr(c_rarg1, entry_size); // move expression stack bottom
//将rsp拷贝到c_rarg3
__ mov(c_rarg3, rsp); // set start value for copy loop
//将c_rarg1中的值写入到monitor_block_bot
__ movptr(monitor_block_bot, c_rarg1); // set new monitor block bottom
//跳转到entry处开始循环
__ jmp(entry);
// 2.移动monitor_block_bot到栈顶的数据,将从栈顶分配的一个monitor_block插入到原来的monitor_block_bot下面
__ bind(loop);
//将c_rarg3之后的entry_size处的地址拷贝到c_rarg2,即原来的rsp地址
__ movptr(c_rarg2, Address(c_rarg3, entry_size)); // load expression stack
// word from old location
//将c_rarg2中的数据拷贝到c_rarg3处,即新的rsp地址
__ movptr(Address(c_rarg3, 0), c_rarg2); // and store it at new location
//c_rarg3加上一个字宽,即准备复制下一个字宽的数据
__ addptr(c_rarg3, wordSize); // advance to next word
__ bind(entry);
//比较两个寄存器的值
__ cmpptr(c_rarg3, c_rarg1); // check if bottom reached
//如果不等于则跳转到loop
__ jcc(Assembler::notEqual, loop); // if not at bottom then
// copy next word
}

// call run-time routine
// c_rarg1: points to monitor entry
__ bind(allocated);

//增加r13,使其指向下一个字节码指令
__ increment(r13);

//将rax中保存的获取锁的oop保存到c_rarg1指向的BasicObjectLock的obj属性中
__ movptr(Address(c_rarg1, BasicObjectLock::obj_offset_in_bytes()), rax);
//获取锁
__ lock_object(c_rarg1);

//保存bcp,为了出现异常时能够返回到原来的执行位置
__ save_bcp(); // in case of exception
__ generate_stack_overflow_check(0);

//恢复字节码指令的正常执行
//因为上面已经增加r13了,所以此处dispatch_next的第二个参数使用默认值0,即执行r13指向的字节码指令即可,不用跳转到下一个指令
__ dispatch_next(vtos);
}

void InterpreterMacroAssembler::lock_object(Register lock_reg) {
assert(lock_reg == c_rarg1, "The argument is only for looks. It must be c_rarg1");

//UseHeavyMonitors表示是否只使用重量级锁,默认为false,如果为true则调用InterpreterRuntime::monitorenter方法获取重量级锁
if (UseHeavyMonitors) {
call_VM(noreg,
CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
lock_reg);
} else {
Label done;

const Register swap_reg = rax; // Must use rax for cmpxchg instruction
const Register obj_reg = c_rarg3; // Will contain the oop

const int obj_offset = BasicObjectLock::obj_offset_in_bytes();
const int lock_offset = BasicObjectLock::lock_offset_in_bytes ();
const int mark_offset = lock_offset +
BasicLock::displaced_header_offset_in_bytes();

Label slow_case;

//进入此方法目标obj要么是无锁状态,要么是对同一个对象的synchronized嵌套情形下的有锁状态
//将用于获取锁的实例oop拷贝到obj_reg中
movptr(obj_reg, Address(lock_reg, obj_offset));

//UseBiasedLocking默认为true
if (UseBiasedLocking) {
//首先尝试获取偏向锁,获取成功会跳转到done,否则走到slow_case
biased_locking_enter(lock_reg, obj_reg, swap_reg, rscratch1, false, done, &slow_case);
}

//如果UseBiasedLocking为false或者目标对象的锁不是偏向锁了会走此逻辑
movl(swap_reg, 1);

//计算 object->mark() | 1,结果保存到swap_reg,跟1做或运算将其标记为无锁状态
orptr(swap_reg, Address(obj_reg, 0));

//将(object->mark() | 1)的结果保存到BasicLock的displaced_header中,保存原来的对象头
movptr(Address(lock_reg, mark_offset), swap_reg);

//lock_reg即是里面的lock属性的地址
assert(lock_offset == 0,
"displached header must be first word in BasicObjectLock");

if (os::is_MP()) lock(); //如果是多核系统,通过lock指令保证cmpxchgp的操作是原子的,即只可能有一个线程操作obj对象头
//将obj的对象头同rax即swap_reg比较,如果相等将lock_reg写入obj对象头,即lock属性写入obj对象头,如果不等于则将obj对象头放入rax中
cmpxchgptr(lock_reg, Address(obj_reg, 0));
if (PrintBiasedLockingStatistics) {
//增加计数器
cond_inc32(Assembler::zero,
ExternalAddress((address) BiasedLocking::fast_path_entry_count_addr()));
}
//如果等于,说明obj的对象头是无锁状态的,此时跟1做或运算,结果不变
jcc(Assembler::zero, done);

//如果不等于,说明obj的对象头要么是偏向锁,要么是重量级锁,多线程下可能其他线程已经获取了该对象的轻量级锁
//下面的汇编指令相当于执行如下判断,判断目标对应的对象头是否属于当前调用栈帧,如果是说明还是当前线程占有该轻量级锁,如果不是则说明其他线程占用了轻量级锁或者已经膨胀成重量级锁
// 1) (mark & 7) == 0, and
// 2) rsp <= mark < mark + os::pagesize()
subptr(swap_reg, rsp);
andptr(swap_reg, 7 - os::vm_page_size());

//在递归即synchronized嵌套使用的情形下,上述指令计算的结果就是0
//当BasicLock的displaced_header置为NULL
movptr(Address(lock_reg, mark_offset), swap_reg);
if (PrintBiasedLockingStatistics) {
//增加计数器
cond_inc32(Assembler::zero,
ExternalAddress((address) BiasedLocking::fast_path_entry_count_addr()));
}
//如果andptr的结果为0,说明当前线程已经获取了轻量级锁则跳转到done
jcc(Assembler::zero, done);
//否则执行InterpreterRuntime::monitorenter将轻量级锁膨胀成重量级锁或者获取重量级锁
bind(slow_case);

// Call the runtime routine for slow case
call_VM(noreg,
CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
lock_reg);

bind(done);
}
}

BasicObjectLock 用于将某个特定的 java 对象与 BasicLock 关联起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// A BasicObjectLock associates a specific Java object with a BasicLock.
// It is currently embedded in an interpreter frame.

// Because some machines have alignment restrictions on the control stack,
// the actual space allocated by the interpreter may include padding words
// after the end of the BasicObjectLock. Also, in order to guarantee
// alignment of the embedded BasicLock objects on such machines, we
// put the embedded BasicLock at the beginning of the struct.
class BasicObjectLock VALUE_OBJ_CLASS_SPEC {
friend class VMStructs;
private:
BasicLock _lock; // the lock, must be double word aligned
oop _obj; // object holds the lock;

public:
// Manipulation
oop obj() const { return _obj; }
void set_obj(oop obj) { _obj = obj; }
BasicLock* lock() { return &_lock; }

// Note: Use frame::interpreter_frame_monitor_size() for the size of BasicObjectLocks
// in interpreter activation frames since it includes machine-specific padding.
static int size() { return sizeof(BasicObjectLock)/wordSize; }

// GC support
void oops_do(OopClosure* f) { f->do_oop(&_obj); }

static int obj_offset_in_bytes() { return offset_of(BasicObjectLock, _obj); }
static int lock_offset_in_bytes() { return offset_of(BasicObjectLock, _lock); }
};

BasicLock 主要用来保存对象的对象头

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class BasicLock VALUE_OBJ_CLASS_SPEC {
friend class VMStructs;
private:
volatile markOop _displaced_header;
public:
markOop displaced_header() const { return _displaced_header; }
void set_displaced_header(markOop header) { _displaced_header = header; }

void print_on(outputStream* st) const;

// move a basic lock (used during deoptimization
void move_to(oop obj, BasicLock* dest);

static int displaced_header_offset_in_bytes() { return offset_of(BasicLock, _displaced_header); }
};

BasicObjectLock 是内嵌在线程的调用栈帧中的,有一段连续的内存区域用来保存多个 BasicObjectLock,这个内存区域的起始位置是保存在栈帧中的特定偏移处的,终止地址(低地址端)保存 interpreter_frame_monitor_block_top_offset 偏移处,起始地址(高地址端)保存在 interpreter_frame_initial_sp_offset 偏移处。monitorenter 方法会遍历用于分配 BasicObjectLock 的连续内存区域,注意是从低地址端往高地址端遍历,即按照 BasicObjectLock 的分配顺序倒序遍历,先遍历最近分配的 BasicObjectLock。遍历过程中,如果找到一个 obj 属性就是目标对象的 BasicObjectLock,则停止遍历重新分配一个新的 BasicObjectLock;如果没有找到 obj 属性相同的且没有空闲的,同样重新分配一个新的 BasicObjectLock;如果没有 obj 属性相同的且有空闲的,则遍历完所有的 BasicObjectLock,找到最上面的地址最高的一个空闲 BasicObjectLock,总而言之就是要确保新分配的 BasicObjectLock 一定要在之前分配的 obj 属性是同一个对象的 BasicObjectLock 的后面,因为解锁时也是倒序查找的,找到一个 obj 属性相同的视为查找成功。获取 BasicObjectLock 后就将当前对象保存进 obj 属性,然后调用 lock_object 获取锁。在默认开启 UseBiasedLocking 时,lock_object 会先获取偏向锁,如果已经获取了则升级成轻量级锁,如果已经获取了轻量级锁则升级成重量级锁

InterpreterRuntime::monitorenter 用于获取轻量级锁或者重量级锁,获取轻量级锁成功后会将目标对象的对象头改成 BasicLock 指针,获取重量级锁成功后会将目标对象的对象头改成 ObjectMonitor 指针,BasicLock 和 ObjectMonitor 本身会保存目标对象原来的无锁状态下的对象头,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))

if (PrintBiasedLockingStatistics) {
//增加计数器
Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
}
//获取关联的对象
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (UseBiasedLocking) {
//如果使用偏向锁,走快速enter
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
"must be NULL or an object");
IRT_END

void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
//如果不是在安全点上,则撤销偏向锁,特殊情形下会重新获取偏向锁
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
//获取成功直接返回
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
//如果在安全点下,撤销偏向锁
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}

slow_enter (obj, lock, THREAD) ;
}

void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");

if (mark->is_neutral()) {
//未持有锁,保存原来的对象头
lock->set_displaced_header(mark);
//将lock作为原来的对象头,因为BasicLock本身的大小就是8字节的,所以lock地址的后3位都是0,0就表示轻量级锁
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
//如果设置成功表示已经获取轻量级锁,返回
return ;
}
//修改失败,说明有其他线程获取了同一个对象的轻量级锁,则需要将其膨胀成重量级锁
} else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
//如果该对象已经持有轻量级锁且对象头中包含的指针属于当前线程所有,即是当前线程持有了该对象的轻量级锁
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
//已经分配了对应的BasicLock,这个BasicLock没有用了
lock->set_displaced_header(NULL);
return;
}


//因为ObjectMonitor中会保存对象的对象头,所以此处不需要在保存了
lock->set_displaced_header(markOopDesc::unused_mark());
//锁膨胀转换成重量级锁
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}

bool Thread::is_lock_owned(address adr) const {
return on_local_stack(adr);
}

BasicLock* locker() const {
assert(has_locker(), "check");
return (BasicLock*) value();
}

对比 lock_obj 方法可知,两者获取轻量级锁或者处理锁嵌套情形时的代码是一样的,monitorenter 方法增加了一步撤销偏向锁和轻量级锁膨胀成重量级锁的逻辑,这是为了兼容 UseHeavyMonitors 参数为 true 或者 UseBiasedLocking 为 false 时的处理逻辑

monitorexit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
void TemplateTable::monitorexit() {
//检查栈顶缓存的类型是否正确
transition(atos, vtos);

//检查rax包含的跟锁关联的对象oop是否为空
__ null_check(rax);

const Address monitor_block_top(
rbp, frame::interpreter_frame_monitor_block_top_offset * wordSize);
const Address monitor_block_bot(
rbp, frame::interpreter_frame_initial_sp_offset * wordSize);
const int entry_size = frame::interpreter_frame_monitor_size() * wordSize;

Label found;

// find matching slot
{
Label entry, loop;
//把monitor_block_top拷贝到c_rarg1
__ movptr(c_rarg1, monitor_block_top); // points to current entry,
// starting with top-most entry
//把monitor_block_bot拷贝到c_rarg2
__ lea(c_rarg2, monitor_block_bot); // points to word before bottom
// of monitor block
__ jmpb(entry);

__ bind(loop);
//比较rax中对象oop与obj属性是否一致
__ cmpptr(rax, Address(c_rarg1, BasicObjectLock::obj_offset_in_bytes()));
//如果一致则表示找到了跳转到found
__ jcc(Assembler::equal, found);
//如果没有找到则增加entry_size,即开始遍历前面一个BasicObjectLock
__ addptr(c_rarg1, entry_size);
__ bind(entry);
//比较这两个是否相等,如果相等表示遍历完成
__ cmpptr(c_rarg1, c_rarg2);
//如果不等则跳转到loop标签
__ jcc(Assembler::notEqual, loop);
}

//没有在当前线程的栈帧中找到关联的BasicObjectLock,抛出异常
__ call_VM(noreg, CAST_FROM_FN_PTR(address,
InterpreterRuntime::throw_illegal_monitor_state_exception));
__ should_not_reach_here();

// call run-time routine
// rsi: points to monitor entry
__ bind(found);
//将这个锁对象放入栈帧中
__ push_ptr(rax); // make sure object is on stack (contract with oopMaps)
//执行解锁逻辑
__ unlock_object(c_rarg1);
//从栈帧中弹出锁对象
__ pop_ptr(rax); // discard object
}

void InterpreterMacroAssembler::unlock_object(Register lock_reg) {
assert(lock_reg == c_rarg1, "The argument is only for looks. It must be rarg1");

if (UseHeavyMonitors) {
//如果只使用重量级锁,UseHeavyMonitors默认为false
call_VM(noreg,
CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorexit),
lock_reg);
} else {
Label done;

const Register swap_reg = rax; // Must use rax for cmpxchg instruction
const Register header_reg = c_rarg2; // Will contain the old oopMark
const Register obj_reg = c_rarg3; // Will contain the oop

save_bcp(); //保存bcp,方便解锁异常时回滚

//将lock属性的地址复制到swap_reg
lea(swap_reg, Address(lock_reg, BasicObjectLock::lock_offset_in_bytes()));

//将obj属性复制到obj_reg
movptr(obj_reg, Address(lock_reg, BasicObjectLock::obj_offset_in_bytes()));

//将obj属性置为NULL
movptr(Address(lock_reg, BasicObjectLock::obj_offset_in_bytes()), (int32_t)NULL_WORD);

if (UseBiasedLocking) {
//如果持有偏向锁,则解锁完成后跳转到done
biased_locking_exit(obj_reg, header_reg, done);
}

//将BasicLock的displaced_header属性复制到header_reg中,即该对象原来的对象头
movptr(header_reg, Address(swap_reg,
BasicLock::displaced_header_offset_in_bytes()));

//判断这个是否为空
testptr(header_reg, header_reg);

//如果为空说明这是对同一目标对象的synchronized嵌套情形,则跳转到done,等到外层的synchronized解锁恢复目标对象的对象头
jcc(Assembler::zero, done);

// Atomic swap back the old header
if (os::is_MP()) lock();//如果是多核系统则通过lock指令前缀将cmpxchg变成一个原子操作,即只能有一个线程同时操作obj的对象头
//将obj的对象头同rax即swap_reg,即lock属性地址比较,如果相等把header_reg写入到obj的对象头中即恢复对象头,如果不等把obj对象头写入rax中
cmpxchgptr(header_reg, Address(obj_reg, 0));

//如果相等,说明还是轻量级锁,解锁完成
jcc(Assembler::zero, done);

//如果不等于,说明轻量级锁被膨胀成重量级锁,恢复obj属性,因为上面将该属性置为NULL
movptr(Address(lock_reg, BasicObjectLock::obj_offset_in_bytes()),
obj_reg); // restore obj
//调用InterpreterRuntime::monitorexit,完成重量级锁退出
call_VM(noreg,
CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorexit),
lock_reg);

bind(done);
//恢复bcp
restore_bcp();
}
}

//偏向锁的解锁只是判断目标对象是否持有偏向锁,如果持有就跳转到done,没有实际的解锁动作
void MacroAssembler::biased_locking_exit(Register obj_reg, Register temp_reg, Label& done) {
assert(UseBiasedLocking, "why call this otherwise?");
//将obj的对象头拷贝到temp_reg
movptr(temp_reg, Address(obj_reg, oopDesc::mark_offset_in_bytes()));
//将对象头指针同biased_lock_mask_in_place求且
andptr(temp_reg, markOopDesc::biased_lock_mask_in_place);
//判断且运算后的结果是否是5
cmpptr(temp_reg, markOopDesc::biased_lock_pattern);
//如果相等则跳转到done
jcc(Assembler::equal, done);
}

monitorexit 用于轻量级锁和重量级锁的释放,锁释放就是目标对象的对象头恢复成无锁状态,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))

Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
//如果BasicObjectLock为空或者目标对象没有持有锁则抛出异常
if (elem == NULL || h_obj()->is_unlocked()) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
//将obj置为NULL
elem->set_obj(NULL);

IRT_END

inline bool oopDesc::is_unlocked() const {
return mark()->is_unlocked();
}

bool is_unlocked() const {
return (mask_bits(value(), biased_lock_mask_in_place) == unlocked_value);
}

void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
fast_exit (object, lock, THREAD) ;
}


void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
// if displaced header is null, the previous enter is recursive enter, no-op
markOop dhw = lock->displaced_header();
markOop mark ;
if (dhw == NULL) {
//对同一目标对象的synchronized嵌套情形,该对象的原来的对象头保存在外层synchronized对应的BasicLock或者ObjectMonitor中,外层synchronized解锁时会恢复其对象头
mark = object->mark() ;
assert (!mark->is_neutral(), "invariant") ;
if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
//如果持有轻量级锁
assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;
}
if (mark->has_monitor()) {
//如果持有重量级锁
ObjectMonitor * m = mark->monitor() ;
assert(((oop)(m->object()))->mark() == mark, "invariant") ;
assert(m->is_entered(THREAD), "invariant") ;
}
return ;
}

mark = object->mark() ;

if (mark == (markOop) lock) {
//持有轻量级锁,dhw保存着原来的对象头
assert (dhw->is_neutral(), "invariant") ;
//恢复对象的对象头
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
TEVENT (fast_exit: release stacklock) ;
return;
}
//此时持有该对象轻量级锁的只有当前线程,所以原子修改不会失败
}
//如果持有重量级锁,则重量锁解锁
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}

Thanks