简单示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| class CountDownLatchTest {
private static final CountDownLatch startSignal = new CountDownLatch(1); private static final CountDownLatch doneSignal = new CountDownLatch(10); private static final Random random = new Random();
public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { new Thread(new Worker(i + 1)).start(); } TimeUnit.SECONDS.sleep(1); startSignal.countDown(); doneSignal.await(); System.out.println("Done"); }
private static class Worker implements Runnable { private final int i;
public Worker(int i) { this.i = i; }
@Override public void run() { try { startSignal.await(); } catch (InterruptedException e) { e.printStackTrace(); } long start = System.currentTimeMillis(); System.out.println("Worker-" + i + " started"); try { TimeUnit.SECONDS.sleep(random.nextInt(5) + 1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Worker-" + i + " done! cost " + (System.currentTimeMillis() - start) + "ms"); doneSignal.countDown(); } } }
|
这里通过两个 CountDownLatch 实现了多线程的同步启动以及完全结束的控制
这里再次注意:初始化线程时,不要用 Worker::new
new Thread(Worker::new).start 等同于:
1 2 3 4 5 6
| new Thread(new Runnable() { @Override public void run() { new Worker(); } }).start();
|
源码分析
相较于 ReentranLock 和 ReentranReadWriteLock,CountDownLatch 的实现算是「十分简单啦」
CountDownLatch 中只有一个变量
和 ReentrantLock 它们类似,这个 sync 也是一个继承了 AQS 的内部类
构造函数中会初始化这个 sync
1 2 3 4
| public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
|
先看下 CountDownLatch 对外暴露的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
public void countDown() { sync.releaseShared(1); }
public long getCount() { return sync.getCount(); }
public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; }
|
所有的实现都是通过 Sync 实现的,所以直接看 Sync 就行了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) { setState(count); }
int getCount() { return getState(); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
|
这个 Sync 内部类的逻辑也是很简单的,整体 CountDownLatch 的实现还需结合 AQS 来看
初始化
首先在 CountDownLatch 的构造函数中初始化 Sync
1 2 3 4
| public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
|
然后在内部类 Sync 的构造函数中初始化 AQS 中的状态
1 2 3 4
| Sync(int count) { setState(count); }
|
通过给 state 设置值,也是设置了 Sync 共享锁的重入次数(计数器次数)
await
CountDownLatch 中调用 await 方法
1 2 3
| public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
|
这个 acquireSharedInterruptibly
方法并未在 CountDownLatch.Sync 中重写,直接使用的是 AQS 的
1 2 3 4 5 6 7 8 9 10 11 12 13
|
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
|
countDown
CountDownLatch 中调用 countDown 方法
1 2 3 4
| public void countDown() { sync.releaseShared(1); }
|
进入到 AQS 的 releaseShared 方法
1 2 3 4 5 6 7 8 9 10
| public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
|