AQS 是什么
AbstractQueuedSynchronizer(简称AQS)是一个多线程访问共享资源的抽象同步器框架,说简单点就是主要由 一个状态(state) + 双向无锁队列(CLH) 组成的工具,它主要提供了同步状态原子性管理、线程的阻塞和解除阻塞、队列的管理的方法,是Java并发包的基本工具类。
AQS是对线程管理工具的抽象,是一个锁工具的抽象,且是无法实例化的,面向锁的实现者,而非面向锁的使用者,我们熟知的ReentrantLock、CountDownLatch、ReentrantReadWriteLock、Semaphore都是借助AQS来实现的,它交给子类实现的方法有:
- tryAcquire(int):独占方式获取锁。尝试获取资源,成功则返回true,失败则返回false
- tryRelease(int):独占方式释放锁。尝试释放资源,成功则返回true,失败则返回false
- getState():获取锁的标志state值
- setState():设置锁的标志state值
当然上面只是一部分,标志位和队列的头尾都是volatile修饰的,意思是如果使用cas的方式更新 state 时可以作为轻量级锁,这里使用最明显的效果的就属ReentrantLock中FairSync和NonfairSync了,state = 0 表示初始化,如果线程A进来需要加锁,那么它只需要cas的方式更新state值为1,如果更新成功,代表拿到了锁(资源),其他线程再次获取时就会cas失败,此时就会按顺序放入CLH队列中挂起,等到线程A释放资源后唤醒。这里面,将线程放入队列及唤醒动作AQS已经帮我们实现好了,由子类实现不同处的方式也正是模板方法模式。
JDK版本 8u251
AQS的属性
常规套路,先来看一下JDK源码中的它是怎么实现的
成员属性
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
* 这两个Node 节点分别指向了 双向链表队列的首尾节点
* Head节点是一个虚节点 不存储线程数据的 它的next指向第一个节点
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
* tail 节点指向队列中最后一个线程资源(Node)
*/
private transient volatile Node tail;
/**
* The synchronization state.
* 这个代表资源抢占的状态,初始化为0代表没有线程抢占
* state > 0 代表有线程抢占 资源
* ReentrantLock 中 state代表重入锁的次数
* Semaphore 中 state 代表资源获取的总数,state <= 0时就需要等待
* CountDownLatch 中 state 则是await调用者需要等待释放的资源数
*/
private volatile int state;
在其父类,还有一个比较重要的属性
/**
* The current owner of exclusive mode synchronization.
* 独占锁的线程标志 如果是独占模式且当前线程获取到锁时会设置此变量
* 比如ReentrantLock中重入锁的判断标志就是此处判断获取锁的线程是否属于同一个
*/
private transient Thread exclusiveOwnerThread;
独占和共享模式
AQS基本的功能就是提供了一个同步队列,维护着阻塞状态的线程队列,并且提供了两种方式去获取资源:独占模式和共享模式,一种锁只会存在于一种模式,此处先演示的是独占模式,下面我们借助ReentrantLock的流程来分析下上面几个属性在“锁”工具中的用处。
独占模式即获取资源的排他锁,共享模式及获取资源的共享锁。
独占模式流程分析
- 抢占资源(锁)的线程首先进来执行的逻辑是锁子类重写的 tryAcquire 方法,如果获取成功,那么会将状态state值修改为1,并且将独占线程变量设置为自身,执行业务逻辑
- 当其他线程进来执行 tryAcquire 方法时,由于state已经为1,必定会cas失败,则会调用 addWaiter 方法将线程封装成Node节点,并加入CLH队列的队尾(第一次加入队列会首先初始化)
- 加入队尾后会调用 acquireQueued 方法,这个方法是一个死循环:会让线程不断去尝试获取资源(tryAcquire),获取失败 -> 线程挂起 -> 唤醒 -> 获取资源 ... 的方式不断循环尝试,直到获取到资源后跳出循环。在获取资源失败后还会将前驱节点的状态标志为 SIGNAL
节点和节点之间在自旋过程中除了前驱节点会唤醒该节点之外基本不会互相通讯
场景分析
下面借助ReentrantLock的流程分析执行流程,测试代码如下:有三个线程(线程A、线程B、线程C)同时来加锁、休眠、释放锁
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock(true);
Thread threadA = new Thread(getRunnable(lock, 100000),"threadA");
Thread threadB = new Thread(getRunnable(lock, 100000),"threadB");
Thread threadC = new Thread(getRunnable(lock, 10000),"threadC");
threadA.start();
threadB.start();
threadC.start();
}
private static Runnable getRunnable(ReentrantLock lock, int time) {
return () -> {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " locked --- ");
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
};
}
主要分析源码流程如下:
- 线程A拿到锁,修改state状态,更改独占线程为线程A
- 线程B和线程C未拿到锁,进入等待队列
- 线程A执行完成,公平锁执行流程,线程B拿到锁,线程B节点移除
- 线程B执行完成,线程C拿到锁,队列head节点清除
获取资源-lock
线程A拿到锁,修改state状态,更改独占线程为线程A,此时AQS的内部结构如下
结合源码分析:
// FairSync
final void lock() {
// acquire 是AQS中的方法 代表申请1个资源
acquire(1);
}
// NonfairSync
final void lock() {
/**
* 非公平锁不会判断是否需要排队 而是直接上来抢占资源
* 如果抢占失败 则会调用acquire方法申请资源
*/
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
/**
* tryAcquire 尝试 获取资源
* ReentrantLock 中 FairSync 首先会判断队列中是否包含其他有效节点,
* 如果不包含则直接使用cas的方式将state更新为1,设置exclusiveOwnerThread 为当前线程
* 如果是重入 则更新state为 +1
* ReentrantLock 中 NonfairSync 则不会判断等待队列是否包含其他有效节点
* 抢占逻辑和FairSync 类似都是更新state 为 1 并设置exclusiveOwnerThread 为当前线程
* 如果是重入 则更新state为 +1
*/
if (!tryAcquire(arg) &&
/**
* 尝试获取成功 直接返回,失败后会加入等待队列
* addWaiter 是将当前线程创建Node节点加入等待队列的方法,并返回节点
* acquireQueued 方法 则是把放入队列中的线程不断去获取锁 直到获取成功或者不再需要获取(中断)
*/
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Sync是ReentrantLock的内部类,继承自AbstractQueuedSynchronizer,也是FairSync和NonfairSync的父类,抽象了lock方法,最终线程调用lock方法还是FairSync和NonfairSync实现的,上面也可以看到公平锁和非公平锁实现的差异,NonfairSync非公平锁会直接调用cas的方式修改state的值,抢占锁资源,抢占失败后进入acquire方法
acquire方法是AbstractQueuedSynchronizer中的方法,首先会调用子类实现的tryAcquire尝试获取资源,如果获取资源失败会调用addWaiter方法包装线程为Node节点,并且节点的模式为独占模式,最后调用acquireQueued管理线程的自旋获取锁资源、挂起、唤醒等
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
/**
* hasQueuedPredecessors 此方法判断等待队列中是否包含有效节点,
* 返回true则代表队列中包含其他有效节点 需要加入等待队列
* 公平锁与非公平锁 tryAcquire 方法基本相同,只有此处 非公平锁不会判断
* 等待队列中是否包含有效节点而是直接抢占资源
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
插一个题外话:这里涉及公平锁和非公平锁,其实ReentrantLock公平锁和非公平锁的区别就在lock方法上,在上述代码中注释也有说,非公平锁在tryAcquire获取锁方法时不会判断是否队列中是否有其他线程等待,而是直接使用CAS的方式获取资源,这样省下了唤醒其他线程的操作时间,自然整体的吞吐量也是提高的,但是也会造成某些线程会一直饥饿
这里是否选择公平锁还是非公平锁自然要看使用者,针对的业务场景是否支持线程饥饿了,在保证业务流程正常的前提下尽可能的优化更多性能
tryAcquire方法会直接操作state的值,修改成功后会修改state的值,并且修改独占线程为当前线程,如果是重入模式,则会将state的值自增
独占模式只有一个线程tryAcquire会返回成功,那么此时线程B和线程C会调用addWaiter方法进入等待队列,源码如下:
private Node addWaiter(Node mode) {
// 根据当前线程创建节点 node 是类型 SHARED 和 EXCLUSIVE 两种
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
/**
* 这里 如果已经初始化 且没有其他线程竞争时,这里会进行一次快速尝试
* addWaiter 实际上是往尾部插入新的节点
* tail 是 volatile 修饰的,这里的细节:在操作前先读取成局部变量
* 操作全局变量时均需要 使用cas锁,保证并发场景下 操作原子性
*/
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 设置尾节点为最新节点
if (compareAndSetTail(pred, node)) {
// 走到这里只能同时只有一个线程 那么 直接更新后返回
pred.next = node;
return node;
}
}
// 尝试失败 可能是并发场景 也可能是未初始化
enq(node);
return node;
}
private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
// 如果tail为空 则执行初始化
if (t == null) { // Must initialize
// 双向链表队列头节点是一个虚节点,不存储数据 head为空代表未初始化
// 第一个节点 设置头结点 为一个虚节点 代表初始化
if (compareAndSetHead(new Node()))
// tail 指向最后一个节点 head == tail 代表空队列
tail = head;
} else {
// 新节点的前驱节点为旧的tail节点
node.prev = t;
// 操作 volatile 修饰的 tail 变量 使用cas方式 更新 为新节点
if (compareAndSetTail(t, node)) {
// 前驱节点建立连接关系 此处只有一个线程会进来
t.next = node;
return t;
}
}
}
}
上面有两个方法addWaiter和enq,主要逻辑如下:
- 包装当前线程为Node节点,mode主要有两种SHARED 和 EXCLUSIVE代表独占和共享,这里是独占模式
- 判断tail是否为空,这里是一个快速尝试,如果已经进行了初始化或者队列中有其他节点,那么tail肯定是不为空的,队列插入节点的逻辑实际上就是在尾部新增一个节点,操作全局变量tail节点是需要保证原子性的,这里使用CAS的方式更新尾结点,如果更新成功那么还需要维护上一个节点的next指针,当然cas的操作只允许一个线程成功,这里也是可以保证原子性的
- CAS竞争失败或者未初始化会进入enq入队方法,这里是一个死循环,首先第一个判断t == null则是判断是否初始化,如果未初始化则会构造一个虚节点设置在Head上,head == tail代表这里是一个空队列。入队的逻辑和2中一致,CAS方式修改tail尾结点的指针为新节点,入队成功后并且返回新节点
此时AQS的内存结构如下图:
等待队列是一个双向链表,并且Node中有一个waitStatus的属性,0代表初始化,SIGNAL是待唤醒状态,即挂起时标识。加入队列成功后进入acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
// 拿到当前节点的前驱节点
final Node p = node.predecessor();
/**
* 如果是前驱节点是头结点 那么说明自己是第一个有效节点 可以尝试获取锁资源
* tryAcquire 获取锁资源
* 如果获取成功,那么 则会将当前节点设置为头结点 设置 队列为空
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* shouldParkAfterFailedAcquire 判断当前节点是否需要阻塞
* 当前不为头结点 且没有抢到资源时,可能有其他线程抢到了资源,此时需要先判断是否需要阻塞
* 被阻塞条件:前驱节点的 waitStatus为 -1 等待锁
* waitStatus 有五种状态 :
* 0 初始化
* 1 CANCELLED 线程获取锁的请求已经取消
* -2 CONDITION 节点在等待队列中,节点线程等待唤醒
* -3 PROPAGATE 当前线程处在SHARED情况下,该字段才会使用
* -1 SIGNAL 线程准备好了,等待其他线程释放资源
* 防止无限循环浪费资源
*
* 每一个线程进来后,都会设置自己的前一个有效 线程为 SIGNAL状态,第二次循环后尝试失败后,
* 则将自己park 休眠 结束后
*/
if (shouldParkAfterFailedAcquire(p, node) &&
/**
* shouldParkAfterFailedAcquire 为 true 表示 它是需要阻塞的,调用LockSupport.park 阻塞当前线程
* 并且 如果阻塞完成会返回线程是否中断标志位 interrupted
*/
parkAndCheckInterrupt())
// interrupted 表示线程是否中断过
interrupted = true;
}
} finally {
if (failed)
/**
* 抢占到锁资源后,会将failed设置为false 即非失败
* 如果非失败,那么会去执行临界区资源,最后解锁删除头结点
* 如果失败了,标识这个线程被中断或者自旋时出异常了,不再需要获取排队资源,删除队列中的节点
*/
cancelAcquire(node);
}
}
acquireQueued 的逻辑较为复杂,这里简单分析这么几步:
- 定义了两个变量,是否需要取消标志failed,是否被中断标志interrupted。(线程的等待状态是通过调用LockSupport.lock()方法实现的,这个方法会响应Thread.interrupt,但是不会抛出InterruptedException异常,这点与Thread.sleep、Thread.wait不一样)
- 首先判断是否前驱节点是否是头结点,如果是头结点说明此线程可以尝试获取锁资源,如果获取成功,则设置当前节点为头结点,移除旧头结点的信息,因为是获取资源成功退出循环,所以failed为false代表未失败,不需要取消队列中的节点。
- 非首节点或者抢占失败,那么会调用shouldParkAfterFailedAcquire方法设置前驱有效节点为SIGNAL状态,并且移除取消的无效节点,避免无限循环,最后设置成功后会调用parkAndCheckInterrupt方法挂起当前线程,直到前驱节点释放资源继续自旋。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
// SIGNAL 说明 是待唤醒状态
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
/**
* ws > 0 只有取消状态 这一段代码实际上是在删除所有取消节点
*/
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 设置前驱节点为SIGNAL 在park前设置前驱节点为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire方法主要作用是判断当前节点的前驱节点是否包含有效节点,如果包含则会使用CAS的方式更新前驱节点的状态为SIGNAL,如果判断前驱节点是有效节点并且是SIGNAL状态那么会直接返回true,进入park方法
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
LockSupport.park(this);
return Thread.interrupted();
}
parkAndCheckInterrupt方法则是直接调用LockSupport的park方法阻塞线程,如果线程中途被中断,那么此线程会立即返回,且Thread.interrupted()的返回值为是否中断标志,线程中断机制可以看 这里 直达
释放资源-unlock
释放资源首先会调用unlock方法,公平和非公平锁都对应的release方法
public final boolean release(int arg) {
/**
* tryRelease 释放独占资源 如果不是占用资源线程调用会抛出异常 由子类实现
* unparkSuccessor 则是唤醒休眠线程
*/
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放资源release方法,主要流程有
- 线程调用子类重写的tryRelease方法进行释放资源,如果释放成功则继续检查线程(节点)的是否有后继节点,有后继节点则去唤醒
- 调用unparkSuccessor方法进行后继节点的唤醒
protected final boolean tryRelease(int releases) {
/**
* state的值是获取资源的次数 减去需要释放的次数为0说明当前线程不再持有锁
* 释放资源之前会判断是否是当前持有资源线程 如果非持有资源线程释放则会抛出异常
*/
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 因为独占模式 占有线程只会有一个 所以可以直接更新
setState(c);
return free;
}
tryRelease方法是ReentrantLock.Sync自己重写的,释放独占模式的资源,首先需要判断是否是当前持有资源线程,如果非持有资源线程释放则会抛出异常。释放资源时可能是重入锁释放,所以这里需要判断当前线程是否释放全部资源,如果是则会设置独占线程为空,代表其他线程可以抢占资源。
如果tryRelease方法返回true代表其他线程可以抢占资源,那么会进入release的判断:如果队列不为空,那么需要唤醒等待队列中的线程,执行unparkSuccessor方法
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
/**
* 这里将head的状态设置为初始化状态 因为在唤醒其他节点后 当前head是直接回收的
* 假设下面全是取消节点,那么将不会唤醒任何节点 而head仍然是一个初始化队列节点
*/
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
/**
* 这里s是队列第一个节点 循环主要为了找到第一个非取消节点
* 当s的waitStatus = 1 代表取消,小于0则说明可以进行唤醒
* 遍历是从tail到首节点倒序遍历的 这里没有移除取消的节点
* 取消节点在acquireQueued方法中移除
* 此时从尾节点开始向前找起, 直到找到距离head节点最近的ws<=0的节点
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果一个有效节点,那么唤醒它
if (s != null)
LockSupport.unpark(s.thread);
}
上面的逻辑,流程大概如下:如果后继节点不为取消状态则直接唤醒,如果后继节点为取消状态,则从队列的队尾往前遍历,找到一个离节点最近且不为取消状态的节点进行唤醒
这里有一个小细节,寻找head后面的第一个有效节点是从tail开始遍历的,这里可以回看一下addWaiter方法,添加节点不是原子操作,而设置前驱节点是在CAS方式设置tail节点之前的,也就是说:在并发场景下,可能Node的next指针还未设置,但是tail的节点已经更新了,此时需要使用倒序遍历的方式可以更精确的寻找刚刚加入的Node(它还未给前驱节点设置next的节点)
上面的流程可以画张图清晰了解下:
接下来,回到acquireQueued方法,因为ThreadB一直在acquireQueued方法中挂起,因为是自然唤醒,那么它的中断标志位interrupted为false,此时,拿到当前节点的前驱节点就为head了,那么它会进入tryAcquire方法申请资源,如果CAS成功,将state的值更新为1,且独占线程设置为ThreadB之后,就会将自身设置为头结点,删除之前头结点ThreadA,再次获取锁完成
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
// 拿到当前节点的前置节点
final Node p = node.predecessor();
/**
* 如果是前置节点是头结点 那么说明只有当前一个有效节点 可以尝试获取锁资源
* tryAcquire 获取锁资源
* 如果获取成功,那么 则会将当前节点设置为头结点 设置 队列为空
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* shouldParkAfterFailedAcquire 判断当前节点是否需要阻塞
* 当前不为头结点 且没有抢到资源时,可能有其他线程抢到了资源,此时需要先判断是否需要阻塞
* 被阻塞条件:前驱节点的 waitStatus为 -1 等待锁
* waitStatus 有五种状态 :
* 0 初始化
* 1 CANCELLED 线程获取锁的请求已经取消
* -2 CONDITION 节点在等待队列中,节点线程等待唤醒
* -3 PROPAGATE 当前线程处在SHARED情况下,该字段才会使用
* -1 SIGNAL 线程已经准备好了,就等资源释放了
* 防止无限循环浪费资源
*
* 每一个线程进来后,都会设置自己的前一个有效 线程为 SIGNAL状态,第二次循环后尝试失败后,
* 则将自己park 休眠 结束后
*/
if (shouldParkAfterFailedAcquire(p, node) &&
/**
* shouldParkAfterFailedAcquire 为 true 表示 它是需要阻塞的,调用LockSupport.park 阻塞当前线程
* 并且 如果阻塞完成会返回线程是否中断标志位 interrupted
*/
parkAndCheckInterrupt())
// interrupted 表示线程是否中断过
interrupted = true;
}
} finally {
if (failed)
/**
* 抢占到锁资源后,会将failed设置为false 即非失败
* 如果非失败,那么会去执行临界区资源,最后解锁删除头结点
* 如果失败了,标识这个线程被中断或者自旋时出异常了,不再需要获取排队资源,删除队列中的节点
*/
cancelAcquire(node);
}
}
线程B获取锁的流程可以画张图:
假如线程B执行完成后释放资源,唤醒线程C时,线程C抢占资源的逻辑类似,此处就不再赘述。但是如果线程C执行完成后,释放资源时,会判断队列为空,此时亦不会操作队列唤醒线程了,直接返回。
小节
独占模式下state作为锁的重入次数,队列则是用来对获取锁资源的线程进行一个排队,挂起,防止无限制的休眠下去,包括中途还支持中断、取消等机制,说明AbstractQueuedSynchronizer是非常灵活的,至此,跟着ReentrantLock的源码已经把AbstractQueuedSynchronizer独占模式流程分析完成,也可以简单看出,AQS提供的方法是比较完善的,包括唤醒和入队等,是我们都不需要关心的逻辑,但是了解其中的流程和思路可以更好的运用AQS包。一个 状态 state + 一个队列 完成了锁的抽象,是很值得学习的,下面我们会继续分析文中提到的共享模式及未提及的等待队列
参考
JDK 源码
一文带你快速掌握AQS
【深入AQS原理】我画了35张图就是为了让你深入 AQS
Java AQS unparkSuccessor 方法中for循环为什么是从tail开始而不是head