先看ReentrantLock类的构造方法和成员
private final Sync sync;public ReentrantLock() {sync = new NonfairSync();
}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
}
让人惊讶的是,该类的实现在这一层显得相当简洁,往下面看我们会发现几乎所有操作都是在调用该类的内部类Sync的方法
public void lock() {sync.lock();
}
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
}
public boolean tryLock() {return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {sync.release(1);
}
有构造方法可知,根据参数fair是否是boolean来选择FairSync或者NonFairSync的实现(公平锁与非公平锁)。
我们可以先来看AbstractQueuedSynchronizer的实现。
AbstractQueuedSynchronizer类实现的是一个FIFO的队列。下面是AbstractQueuedSynchronizer的成员。
private transient volatile Node head;private transient volatile Node tail;private volatile int state;
state代表该锁的占用情况,如果为0则代表当前锁并没有被任何线程调用。
head,tail分别指向队列的头节点和尾节点,队列中的具体线程通过内部类Node类来存储,下面是Node类的成员。
static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED = 1;static final int SIGNAL = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;
SHARED EXCLUSIVE这两个成员分别表示该Node处于共享模式还是独占模式。
接下来几个int类型的量分别表示该Node处于的状态,可以结合注释一一解释。
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
SIGNAL状态表示,该节点的继任节点已经处于阻塞状态,而此时,当现在的节点在被释放或者取消的时候需要唤醒该节点的后继结点。
CANCELLED状态表示该节点已经由于超时或者是打断已经被取消,一旦有节点进入这个阶段,就再也不会转变状态,处于这个状态的节点不会再被阻塞(该状态是唯一大于0的状态量)。
CONDITION状态表示该节点已经处于一个条件队列当中,该节点在转变之前不能在视为同步队列上的节点,如果被转换时,节点状态会变成0(节点的初始状态)。
PROPAGATE状态表示在共享状态下的节点的下一个节点获取锁的时候可以保证锁的取得,及时有阻塞的发生。
0状态,刚生成的新节点处于的状态。
接下来则是该节点所代表的线程,以及节点在队列上的前驱后继。
nextWaiter表示该节点在条件队列上的后继节点。
值得一提的是,为了线程安全,AbstractQueuedSynchronizer类内部采用了unsafe类来保证成员在高并发条件下读写的准确性。(线程的挂起与恢复也都是依赖于unsafe类)
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;static {try {stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }
}
现在可以在ReentrantLock类实现公平锁的前提下,试着从外部调用lock()方法的时候,看看到底发生了什么事情。
public void lock() {sync.lock();
}
在外部调用lock()方法的时候,会直接调用内部类的lock()方法。
final void lock() {acquire(1);
}
而在内部FairSync中,调用了lock()方法的时候,会直接调用AbstractQueuedSynchronizer类的acquire()方法。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
首先调用了tryAcquire()方法,但在AbstractQueuedSynchronizer类中并没有具体实现,真正的具体实现在FairSync类中。
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
在这个方法中首先判断当前锁是否已经被别的线程占用(也就是state为0),如果为0,还需要进一步调用hasQueuedPredecssors()()来确认在同步队列中已经没有节点正在等待。
public final boolean hasQueuedPredecessors() {Node t = tail; Node h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}
在这之后通过cas给锁的state加上所传进入的值,以确保表示该锁已经被占用,之后设置当前线程为占用锁的线程。
如果在之前state已经不为0,所要在判断一下该线程是否已经是取得所得线程,如果是,给state加上所传入的数字(记得小于Integer.MAX_VALUE),在这之后也算取得锁了。
否则取得锁的行为算是失败。既然如此,我们需要将线程作为节点存入队列中了。
首先通过addWaiter()方法生成Node(独占模式)加入队列。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
以上是加入队列的方法,只是涉及简单的链表以及FIFO操作,暂不赘述。
在成功加入队列后,就会调用acquireQueued()方法来试图在队列中取得锁。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; failed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
在该方法的循环块可以看到,如果当前当前节点的前驱就是头结点同时在之前的tryAquire()方法中成功获得了锁,那么当前节点也顺利获得了锁。如果当前节点并不是头结点的下一个节点或者在之前取的锁失败,则会调用shouldParkAfterFailedAquire()方法判断这个节点队列上下一个将会执行到的节点。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)return true;if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}
如果该节点的前驱节点的状态正好是SIGNAL,那么根据刚才状态的解释,这里的线程符合需要被挂起来的条件。如果不是呢,在下面还需要判断,如果前驱节点已经被处于取消状态,那么就需要不断往该节点的前驱往之前的节点直到找到不是处于取消状态的节点。如果不是上面的两种状态得到节点,那么将前驱节点的状态改为SIGNAL,在下一次循环中试图将线程挂起。
如果符合到了线程挂起的条件,那么则通过LockSupport类调用unsafe类的方法将线程挂起。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();
}
public static void park(Object blocker) {Thread t = Thread.currentThread();setBlocker(t, blocker);unsafe.park(false, 0L);setBlocker(t, null);
}
unsafe类通过本地方法完成线程阻塞的目的。
如果在上面的调用acquireQueued()方法出现了异常,则会在canselAquire()方法中取消尝试获得锁 的操作。
private void cancelAcquire(Node node) {if (node == null)return;node.thread = null;Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;Node predNext = pred.next;node.waitStatus = Node.CANCELLED;if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);}node.next = node;}
}
在取消的过程中,首先将所要取消的Node类中的线程置为 Null,之后不断在队列中依次从前驱节点中找到一个不是取消状态的节点作为当前节点的前驱,并将取消的节点从队列中清除。之后将该节点的状态置为取消。接下来,如果要取消的节点已经为尾节点,则将该节点的前驱置为尾节点。如果在该节点的前驱不是头结点,则将其前驱置为SIGNAL状态,并将要取消的节点的后继赋给前驱的后继。
如果要取消的节点已经是头结点了,或者在给前驱设置SIGNAL状态失败,则需要调用unparkSuccessor()方法来恢复已经处于挂起状态的后继结点。
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);
}
在这个方法中从如果所需要唤醒的节点为空,或者已经为取消状态,则需要从尾节点到当前节点选择离当前节点最近的没有被取消的节点。
最后通过LockSupport来恢复被挂起的线程。
加锁的过程如上所示,下面从外部调用unlock()方法解锁开始解释。
public void unlock() {sync.release(1);
}
直接调用内部类sync的release()方法(具体实现在AbstractQueuedSynchronizer类中)。
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}
首先尝试给state减少传递进来的值,tryRelease()方法在Sync中实现。
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}
确保执行解锁操作的线程是当前占有锁的线程,否则会直接抛出错误。
如果此时此时state已经为0,则说明已经没有线程再持有锁了。那么就可以返回true,从队列中挂起等待回复的线程选择一个调用刚才已经解释过的unparkSuccessor()恢复一个线程重新在队列中尝试获得锁。
解锁就是那么简单。
至于nonFairSync与公平锁的区别,可以在lock()方法中看出。
final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);
}
在这里非公平锁的加锁过程中一开始就将state加一,而不是像之前公平锁中那样先获得锁在取得锁。
接下来是条件队列的分析。
外部通过调用newCondition()方法获得新的Condition对象。
final ConditionObject newCondition() {return new ConditionObject();
}
而具体的Contition实现类ConditionObject类在AbstractQueuedSynchronizer作为内部类。
private transient Node firstWaiter;private transient Node lastWaiter;
两个分别指向条件队列的头尾的Node。
在addConditionWaiter()方法中在条件节点的末尾加入节点作为尾节点。
private Node addConditionWaiter() {Node t = lastWaiter;.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}
在加入之前需要判断尾节点是否是符合条件队列要求的节点,否则调用unlinkCancelledWaiters()方法将条件队列中不符合要求的节点都清除掉。
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}
}
在唤醒条件队列 的过程中直接调用dosignal()方法将头结点唤醒。
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);
}
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
而唤醒的过程非常有意思。
final boolean transferForSignal(Node node) {if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}
将节点的状态变为0,并将该节点放入锁的并发队列,如果放入之后前驱节点已经处于取消状态,或者给前驱节点设置SIGNAL状态,则直接将当前线程从挂起中恢复。
接下来可以看到Condition最为常用的await()方法。
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}
在调用await()方法的时候,先生成节点放入条件队列中。直到外界调用signal()方法将条件队列中的头结点放入并发队列中。否则线程一直处于挂起状态。
当处于并发队列中并且从挂起状态恢复之后,则会尝试在队列中取得锁,与并发节点的在并发队列中获取锁的操作一样,都调用acquireQueued()方法。