protectedfinalbooleancompareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// 在队列中增加一个节点,即新增一个等待这个锁的线程 private Node addWaiter(Node mode) { Nodenode=newNode(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Nodepred= tail; // 如果队尾不为null就去将node添加到队尾 if (pred != null) { node.prev = pred; // 如果CAS失败,进入下面的enq,enq用循环保证添加队尾操作会成功 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node) { intws= pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; // 取消状态,不需要被唤醒 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ // 将被取消的节点从链表中剔除 do { /* 相当于 * pred = pred.prev; * node.prev = pred; */ node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; // 当前节点node前面的一个或多个节点被取消,剔除掉这些被取消的节点以后,让方法返回false // 让外层调用继续在循环检测是否能获取锁,外层是指acquireQueued方法 } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; }
privatevoidunparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ intws= node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Nodes= node.next; if (s == null || s.waitStatus > 0) { s = null; for (Nodet= tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // unpark头结点的下一个节点,即唤醒下一个等待中的线程 // unpark调用会让上面的parkAndCheckInterrupt方法返回false,具体见LockSupport类 if (s != null) LockSupport.unpark(s.thread); }
如果有 N 个线程在等待锁,队列中就会有 N + 1 个节点,首个节点是当前获得了锁的线程。每个等待的节点都在 acquireQueued 方法的大循环中,被 parkAndCheckInterrupt 方法阻塞。当前获取锁的线程一旦释放锁,就会去唤醒他的下一个节点(下一个等待线程)。等待线程被唤醒以后又开始执行 acquireQueued 方法的循环,重新检查「自己前面的节点是不是首个节点,并且是不是能够获取锁」,如果通过检查就自己成为头部,并且方法返回,获得锁。
ReentrantLock 的公平锁
公平锁和非公平锁,只在 tryAcquire 不一样。acquire 是 AQS 的方法。
1 2 3 4 5
publicfinalvoidacquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }