/** * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */
/** Mode meaning to reinterrupt on exit from wait */ privatestaticfinalintREINTERRUPT=1; /** Mode meaning to throw InterruptedException on exit from wait */ privatestaticfinalintTHROW_IE= -1;
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ privateintcheckInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
/** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ privatevoidreportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) thrownewInterruptedException(); elseif (interruptMode == REINTERRUPT) selfInterrupt(); }
/** * 可中断的条件等待 */ publicfinalvoidawait()throws InterruptedException { if (Thread.interrupted()) thrownewInterruptedException(); // 新增等待节点 Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); intinterruptMode=0; // 循环阻塞 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
/** * 指定等待时间的等待 */ publicfinallongawaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) thrownewInterruptedException(); Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); finallongdeadline= System.nanoTime() + nanosTimeout; intinterruptMode=0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) // 本质是通过 parkNanos 方法实现 LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); }
/** * 等待直到指定的时间 */ publicfinalbooleanawaitUntil(Date deadline) throws InterruptedException { longabstime= deadline.getTime(); if (Thread.interrupted()) thrownewInterruptedException(); Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); booleantimedout=false; intinterruptMode=0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
/** * 等待指定的时间 */ publicfinalbooleanawait(long time, TimeUnit unit) throws InterruptedException { longnanosTimeout= unit.toNanos(time); if (Thread.interrupted()) thrownewInterruptedException(); Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); finallongdeadline= System.nanoTime() + nanosTimeout; booleantimedout=false; intinterruptMode=0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
// support for instrumentation
/** * Returns true if this condition was created by the given * synchronization object. * * @return {@code true} if owned */ finalbooleanisOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; }
/** * 查询是否有正在等待此条件的任何线程 */ protectedfinalbooleanhasWaiters() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); for (Nodew= firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) returntrue; } returnfalse; }
/** * 返回正在等待此条件的线程数估计值 */ protectedfinalintgetWaitQueueLength() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); intn=0; for (Nodew= firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; }
/** * 返回包含那些可能正在等待此条件的线程集合 */ protectedfinal Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); ArrayList<Thread> list = newArrayList<Thread>(); for (Nodew= firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Threadt= w.thread; if (t != null) list.add(t); } } return list; } }
privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node) { intws= pred.waitStatus; // 前节点状态为 -1,说明前节点状态正常,当前线程需要挂起,可以直接返回 true if (ws == Node.SIGNAL) /** * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; // 前节点状态 > 0,说明前节点取消了排队 // 这里需要知道的是:进入阻塞队列的线程会被挂起,而唤醒操作是需要前节点来完成的 // 所以这里将当前节点指向「前前节点」 if (ws > 0) { /** * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; // 前节点状态可能是 0,-2,-3 // 前面的方法中都没有对当前节点的状态进行更新,所以当前节点的状态应该是默认的 0 // 所以这里将前节点的状态设置为 -1(返回 false 后会再度进到这个方法,然后进入第一个 if 分支) } else { /** * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; }
privatevoidunparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ intws= node.waitStatus; if (ws < 0) // 如果节点状态小于 0,说明处于等待或阻塞状态,这时就直接通过 CAS 将节点状态置为初始状态 compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 下面就是唤醒后继节点 // 有可能后继节点取消了等待(waitStatus = 1) // 从队列尾往前找,找到 waitStatus <= 0 的就排到最前面 Nodes= node.next; if (s == null || s.waitStatus > 0) { s = null; for (Nodet= tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 到这里说明节点状态是 0,直接唤醒就可以了 if (s != null) LockSupport.unpark(s.thread); }