上一篇 AQS源码解析(1)已经介绍了AQS的数据结构。在这篇,将从J.U.C里的许多基于AQS实现的同步工具出发,看一看AQS到底提供了哪些功能。
比如Lock lock = new ReentrantLock()
,默认是非公平锁的实现,使用lock()
、unlock()
进行同步,先来看一下如何加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } public void lock() { sync.lock(); }
abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); } static final class FairSync extends Sync { final void lock() { acquire(1); } protected final boolean tryAcquire(int acquires) {} } static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) {} } }
|
看代码还是很清晰的,核心就在acquire()
中。这个方法是在父类AQS内定义实现的,而tryAcquire()
在AQS内只给了定义,具体实现还是在子类中。
1 2 3 4 5 6 7 8 9 10
| public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
|
以ReentrantLock
为例,内部类Sync
的子类FairSync
和NonFairSync
分别实现了公平锁与非公平锁。这时能清楚看到,在AQS中用来表示同步状态的state
,会随着每次acquire()
而从0
开始累加。同理每次release()
会减少直到为0
,当state == 0
可以认为资源都被释放,公平锁需要把资源让给等待更久的线程。如果当前线程已经持有资源,可以继续增加state
,无需重新等待竞争,这就是可重入的意义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| static final class FairSync extends Sync { protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
static final class NonfairSync extends Sync { protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
|
如果tryAcquire(arg)
失败,将创建独占模式节点入队,addWaiter(Node.EXCLUSIVE)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
|
进行到此,再回顾一下:!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
acquireQueued()
将开始从队列中自旋获取刚加入的节点,主要做的就是三件事,一是将满足条件的节点设置为新的头节点,二是给不满足条件的节点调整到最近的一个非CANCELLED
节点后,并且将其先前节点的状态修改为SIGNAL
,三是使用LockSupport.park
将合适节点的线程休眠,最终返回结果表示当线程等待过程中是否被中断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next;
node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); }
node.next = node; } }
|
在看源码时有一个问题困扰着我,为什么在shouldParkAfterFailedAcquire
时一定是从尾向头去找,而不是反过来呢?
至此,对于ReentrantLock
的lock()
的实现,还剩一步unparkSuccessor()
来唤醒后续节点,这个留在接下来要学习的unlock()
实现中来分析。