本文共 10491 字,大约阅读时间需要 34 分钟。
AQS(AbstractQueuedSynchronizer)为组成Java并发包的工具类的核心,我们可以看到它的实现类中有很多我们常用的同步器:
那么AQS是怎么定义和管理资源的?同步器是通过怎样的方式从AQS扩展?这是此文需要讨论的内容。
由于作者本人水平有限,在分析时可能会出现纰漏和错误,希望大家可以指出,让我们一起学习,一起进步。
在AQS中有一个state
字段,其注释The synchronization state
意为同步状态,它就是用于表示资源当前状态的。
不同的同步器对资源的定义不同,常用的同步器对资源的定义如下:
了解了资源状态的定义,那么接下来就需要看同步器是怎么定义、获取和修改状态。AQS通过模板方法定义了一系列方法用于子类重写,这些方法在AQS中直接调用都是抛出UnsupportedOperationException
异常的,它们分别是:
tryAcquire
:独占式获取状态tryRelease
:独占式释放状态tryAcquireShared
:共享式获取状态tryReleaseShared
:共享式释放状态isHeldExclusively
:状态是否为独占式好了,在了解了AQS对资源式如何进行管理后,我们接下来关心的问题是,对于等待线程的处理,是交给AQS来处理,还是其子类同步器来处理?它是怎么进行处理的?这就引出了在并发操作中重要的一个概念:等待队列
。
通过查看AQS的代码,我们会发现它有两个内部类:
其中Node
便是等待队列的节点,而ConditionObject
则是用于实现Condition
功能的内部类。
对于
Object.await
这一系列方法来说,如果调用某个对象的wait()方法,当前线程必须拥有这个对象的锁,所以调用wait()方法必须在同步块或者同步方法中进行。之所以是属于Object
类,是因为每个对象都可以有锁,所以是通过对象来操作,而不是线程。AQS实现的
ConditionObject
则是作用于线程,对线程进行调度的。
所以说AQS为开发同步器提供了两个功能:
而同步器只需要专注于解决:
在前面我们看到在AQS中有一个Node
类,看注释可以知道它就是等待队列的节点。虽然看起来它的注释很长,但是实际上代码量却不多,去掉注释后也就60多行。
因为本文主要是设计模式的分享(好吧,主要原因是因为我水平不够),所以就不解读代码了,而是将关注点放在它的实现原理上。
在类的开头注释就说了这个一个变体CLH
的节点,然后大体的内容是CLH用于自旋锁,它的节点保存了当前阻塞线程的信息,如果它的前驱节点释放了,就会通过修改当前节点的waitStatus
字段来通过当前节点,然后出队前驱节点,让当前节点尝试获取锁。如果有新的等待线程要入队,那么就会被加到队列的尾部,即遵循先入先出
的顺序。
对于waitStatus字段的状态有以下5种:
节点状态 | waitStatus | 描述 |
---|---|---|
SIGNAL | -1 | 表示当前节点被阻塞了(前驱结点在入队后、阻塞前,应确保将其节点类型改为SIGNAL,以便节点取消或释放时将前驱节点唤醒。) |
CANCELLED | 1 | 表示前驱超时或被中断,需要移出队列 |
CONDITION | -2 | 表示前驱在Condition队列中,阻塞等待某个条件 |
PROPAGATE | -3 | 适用于共享模式(比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。) |
OTHER | 0 | 非以上状态,节点刚创建时会处于这种状态 |
在
Node
中有prev
和next
字段,它们构成了CLH的双向链表结构,之所以采用这种结构是因为在当前节点释放时,可以将当前节点的前驱节点指向后继节点就可以了。
对如何实现等待队列有兴趣的可以看看这篇文章
ConditionObject内部使用了Node
节点作为双向链表的组成结构,我们主要关注它的await
方法和signal
方法。
public final void await() throws InterruptedException { if (Thread.interrupted()) // 线程中断处理 throw new InterruptedException(); Node node = addConditionWaiter(); // 插入条件等待节点 // 释放当前线程所持有的锁 // 里面的逻辑很简单,先是获取锁状态的值,然后直接减去 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 判断当前节点是否存在于等待队列中 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);}
public final void signal() { // 判断当前线程是否持有锁 // 这个方法需要同步器实现 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 释放条件等待队列队首节点 if (first != null) doSignal(first);}
然后会调用doSingal(Node first)
方法:
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 将这个节点与队列断开连接 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null);}
接着调用transferForSignal
方法将Condition
节点转成初始节点,并插入到等待队列。
final boolean transferForSignal(Node node) { // 将节点类型从Condition:2 --> INTIAL:0 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; // 插入等待队列 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true;}
对于
singalAll()
方法,其内部是调用doSignalAll()
方法,将条件等待队列中的所有节点都出队。
对于条件等待队列的使用主要用于生产者消费者模式,后文
阻塞队列
这一节讲述的就是使用AQS的ConditionObject进行操作的并发阻塞队列。
本节通过对ReentrantLock加锁、解锁的分析来了解AQS的独占锁机制。
在开始前先了解一下公平锁和非公平锁的实现,我们知道公平锁就是多个线程按照申请锁的顺序去获得锁,非公平锁是先尝试获取锁,如果失败了则加入到等待队列中。我们查看ReentrantLock中的FairSync
和NonfairSync
的话,可以看到它们的代码区别就是在公平锁里有个hasQueuedPredecessors
方法用于判断该线程是否有前驱节点,如果没有则表示它位于队首,那么可以尝试获取锁。而在非公平锁里,不管当前线程是不是队首,都会尝试获取锁。
然后我们来看看在公平锁下对于独占锁的获取和释放:
对于ReentrantLock.lock()
方法实际上是调用了AQS的acquire
方法:
// 先尝试获取锁,如果失败了就封装为等待节点入队到队列,并挂起当前线程public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
tryAcquire
是模板方法,我们来看看它在ReentrantLock
的公平锁的实现:
protected final boolean tryAcquire(int acquires) { // acquires=1 final Thread current = Thread.currentThread(); // 当前线程 int c = getState(); // 获取同步状态 if (c == 0) { // 表示锁没有被占用 if (!hasQueuedPredecessors() && // 如果是队首节点 compareAndSetState(0, acquires)) { // 尝试获取锁 // 加锁成功,将锁的独占线程设置为当前线程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 独占线程是不是当前线程 int nextc = c + acquires; // 重入次数 if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); // CAS设置锁状态 return true; } return false;}
对于ReentrantLock.unlock()
方法实际上是调用了AQS的release
方法:
public final boolean release(int arg) { if (tryRelease(arg)) { // 尝试释放锁:state - 1 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 解锁成功,唤醒队首节点的线程 return true; } return false;}
释放锁的代码也很简单:
protected final boolean tryRelease(int releases) { // releases=1 int c = getState() - releases; // 锁状态减1 // 如果当前线程不是独占线程则抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 锁状态为0表示锁没有被占用了 if (c == 0) { free = true; // 将独占线程设为null setExclusiveOwnerThread(null); } setState(c); return free;}
在解锁成功后会调用unparkSuccessor
释放队首节点
private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 此时 ws=-1 if (ws < 0) // 将当前状态置为0,表示后继节点即将被唤醒 node.compareAndSetWaitStatus(ws, 0); Node s = node.next; // 如果后继节点处于CANCELLED状态,即ws=1时,会从队尾开始找到第一个可被释放的节点 if (s == null || s.waitStatus > 0) { s = null; for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } // 唤醒线程 if (s != null) LockSupport.unpark(s.thread);}
本节通过对CountDownLatch加锁、解锁的分析来了解AQS的共享锁机制。
CountDownLatch的await()
和countDown()
都是调用内部类Sync
的方法,所以我们主要分析这个类的构造和相关方法。
CountDownLatch的await
实际上是调用AQS的acquireSharedInterruptibly
方法:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果等待线程中断,那么抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 获取共享锁,小于0表示获取失败 if (tryAcquireShared(arg) < 0) // 如果获取共享锁失败,那么加入等待队列 doAcquireSharedInterruptibly(arg); }
我们会发现CountDownLatch里的tryAcquireShared方法十分简单,就是判断锁的状态是否为0,为0表示条件已经完成,等待队列的线程都可以执行了:
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;}
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 尝试释放锁 doReleaseShared(); // 进行释放操作 return true; } return false;}
protected boolean tryReleaseShared(int releases) { // releases = 1 // 尝试对锁进行一次释放,判断是否可以释放锁的逻辑在 // return nextc == 0 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; }}
如果锁可以释放成功,那么就会进入doReleaseShared
方法:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 将头节点状态标记为0,表示唤醒后继节点 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; unparkSuccessor(h); // 唤醒后继节点 } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; } if (h == head) break; }}
unparkSuccessor
前面已经分析过了,这里就不再赘述。
阻塞队列分为有界阻塞队列和无界阻塞队列。
常用的有界阻塞队列有:
Integer.MAX_VALUE
。LinkedBlockingQueue维护了两把锁——takeLock
和putLock
,同一时刻,可以同时有一个线程进行入队,另一个线程进行出队。Integer.MAX_VALUE
,底层基于双链表实现的双端队列结构。对于有界阻塞队列的实现原理,以ArrayBlockingQueue举例。
对于它的put(E e)
方法:
public void put(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果队列已满,就在notFull队列上等待 while (count == items.length) notFull.await(); // 队列未满, 直接入队 enqueue(e); } finally { lock.unlock(); } }private void enqueue(E e) { final Object[] items = this.items; items[putIndex] = e; // 队列已满,则重置索引为0 if (++putIndex == items.length) putIndex = 0; count++; // 唤醒一个消费等待线程 notEmpty.signal(); }
为什么要用
while循环
判断队列是否已满?这是是多线程设计模式中的
Guarded Suspension模式
,具体的可以看看这篇博客
对于出队方法,比如说take()
:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 队列为空, 则线程在notEmpty条件队列等待 while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E e = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 唤醒一个生产等待线程 notFull.signal(); return e;}
常用的无界阻塞队列有:
栈
和队列
,容量为0,数据直接在消费者和生产者之间传递。支持公平/非公平策略。transfer
方法,如果有消费者在阻塞等待数据,那么就会直接传输给消费者,然后将数据入队,然后阻塞等待,直到消费者线程进行消费。前一节我们了解到对于有界阻塞队列,是使用两个Condition
对线程进行调度的,其中一个调度生产者,一个调度消费者。那么对于无界阻塞队列就很好理解了,它其实就是只使用一个Condition
来调度消费者。
实现流程和有界阻塞队列差不多,这里就不写了。。。。。
转载地址:http://cjqgn.baihongyu.com/