博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
AQS及其衍生物
阅读量:3933 次
发布时间:2019-05-23

本文共 10491 字,大约阅读时间需要 34 分钟。

前言

AQS(AbstractQueuedSynchronizer)为组成Java并发包的工具类的核心,我们可以看到它的实现类中有很多我们常用的同步器:1600388818769

那么AQS是怎么定义和管理资源的?同步器是通过怎样的方式从AQS扩展?这是此文需要讨论的内容。

由于作者本人水平有限,在分析时可能会出现纰漏和错误,希望大家可以指出,让我们一起学习,一起进步。

AQS资源管理

在AQS中有一个state字段,其注释The synchronization state意为同步状态,它就是用于表示资源当前状态的。

不同的同步器对资源的定义不同,常用的同步器对资源的定义如下:

  • ReentrantLock:资源表示独占锁,state为0表示锁没有被持有,为1表示锁被占用,为N表示重入次数。
  • ReentrantReadWriteLock:资源表示共享的读锁和独占的写锁。state逻辑上被分成两个16位的unsigned short,分别记录读锁被多少线程使用和写锁被重入的次数。
  • CountDownLatch:资源表示计数,state为0表示计数器归零,所有线程都可以访问资源;为N表示计数器未归零,所有线程都需要阻塞。
  • CyclicBarrier:跟CountDownLatch表示一样,只不过在计数器归零后又重置为一开始的值,这样循环下去。
  • Semaphore:资源表示信号量或者令牌。state为0表示没有令牌可用,所有请求线程都需要阻塞;大于0表示令牌可用,请求线程可获取一个令牌,同时state减1,执行线程每释放一个令牌,state加1。

了解了资源状态的定义,那么接下来就需要看同步器是怎么定义、获取和修改状态。AQS通过模板方法定义了一系列方法用于子类重写,这些方法在AQS中直接调用都是抛出UnsupportedOperationException异常的,它们分别是:

  • tryAcquire:独占式获取状态
  • tryRelease:独占式释放状态
  • tryAcquireShared:共享式获取状态
  • tryReleaseShared:共享式释放状态
  • isHeldExclusively:状态是否为独占式

好了,在了解了AQS对资源式如何进行管理后,我们接下来关心的问题是,对于等待线程的处理,是交给AQS来处理,还是其子类同步器来处理?它是怎么进行处理的?这就引出了在并发操作中重要的一个概念:等待队列

通过查看AQS的代码,我们会发现它有两个内部类:1600398142704

其中Node便是等待队列的节点,而ConditionObject则是用于实现Condition功能的内部类。

对于Object.await这一系列方法来说,如果调用某个对象的wait()方法,当前线程必须拥有这个对象的锁,所以调用wait()方法必须在同步块或者同步方法中进行。之所以是属于Object类,是因为每个对象都可以有锁,所以是通过对象来操作,而不是线程。

AQS实现的ConditionObject则是作用于线程,对线程进行调度的。

所以说AQS为开发同步器提供了两个功能:

  1. 管理资源的机制
  2. Condition的使用

而同步器只需要专注于解决:

  1. 资源到底是什么?
  2. 阻塞队列的使用

等待队列

在前面我们看到在AQS中有一个Node类,看注释可以知道它就是等待队列的节点。虽然看起来它的注释很长,但是实际上代码量却不多,去掉注释后也就60多行。

因为本文主要是设计模式的分享(好吧,主要原因是因为我水平不够),所以就不解读代码了,而是将关注点放在它的实现原理上。

在类的开头注释就说了这个一个变体CLH的节点,然后大体的内容是CLH用于自旋锁,它的节点保存了当前阻塞线程的信息,如果它的前驱节点释放了,就会通过修改当前节点的waitStatus字段来通过当前节点,然后出队前驱节点,让当前节点尝试获取锁。如果有新的等待线程要入队,那么就会被加到队列的尾部,即遵循先入先出的顺序。

对于waitStatus字段的状态有以下5种:

节点状态 waitStatus 描述
SIGNAL -1 表示当前节点被阻塞了(前驱结点在入队后、阻塞前,应确保将其节点类型改为SIGNAL,以便节点取消或释放时将前驱节点唤醒。)
CANCELLED 1 表示前驱超时或被中断,需要移出队列
CONDITION -2 表示前驱在Condition队列中,阻塞等待某个条件
PROPAGATE -3 适用于共享模式(比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。)
OTHER 0 非以上状态,节点刚创建时会处于这种状态

Node中有prevnext字段,它们构成了CLH的双向链表结构,之所以采用这种结构是因为在当前节点释放时,可以将当前节点的前驱节点指向后继节点就可以了。

对如何实现等待队列有兴趣的可以看看这篇文章

ConditionObject

ConditionObject内部使用了Node节点作为双向链表的组成结构,我们主要关注它的await方法和signal方法。

await()

public final void await() throws InterruptedException {
if (Thread.interrupted()) // 线程中断处理 throw new InterruptedException(); Node node = addConditionWaiter(); // 插入条件等待节点 // 释放当前线程所持有的锁 // 里面的逻辑很简单,先是获取锁状态的值,然后直接减去 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) {
// 判断当前节点是否存在于等待队列中 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);}

signal()

public final void signal() {
// 判断当前线程是否持有锁 // 这个方法需要同步器实现 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 释放条件等待队列队首节点 if (first != null) doSignal(first);}

然后会调用doSingal(Node first)方法:

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 将这个节点与队列断开连接 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null);}

接着调用transferForSignal方法将Condition节点转成初始节点,并插入到等待队列。

final boolean transferForSignal(Node node) {
// 将节点类型从Condition:2 --> INTIAL:0 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; // 插入等待队列 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true;}

对于singalAll()方法,其内部是调用doSignalAll()方法,将条件等待队列中的所有节点都出队。

对于条件等待队列的使用主要用于生产者消费者模式,后文阻塞队列这一节讲述的就是使用AQS的ConditionObject进行操作的并发阻塞队列。

独占锁

本节通过对ReentrantLock加锁、解锁的分析来了解AQS的独占锁机制。

在开始前先了解一下公平锁和非公平锁的实现,我们知道公平锁就是多个线程按照申请锁的顺序去获得锁,非公平锁是先尝试获取锁,如果失败了则加入到等待队列中。我们查看ReentrantLock中的FairSyncNonfairSync的话,可以看到它们的代码区别就是在公平锁里有个hasQueuedPredecessors方法用于判断该线程是否有前驱节点,如果没有则表示它位于队首,那么可以尝试获取锁。而在非公平锁里,不管当前线程是不是队首,都会尝试获取锁。1600394720632

然后我们来看看在公平锁下对于独占锁的获取和释放:

获取锁

对于ReentrantLock.lock()方法实际上是调用了AQS的acquire方法:

// 先尝试获取锁,如果失败了就封装为等待节点入队到队列,并挂起当前线程public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}

tryAcquire是模板方法,我们来看看它在ReentrantLock的公平锁的实现:

protected final boolean tryAcquire(int acquires) {
// acquires=1 final Thread current = Thread.currentThread(); // 当前线程 int c = getState(); // 获取同步状态 if (c == 0) {
// 表示锁没有被占用 if (!hasQueuedPredecessors() && // 如果是队首节点 compareAndSetState(0, acquires)) {
// 尝试获取锁 // 加锁成功,将锁的独占线程设置为当前线程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {
// 独占线程是不是当前线程 int nextc = c + acquires; // 重入次数 if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); // CAS设置锁状态 return true; } return false;}

释放锁

对于ReentrantLock.unlock()方法实际上是调用了AQS的release方法:

public final boolean release(int arg) {
if (tryRelease(arg)) {
// 尝试释放锁:state - 1 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 解锁成功,唤醒队首节点的线程 return true; } return false;}

释放锁的代码也很简单:

protected final boolean tryRelease(int releases) {
// releases=1 int c = getState() - releases; // 锁状态减1 // 如果当前线程不是独占线程则抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 锁状态为0表示锁没有被占用了 if (c == 0) {
free = true; // 将独占线程设为null setExclusiveOwnerThread(null); } setState(c); return free;}

在解锁成功后会调用unparkSuccessor释放队首节点

private void unparkSuccessor(Node node) {
int ws = node.waitStatus; // 此时 ws=-1 if (ws < 0) // 将当前状态置为0,表示后继节点即将被唤醒 node.compareAndSetWaitStatus(ws, 0); Node s = node.next; // 如果后继节点处于CANCELLED状态,即ws=1时,会从队尾开始找到第一个可被释放的节点 if (s == null || s.waitStatus > 0) {
s = null; for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } // 唤醒线程 if (s != null) LockSupport.unpark(s.thread);}

共享锁

本节通过对CountDownLatch加锁、解锁的分析来了解AQS的共享锁机制。

CountDownLatch的await()countDown()都是调用内部类Sync的方法,所以我们主要分析这个类的构造和相关方法。

获取锁

CountDownLatch的await实际上是调用AQS的acquireSharedInterruptibly方法:

public final void acquireSharedInterruptibly(int arg)            throws InterruptedException {
// 如果等待线程中断,那么抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 获取共享锁,小于0表示获取失败 if (tryAcquireShared(arg) < 0) // 如果获取共享锁失败,那么加入等待队列 doAcquireSharedInterruptibly(arg); }

我们会发现CountDownLatch里的tryAcquireShared方法十分简单,就是判断锁的状态是否为0,为0表示条件已经完成,等待队列的线程都可以执行了:

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;}

释放锁

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 尝试释放锁 doReleaseShared(); // 进行释放操作 return true; } return false;}
protected boolean tryReleaseShared(int releases) {
// releases = 1 // 尝试对锁进行一次释放,判断是否可以释放锁的逻辑在 // return nextc == 0 for (;;) {
int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; }}

如果锁可以释放成功,那么就会进入doReleaseShared方法:

private void doReleaseShared() {
for (;;) {
Node h = head; if (h != null && h != tail) {
int ws = h.waitStatus; if (ws == Node.SIGNAL) {
// 将头节点状态标记为0,表示唤醒后继节点 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; unparkSuccessor(h); // 唤醒后继节点 } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; } if (h == head) break; }}

unparkSuccessor前面已经分析过了,这里就不再赘述。

阻塞队列

阻塞队列分为有界阻塞队列和无界阻塞队列。

  • 有界阻塞队列:生产是有限的,当队列存储的元素个数等于上限时,生产操作就会阻塞,当队列存储的元素个数等于0时,消费操作就会阻塞。
  • 无界阻塞队列:生产是无限的。

有界阻塞队列

常用的有界阻塞队列有:

  1. ArrayBlockingQueue:容量在创建时指定,不可动态改变。
  2. LinkedBlockingQueue:它既可以在初始构造时就指定队列的容量,也可以不指定,默认为Integer.MAX_VALUE。LinkedBlockingQueue维护了两把锁——takeLockputLock,同一时刻,可以同时有一个线程进行入队,另一个线程进行出队。
  3. LinkedBlockingDeque:容量默认为Integer.MAX_VALUE,底层基于双链表实现的双端队列结构。

对于有界阻塞队列的实现原理,以ArrayBlockingQueue举例。

对于它的put(E e)方法:

public void put(E e) throws InterruptedException {
Objects.requireNonNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try {
// 如果队列已满,就在notFull队列上等待 while (count == items.length) notFull.await(); // 队列未满, 直接入队 enqueue(e); } finally {
lock.unlock(); } }private void enqueue(E e) {
final Object[] items = this.items; items[putIndex] = e; // 队列已满,则重置索引为0 if (++putIndex == items.length) putIndex = 0; count++; // 唤醒一个消费等待线程 notEmpty.signal(); }

为什么要用while循环判断队列是否已满?

这是是多线程设计模式中的Guarded Suspension模式,具体的可以看看这篇博客

对于出队方法,比如说take()

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try {
// 队列为空, 则线程在notEmpty条件队列等待 while (count == 0) notEmpty.await(); return dequeue(); } finally {
lock.unlock(); } }private E dequeue() {
final Object[] items = this.items; @SuppressWarnings("unchecked") E e = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 唤醒一个生产等待线程 notFull.signal(); return e;}

无界阻塞队列

常用的无界阻塞队列有:

  1. PriorityBlockingQueue:一种优先级队列,以权重大小入队,底层是基于数据实现的堆结构。
  2. SynchronousQueue:它底层实现了队列,容量为0,数据直接在消费者和生产者之间传递。支持公平/非公平策略。
  3. DelayQueue:底层基于PriorityBlockingQueue实现,这是一个定时无界阻塞队列。
  4. LinkedTransferQueue:当生产者调用它的transfer方法,如果有消费者在阻塞等待数据,那么就会直接传输给消费者,然后将数据入队,然后阻塞等待,直到消费者线程进行消费。

前一节我们了解到对于有界阻塞队列,是使用两个Condition对线程进行调度的,其中一个调度生产者,一个调度消费者。那么对于无界阻塞队列就很好理解了,它其实就是只使用一个Condition来调度消费者。

实现流程和有界阻塞队列差不多,这里就不写了。。。。。

转载地址:http://cjqgn.baihongyu.com/

你可能感兴趣的文章
laplacian matrix
查看>>
cotangent matrix or laplacian mesh operator
查看>>
Minimizing quadratic energies with constant constraints
查看>>
Python-第三方库requests详解
查看>>
暴力破解黄巴登录网站
查看>>
python多线程
查看>>
read selection
查看>>
optimization on macOS
查看>>
Template-Based 3D Model Fitting Using Dual-Domain Relaxation
查看>>
install libfreenect2 on ubuntu 16.04
查看>>
how to use automake to build files
查看>>
using matlab drawing line graph for latex
查看>>
How package finding works
查看>>
build opencv3.3.0 with VTK8.0, CUDA9.0 on ubuntu9.0
查看>>
how to compile kinfu_remake with cuda 9.0 opencv2.4.13.4
查看>>
qtcreator4.4.1中cmake 与cmake3.5.1本身generate出来的setting是有区别的解决方法
查看>>
CMake Useful Variables/Logging Useful Variables
查看>>
使用cmake建立工程链接OPENNI2
查看>>
ubuntu下解决csdn网页打不开的问题
查看>>
uninstall software on ubuntu
查看>>