前面我们了解了AbstractQueuedSynchronizer框架独占模式的流程,今天我们借助Semaphore的源码来学习下共享模式的原理。
共享模式
共享模式和独占模式的关系就好比共享锁和独占锁,共享模式在获取资源或者是释放资源时,都可能会唤醒后面所有非独占的节点,共享模式它交给子类实现的方法有:
- tryAcquireShared(int):共享方式获取锁。尝试获取资源,成功则返回剩余资源数,失败则返回当前资源数减申请资源数(负数)
- tryReleaseShared(int):共享方式释放锁。尝试释放资源,成功则返回true,不存在失败情况
这个方法获取失败后会调用doAcquireShared方法将线程包装成Node节点,加入到同步队列的队尾。而doAcquireShared这个方法和独占模式的acquireQueued方法类似,会控制队列中的线程不断去尝试获取资源(tryAcquire),获取失败 -> 线程挂起 -> 唤醒 -> 获取资源 ... 的方式不断循环尝试,直到获取到资源后跳出循环。在获取资源失败后还会将前驱节点的状态标志为 SIGNAL,不过与独占模式不同的是:如果线程节点被唤醒后,且获取资源成功,且后继节点为共享模式,那么会唤醒后继节点……唤醒会一直传递下去,直到后继节点不是共享模式,唤醒的节点同样会去获取资源,这点和独占模式不一样
共享模式画张图来理解下唤醒流程:
场景分析
同理根据一个小案例,来分析Semaphore的执行流程,测试代码如下,五个线程(线程A、线程B、线程C、线程D、线程E)执行一段代码,但是这段代码(业务)只允许同时2个线程执行,那么此时Semaphore的许可证数只为2,意思是同一时刻只允许2个线程申请资源,第三个线程申请时需要等待前面两个线程释放,后面的线程只需要等待前面的线程释放许可才能获取到许可证。了解到上下文后,我们逐行分析共享模式下,AbstractQueuedSynchronizer充当的角色和其中的执行内幕
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2, true);
Thread threadA = new Thread(getRunnable(semaphore, 1000), "threadA");
Thread threadB = new Thread(getRunnable(semaphore, 1000), "threadB");
Thread threadC = new Thread(getRunnable(semaphore, 1000), "threadC");
Thread threadD = new Thread(getRunnable(semaphore, 1000), "threadD");
Thread threadE = new Thread(getRunnable(semaphore, 1000), "threadE");
threadA.start();
threadB.start();
threadC.start();
threadD.start();
threadE.start();
}
private static Runnable getRunnable(Semaphore semaphore, int time) {
return () -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " acquired --- ");
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println(Thread.currentThread().getName() + " released --- ");
}
};
}
主要分析源码流程如下(以下顺序是假想执行顺序非真实执行顺序):
- 线程A和线程B拿到资源,修改state状态
- 线程C未拿到资源,进入等待队列
- 线程A执行完成,释放资源唤醒第一个有效节点,线程C拿到资源,并且唤醒后面的线程D
- 线程D虽然被唤醒,但是没有资源会再次进入挂起,直到线程B释放资源,重复3步骤唤醒线程E
- 线程E...直到有资源时,获取资源设置头结点为E,释放资源时,不再唤醒其他线程,结束流程
源码分析
JDK版本 8u251
构造函数
public Semaphore(int permits, boolean fair) {
/**
* 第一个参数permits代表总许可数 会设置给AQS的state作为总资源数
* fair 作为第二个参数 如果为true 表示是一个公平信号量
* 否则为 非公平信号量,这里我们可以放到下面查看区别
*/
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// 公平信号量
FairSync(int permits) {
super(permits);
}
// 非公平信号量
NonfairSync(int permits) {
super(permits);
}
构造函数用传入总许可证数作为了资源state数,既是可被多少个线程同时申请资源的次数,下面我们来看一下acquire方法
申请资源方法acquire
public void acquire() throws InterruptedException {
// Interruptibly代表会响应中断的
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
/**
* 提前判断线程是否中断 如果是中断 直接抛出异常 Interruptibly代表会响应中断的
*/
if (Thread.interrupted())
throw new InterruptedException();
/**
* 调用子类实现的 tryAcquireShared方法
* 如果返回值小于0 这个返回值就是剩余资源数
* 就会调用doAcquireSharedInterruptibly 进入队列
*/
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
acquire方法调用了 acquireSharedInterruptibly 方法,Interruptibly代表会响应中断,如果当前线程在申请资源时已经是中断状态,则会直接抛出异常。否则进入申请资源逻辑:
- 调用tryAcquireShared方法申请共享资源,arg是申请资源的个数,默认为1;tryAcquireShared方法会申请现有资源,对应Semaphore中两个实现FairSync和NonfairSync,和ReentrantLock类似我们等下看它们实现的不同
- 如果tryAcquireShared返回值大于0直接返回,小于0代表获取资源失败,则会调用doAcquireSharedInterruptibly进入等待队列
申请共享资源tryAcquireShared
// FairSync 公平锁
protected int tryAcquireShared(int acquires) {
for (;;) {
/**
* 判断队列中是否含有 有效节点 如果有等待队列直接返回 -1
* 代表去doAcquireSharedInterruptibly 方法中,去等待队列中挂起
* 共享锁中 s.thread != Thread.currentThread() 头结点的thread值
* 肯定是一直为null的 所以此时只需要判断队列不为空 即可需要入队了
*/
if (hasQueuedPredecessors())
return -1;
/**
* 获取当前资源数,如果减去当前需要申请的资源
* 大于等于0则使用cas的方式更新state的值,成功后返回剩余资源数
*/
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 非公平锁
protected int tryAcquireShared(int acquires) {
// 调用父类不公平的抢占共享锁资源方法
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
/**
* 这里和公平锁不一样的地方在于 少了一个hasQueuedPredecessors() 方法判断
* 如果没有判断hasQueuedPredecessors(),当前线程过来抢占资源是不需要检查
* 等待队列中是否包含其他有效节点,而是直接抢占资源,不存在排队的概念,虽然
* 降低了线程挂起等待的概率,但也有了线程饥饿问题
*/
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
这里我们可以看出公平锁和非公平锁的区别,公平锁主要在于在获取资源前首先需要判断等待队列中是否有其他元素,如果有等待队列则不进行资源获取,而非公平锁不需要判断等待队列,可以降低线程挂起的概率,提高吞吐量,但也有线程饥饿的问题
对于获取资源的代码,都是一致的,先判断当前资源减去所申请数是否大于等于0,如果不满足说明资源不够申请,直接返回当前资源数减所需资源数之差,负数代表获取失败;如果大于等于0满足说明资源满足,则使用CAS的方式更新state的值,保证只有一个线程更新成功,同理返回剩余资源数
经过上面方法,我们的案例只会保证线程A、线程B获取资源成功,如下图:
进入等待队列方法doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
/**
* 调用addWaiter方法将当前线程包装成Node节点
* 并加入等待队列的尾部
*/
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (; ; ) {
// 拿到前驱结点
final Node p = node.predecessor();
// 和独占模式类似,如果是头结点说明可以尝试获取资源
if (p == head) {
/**
* tryAcquireShared 尝试共享的方式获取资源
* 实际上是 判断总许可数 state是否大于传入的申请许可数
* 如果大于 说明还有资源可以获取,使用CAS的方式更新state的值
* 否则返回负数 表示申请失败
*/
int r = tryAcquireShared(arg);
if (r >= 0) {
/**
* setHeadAndPropagate 设置当前节点为head r是剩余的资源
* 设置当前节点到头结点
* 如果r > 0 还会继续唤醒其他挂起的线程
*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
/**
* 同理 shouldParkAfterFailedAcquire 这里也是在判断是否需要挂起
* 前驱节点为SIGNAL
* 第一次循环不会进入parkAndCheckInterrupt 第二次循环仍然无法获取到锁
* 则会进入parkAndCheckInterrupt方法
* 与独占模式不同的是 这里如果是中断 则会抛出InterruptedException异常
* 使用 Semaphore.acquireShared 方法可以避免异常
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
/**
* parkAndCheckInterrupt 方法使用的是 LockSupport.park(this)
* park方法是响应中断的,如果检测到外部将本线程标记为中断,那么会从阻塞中直接返回
* Thread.interrupted() 为true时 此处会抛出异常InterruptedException
*/
throw new InterruptedException();
}
} finally {
if (failed)
/**
* 抢占到锁资源后,会将failed设置为false 即非失败
* 如果非失败,那么会去执行临界区资源,最后解锁删除头结点
* 如果失败了,标识这个线程被中断或者自旋时出异常了,不再需要获取排队资源,删除队列中的节点
* 某些方法 是不支持中断的 中断了会立刻响应park方法
* 并在上面手动抛出异常 此时需要是取消在队列中的状态的
*/
cancelAcquire(node);
}
}
Interruptibly后缀结尾的方法是支持响应中断的,如果在挂起(park)时中断会抛出异常并取消当前的节点等待状态;
上面的逻辑和独占锁类似,主要分为以下几步:
- 封装当前节点为Node节点,节点类型为 SHARED ,代表共享模式;addWaiter方法会初始化等待队列,并将当前节点放在队尾
- 进入死循环:首先拿到前驱节点,判断是否是head节点(head节点是一个虚节点不保存线程数据),如果是head节点代表前面没有等待的线程,可以尝试调用子类实现的tryAcquireShared获取资源,如果获取成功,会返回剩余资源数,设置当前节点为头结点,如果返回的资源数大于0说明还可以唤醒其他线程,在setHeadAndPropagate方法中设置头结点信息,并调用doReleaseShared唤醒其他线程,此线程完成资源获取结束;如果tryAcquireShared获取资源失败,那么会调用shouldParkAfterFailedAcquire方法判断当前线程是否需要进入等待,第一次会将当前节点的前驱节点修改为SIGNAL代表当前后置节点已经挂起处于等待状态,第二次循环时仍然未获取到锁会进入parkAndCheckInterrupt方法,进入挂起状态
- parkAndCheckInterrupt方法会响应中断,LockSupport.park方法在其他线程将本线程置为中断时,会立刻从中断状态返回,并且返回中断标志位,如果判断当前线程已经中断,则会抛出InterruptedException异常响应中断
经过doAcquireSharedInterruptibly此方法后,此时AQS的状态:
释放资源方法release
public void release() {
// 释放共享资源1
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
/**
* 调用子类实现的tryReleaseShared方法
* 释放资源后 会调用doReleaseShared方法唤醒队列中的线程
*/
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
这里release会调用releaseShared方法,并传递的参数为1,即每次调用release方法释放一个资源;releaseShared方法会首先调用Semaphore.Sync类实现的tryReleaseShared方法,使用CAS的方式增加state资源的值,最后调用doReleaseShared唤醒其他线程
protected final boolean tryReleaseShared(int releases) {
for (;;) {
/**
* 获取当前资源数,释放资源则是增加state的值
* 这里写的比较谨慎 判断 数值溢出的场景 两个正数之和可能是负数
* 最后使用CAS的方式更新state的值,如果失败则重试
*/
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
tryReleaseShared方法可以理解为归还资源,在当前state的基础上,使用CAS的方式更改state的值为当前state + 释放资源数,更新成功后返回true
唤醒等待线程方法doReleaseShared
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
/**
* 此处方法有两个地方会调用:
* 1. 获取资源时,判断还有资源时 会调用doReleaseShared 唤醒等待的线程
* 2. 释放资源时,唤醒队列中的线程去争取资源
*/
for (; ; ) {
Node h = head;
// 判断队列不为空的标准写法
if (h != null && h != tail) {
int ws = h.waitStatus;
/**
* 将head SIGNAL更新为0初始状态 SIGNAL标识后一个节点处于等待状态
* compareAndSetWaitStatus 这里做的事情和 unparkSuccessor第一步一样
* 是防止并发场景下 其他线程进行了唤醒 此处不会再进行二次唤醒 且head更改后进行下次唤醒
*/
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒队列中的后置节点
unparkSuccessor(h);
/**
* 如果head为0 这里有三种情况
* 1. 刚刚初始化 head == tail
* 2. 线程刚刚入队,还没有来得及shouldParkAfterFailedAcquire 更新前驱节点为SIGNAL
* 3. 前一个线程刚刚释放锁 unparkSuccessor 中的 判断也会首先更新前驱结点
* 尝试更新head 为 PROPAGATE 状态 表示下一次共享状态会无限制传播下去
*/
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
/**
* head发生变化只可能其他线程唤醒后 拿到了资源 更改了头节点
* 如果更新了头结点那么 尝试继续唤醒
*/
if (h == head) // loop if head changed
break;
}
}
doReleaseShared代码不太好理解,这里我们简单罗列下步骤:
- doReleaseShared是一个死循环,首先判断头结点的状态是否为SIGNAL,为SIGNAL代表后续节点正在等待可以进行唤醒,使用CAS的方式将SIGNAL更新为0,调用unparkSuccessor方法进行唤醒,如果在更新为0时进行了线程切换,且其他线程也在此时释放了资源,调用doReleaseShared方法,那么此时head的状态为0进入下一个判断
- 使用CAS的方式更新0为PROPAGATE状态,代表此时应该传播状态唤醒下一个节点。(此时已经至少有两个资源需要唤醒),在上一个线程唤醒完成后,更新head的之后,如果有后置节点那么新的head的节点状态肯定为SIGNAL,即在setHeadAndPropagate方法中会再次调用doReleaseShared进行唤醒
最后来看一下比较难理解的setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 设置当前节点为头结点
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
/**
* 这里 会判断传入的剩余资源数是否大于0 如果 propagate > 0 说明还有剩余资源
* 可以唤醒其他线程
* head 为空代表未初始化 没看懂为啥要这么判断(常规判空吧) doReleaseShared 如果head为空什么也不会做
*
* 此时的h 是当前节点的前驱节点
* h.waitStatus < 0 可能是在唤醒当前线程时,其他线程进行了资源释放 调用doReleaseShared
* 会执行compareAndSetWaitStatus(h, 0, Node.PROPAGATE) 将head更新为PROPAGATE
* 所以 当head小于0 时需要唤醒其他线程去抢占资源
* 第二种情况是 当前线程本身就不是队尾,但是 h = head != null 那么后置节点肯定排队的时候就
* 把前驱 就是当前节点更新为了SIGNAL 那么一定是-1的 此时会进行一次不必要的唤醒
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// s == null || s.isShared() 则是在判断是一个有效且为共享节点
if (s == null || s.isShared())
doReleaseShared();
}
}
此处的逻辑在代码中也有阐述,首先setHeadAndPropagate在唤醒线程后,获取到锁资源时进行调用,propagate的值为剩余资源数,那么有以下几个步骤:
- 如果propagate 成立,那么说明还有多余的资源,如果next节点是共享节点,那么调用doReleaseShared继续唤醒
- 如果之前的head的waitStatus状态小于0,这里主要防止doReleaseShared方法所说,在即将唤醒当前线程时,又有其他线程调用了doReleaseShared方法释放了资源,此时不仅仅需要当前线程,需要唤醒下一个线程,此时的head.waitStatus为-3,代表传播唤醒状态
- 如果之前的head的waitStatus状态不小于0,说明已经是中间状态,那么会进入第二个判断,实际上第二个判断如果后置节点有有效节点,那么肯定是SIGNAL(-1)的,是会进行不必要的唤醒,而不排除也会存在判断时进行了线程切换,其他线程释放了资源调用了doReleaseShared方法,此时的head状态为-3的情况,所以在注释上也说了此方法可能会导致不必要的唤醒:
The conservatism in both of these checks may cause unnecessary wake-ups, but only when there are multiple racing acquires/releases, so most need signals now or soon anyway.
我们结合场景理解下这里,假设线程A此时调用了release方法,那么AQS的内部结构为:
此时在唤醒线程C之前,刚刚修改为head状态为0时,如果线程B也释放了资源,那么它会将头结点的状态修改为PROPAGATE,因为此时至少包含两个资源,PROPAGATE表示继续传播唤醒下一个线程D,如下图所示:
当然大部分情况是挨个唤醒的,上面的特殊情况是Doug Lea考虑到了并发释放资源的情况,做了兼容唤醒处理,当然也造成了不必要的唤醒
如果保持这样的唤醒节奏,会不断释放、唤醒、抢占,最后只剩下线程E时,不会继续唤醒线程了,直接返回结果
小节
共享模式下的state可以是一个正整数,在信号量(Semaphore)中表示当前剩余的许可数,每当一个线程申请一次资源,state都会递减,直到为0时再申请则会进入等待队列,而公平信号量和非公平信号量的区别就是公平信号量每次在申请资源时会判断等待队列中是否包含有效节点,如果队列中有有效节点,那么公平信号量就会直接返回获取失败,而尝试进入等待队列等待获取;在有线程释放资源时,state也会递增,并且会唤醒等待队列中的第一个节点,而且如果等待队列中有其他节点那么此时还会继续进行一次不必要的唤醒,主要是兼容在唤醒时发生并发释放资源,唤醒线程抢占到资源后,会继续唤醒后置节点,最后直到队列中没有资源唤醒,结束流程