JDK版本 8u251
前几日遇到的一个问题:一个SpringBoot项目凭空“起不来”了,日志刚刚输出完一个加载数据库驱动的配置,检查数据库用DataGrip能正常连接。很纳闷,偏偏这个时候就自己的环境起不起来其他人都可以正常启动(后来定位到和每个机器的环境变量参数数量有关)。
那咱们只能撸堆栈看看线程在干什么了,先定位到主线程在RUNNABLE,说明没有死锁,并且不是WAITING,说明没有等待数据库连接等。
咦,一看是ConcurrentHashMap的transfer方法,以前听说过HashMap多线程死循环,但一想咱们这个是单线程而且是ConcurrentHashMap八竿子打不着边。只能一行行撸源码研究技术了,看看咱们到底哪里用错了,顺便深入学习下Lock Free的内幕。
问题复现
下面就用几行代码简单复现当时的项目场景:
public class SpringContext {
// 此处方便模拟扩容 设置容量参数为32
private static final int INITIAL_CAPACITY = 32;
private static final String PRO_ENV = "pro";
private final ConcurrentMap<Object, Object> store = new ConcurrentHashMap<>(INITIAL_CAPACITY);
// 模拟 Environment 环境配置本地缓存
private final ConcurrentMapCache cache = new ConcurrentMapCache("cache", store, true);
// 扩容时容量阈值
private static final int RESIZE_CAPACITY = INITIAL_CAPACITY + (INITIAL_CAPACITY >>> 1);
// 启动时 假设环境参数有这些 RESIZE_CAPACITY - 1 个
{
for (int i = 0; i < RESIZE_CAPACITY - 1; i++) {
cache.put("cacheConfig " + i, "cacheValue");
}
}
public static void main(String[] args) {
new SpringContext().start();
}
/**
* 容器启动方法
*/
public void start() {
System.out.println("============ start ============");
String mainConfig = getConfigFromEnv("mainConfig");
System.out.println("start up at mainConfig: " + mainConfig);
System.out.println("============= end =============");
}
/**
* 优先从缓存取 取不到从其他配置服务获取
*/
private String getConfigFromEnv(String configKey) {
return cache.get(configKey, () -> getConfigByOtherConfig(configKey));
}
/**
* 模拟从其他配置服务拉取配置场景
*/
private String getConfigByOtherConfig(String configKey) {
String active = System.getenv(configKey);
/**
* 如果是生产 对加密的部分进行解密
*/
if (PRO_ENV.equals(getSpringActive())) {
System.out.println("decrypt config ...");
return active;
}
return active;
}
/**
* 优先从缓存取 取不到给dev
*/
private String getSpringActive() {
return cache.get("active", () -> "dev");
}
}
用几句话来简单解释下这段代码在干什么:
- 声明了2个成员变量:
- 本地缓存store,数据结构类型为ConcurrentHashMap,初始容量为32
- 本地缓存ConcurrentMapCache,封装了上面的ConcurrentHashMap,设置了一个缓存名字cache,基本没做啥事。
- 给SpringContext增加了一个构造代码块,这块代码在实例化对象时会运行,这里模拟容器初始化启动给本地缓存 (扩容阈值-1) 个配置。(后面会讲为什么要 RESIZE_CAPACITY -1 个)
- 程序入口,启动容器,调用start方法:
- 记录启动日志
- 从环境中获取核心配置mainConfig
- 使用配置,然后容器启动完成
常规操作,一路看下来,似乎没啥问题,逻辑也说得通,取不到本地缓存的配置从配置服务查,取到放进缓存。接下来我们启动看看?
不出意外的卡住了...堆栈显示就是卡在ConcurrentMapCache缓存那里了,业务逻辑完全没问题,难不成JDK也有BUG?(想快速查看原因,滑到最下直接查看结论即可。)
源码解析
ConcurrentHashMap数据结构
图片来源:https://www.zhenchao.org/2019/01/31/java/cas-based-concurrent-hashmap/
哈希表(Hash Table)又叫散列表,是一个方便快速查询的数据结构,上面是一个存储Node节点的数组,Node节点存储的可以是链表也可以是红黑树。查询时需要根据每个键的哈希值,对数组的长度取余,定位到是哪一个Node节点位置(也叫桶节点),如果Node里面存在与查询的键相同的键,那么就是要寻找的值,否则为找不到。 这样的好处是,理想情况下查询是一个O(n)的复杂度,因为哈希冲突(相同的桶节点),我们需要引入Node下面的节点,这些节点存在就是为了在哈希冲突时仍有不错的查询效率,当然,链表在过长的情况下,效率较低,所以在HashMap的Node节点元素数量大于等于8个时会转化为红黑树,但是维护红黑树的自平衡也需要成本,所以在扩容后判断桶节点数小于等于6时又会再次退化为链表。
HashMap虽然高效,但是很遗憾他不是线程安全的数据结构,所以面对高并发场景Doug Lea大神为我们提供了ConcurrentHashMap这个工具类,一个并发版本的HashMap。数据结构和HashMap是一致的,只是在各种方法上支持了并发调用。
细心的你一定发现了,本地缓存初始化时,指定了32个初始容量,从HashMap的扩容规则可知加载到0.75个容量时会触发扩容,ConcurrentMapCache也是套用这个规则,我们在构造代码块设置了 32 + 16 - 1 = 47个元素,嗯?没搞错...32容量的扩容阈值48?
那咱们来打开构造方法看看:
ConcurrentHashMap构造方法
public ConcurrentHashMap(int initialCapacity) {
// 如果小于0 抛出异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
// MAXIMUM_CAPACITY 就是Integer的最大正整数,如果过大 设置 MAXIMUM_CAPACITY
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
/* 这里有点意思,tableSizeFor 求大于initialCapacity的最小2次幂,
那么initialCapacity* 1.5+1后的最小2次幂(+1针对乘1.5会有小数点,
所以+1把小数点给抹平),这和HashMap中的实现有所不同(HashMap中是
tableSizeFor(initialCapacity))
*/
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// 这个值是用来表示ConcurrentHashMap数组的状态的,几种情况后面逐渐讲解
// 这里的值为数组的容量
this.sizeCtl = cap;
}
从这个构造函数设置cap值处发现端倪:构造函数想表达的initialCapacity参数,实际上不是容量,而是扩容阈值,HashMap的加载因子是0.75,实际上容量就是 扩容阈值 * 1.33333,和1.5也差不多,毕竟0.75是除法,而1.5是位运算,效率上也高好多。回到刚刚的问题,传入来是32,那么32*1.5+1=49,然后对49求最小2次幂不就64了?想想假如是22,本身不需要扩容,但经过计算22 * 1.5 + 1 = 34 ,最终会使用64,相当于扩容了一倍,而 22 / 0.75 + 1 = 30, 实际上还是应该取32,严格点,这就是一个BUG,并且有人已经提过了 JDK-8202422
从上面可以抓取到几个信息:这个bug从Java 8开始就已经有了,已经在Java 11.0.1中修复了,下面是ConcurrentHashMap的新构造方法
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
Java11中这里被修复成了(long)initialCapacity / loadFactor) 根据加载因子来做除法,最后咱们来分析下为什么用1.5倍呢?实际上,我们再细看下ConcurrentHashMap的构造方法代码,除了全参构造中使用了计算以外,连变量都没有保存,并且在扩容时,扩容阈值的计算使用了sizeCtl = (n << 1) - (n >>> 1) (在后面会看到),相当于0.75倍新容量,也就是说,Doug Lea老爷子不希望我们自己去设置这个加载因子的值,淡化加载因子的概念,就算是我们需要设置,也只建议设置0.75了。
接着我们来看下sizeCtl这个参数,这个参数主要标识着哈希表的状态,从构造方法看出,设置了容量的大小,这个参数设计的很精妙,在后面我们再统一解析。
可以看出ConcurrentHashMap是懒加载(lazy-load 形式)的,因为在new ConcurrentHashMap时只做了一些容量大小的赋值等,很大程度上减小了初始化的开销。
咱们回到主题,现在传入的是32,咱们实际上容量是64,所以扩容的阈值就是48了,理论上我们在初始化时设置了47个,如果再加载一个元素,那么就会触发扩容,咱们带着疑问顺道来学习下put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
put方法将调用转发给putVal方法:
新增方法-putVal
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算Hash值 实际上使用的Key的hashCode方法
int hash = spread(key.hashCode());
// binCount表示添加前节点对应的桶上面的节点数
int binCount = 0;
// 轮训 如果第一次是初始化,第二次会开始添加节点值 第N次就是多线程竞争了,
// 在后面添加同一个桶的方法时 会不断尝试
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 这里判断 如果tab为空 实际上就是懒加载场景,延迟初始化 然后下次循环 此处不为空
tab = initTable();
// tabAt 方法 是Unsafe类中通过volatile方式获得指定地址所对应的值,
// 这里的偏移量是 通过 ( 数组长度 - 1 ) & hash值快速计算得到,这里的
// 原理就:对一个2的n次幂的值 快速求模 = (n - 1)与hash值,这样可以快
// 速定位到桶的位置,和HashMap一致
// 如果为null 说明还没有初始化过,则直接创建一个新的节点
// 并且将该桶的第一个节点赋值给f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 使用cas的方式为这个位置赋值,如果为null说明还没有其他线程更新过
// 如果不为null 就会进入下次循环判断了
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果桶第一个节点Hash值为ForwardingNode节点,ForwardingNode节点的Hash
// 值为MOVE -1,意思是当前正在迁移,如果是这样的情况则这个线程也去帮助扩容
// 扩容完成后会更新tab,跳出本次循环下次插入
else if ((fh = f.hash) == MOVED)
// 这里帮助扩容的方法放到后面解析
tab = helpTransfer(tab, f);
else {
// 如果hash冲突,则尝试遍历数插入
// 暂时折叠起来 后面详细看
}
// 流程到这里 就代表put成功,map的数量加1
// 在这里同样也有协助扩容的逻辑
addCount(1L, binCount);
return null;
}
整个putVal的逻辑还是比较清晰的,我们再画张图理一下
小结下putVal方法的流程:
- 验证key和value都不能为空,如果为空抛出异常
- 使用spread方法计算哈希值
- 如果是第一次put元素,那么会进行初始化(构造函数懒加载延迟初始化)
- 如果计算出当前桶还没有初始化过,会直接用当前key和value创建新的头节点
- 如果判断出当前桶的Hash值为MOVE,那么说明已经有其他线程已经在扩容,那么此线程也会去尝试帮助扩容
- 上面的条件都不满足时,说明是Hash冲突,需要在当前桶位置插入新的节点(未完,在put节点方法继续)
当然还牵扯出很多知识点:
哈希方法spread
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
这里拿到hash值后,(因为int是32位)这里就是将hash值得高16位和低16位进行异或计算,细细分析下为什么要异或计算呢?因为hash值在后面会与哈希表数组的长度的-1进行取与计算,这样做的好处是可以快速计算 hash值 % 数组长度,毕竟位运算的效率是要远高于取模这种除法计算的,但要求就是长度必须要是2的幂;拿出高16位进行异或运算,主要是降低在长度小于16次幂(65536长度,大部分情况不会在一个哈希表中存储6w多个元素吧)的情况下hash冲突,因为高16位在这个长度下取模运算时是用不到的,取一个异或“扰动函数”正好能降低哈希冲突的概率
举个极端点例子,假设现在哈希表的长度是16,那么他的某个Key的哈希为65536,那么他的二进制分布如下,而我们的长度为16,那么-1就是15,与15取与,其实就是只看后四位有几个1,而65536和很多16的倍数一样,后四位都是0这样冲突就很多了,取一个异或“扰动函数”,能有效将高16位的哈希值“映射”到低16位上,降低哈希冲突。
这里再来看下最后一步 & HASH_BITS,HASH_BITS的值为Integer的最大值,那么我们可以知道,如果是正数,与这个数取与是完全没有影响的,因为除以最大的正整数的模肯定还是自己了,但是如果是负数,那么经过这样取与操作之后就变成了正数,Integer的MAX值除了最高为0以外都是1,那么与负数最高为1相与,只剩下正数部分了,换句话说,这个& HASH_BITS 是在保证Hash值不能为负数。因为在ConcurrentHashMap中,负数有特殊的含义:
- MOVED = -1; // hash for forwarding nodes 代表正在迁移
- TREEBIN = -2; // hash for roots of trees 代表当前节点是红黑树节点
- RESERVED = -3; // hash for transient reservations 代表当前节点是预留节点,用于临时存储的计算节点
搞清楚这里之后,那么我们在下面判断节点类型时就可以用hash>0来确定是普通还是其他节点,是负数说明可能是红黑树或者预留节点或迁移节点了。
桶数组的初始化
// 这个方法只作为初始化用
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 这里将tab设置给tab,方法内部操作是tab
// 可以这样直接操作的变量,必须要是volatile修饰的,以确保内存可见性
while ((tab = table) == null || tab.length == 0) {
// 如果sc 小于0 说明其他线程已经在扩容了
// sizeCtl肯定也必须是volatile修饰的
if ((sc = sizeCtl) < 0) // A
// yield 释放资源调度 给其他线程机会 空转
Thread.yield(); // lost initialization race; just spin
// 这里利用CAS的方式将sizeCtl的值更新成-1 标识当前线程需要去初始化了
// 如果竞争失败,就会在上面if中让出资源
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 这里我们来思考下 为什么要double check 经过CAS之后肯定只有一个线程
// 假设第一个线程刚刚执行到代码A处,并且还没有赋值完成,切换到第二个线程,正在这里初始化,
// 判断不为空,创建新的Node,还没有来得及执行finally代码,切换到第一个线程
// 那么他也会cas成功,如果这里不再次check 就会创建两次,虽然容量是一致的,实际上也浪费的性能
// 所以这里二次验证,就是为了避免高并发场景下,哈希表被重复实例化的情况
if ((tab = table) == null || tab.length == 0) {
// 无参构造时 就是默认值,此时使用默认大小 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 初始化完成后,这个sc就是下次需要扩容阈值了,0.75倍的容量,并且不可更改
// 所以从这里也可以看出,ConcurrentHashMap没有加载因子的概念,无论是否设置
// 都会使用0.75
sc = n - (n >>> 2);
}
} finally {
/**
* 放在finally中的就代表是一定需要执行的,如果在创建哈希表数组时发生OOM异常等异常,那么
* 此时sizeCtl就是-1 一直空转了,需要在finally中释放状态,代表初始化失败,给其他线程
* 初始化的机会
*/
sizeCtl = sc;
}
break;
}
}
return tab;
}
哈希表实际上就是存储不同类型Node的数组,这里的Node 类型可以是链表,也可以是红黑树,都是解决在Hash冲突的情况下元素查询问题
小结下initTable方法:
- 判断是否有其他线程初始化,如果有则当前线程会放弃竞争资源,空转
- 如果是第一个线程进来,会使用cas的方式将sizeCtl的值修改为-1(sizeCtl状态表示下面统一总结),cas成功后会进行初始化,长度为sizeCtl之前初始化时的值,接着会设置扩容阈值为table长度的0.75倍到sizeCtl上。
- 初始化完成,返回table
初始化桶数组的逻辑很像double check的单例模式,只是锁的验证方式换成了CAS自旋锁,这里我也写了基于CAS实现的Lock Free单例模式。代码放在了github上。
为桶内添加第一个元素
这里先来看一下putVal的第四个逻辑判断,当hash冲突时给桶内加元素,和HashMap逻辑类似,不同的是在桶上有锁,这里也诠释了JDK8中的 ConcurrentHashMap 中锁的粒度是在桶级别
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算Hash值 实际上使用的Key的hashCode方法
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 折叠
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 折叠
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // 协助扩容 折叠
else {
// 这里说明 桶内已经有至少一个节点
V oldVal = null;
// 这里给桶的第一个节点加了锁,对链表或者红黑树是没有使用lock free编程的
synchronized (f) {
/**
* 这里又是一个double check 因为synchronized是阻塞的,如果在synchronized期间,
* 其他线程插入了该桶的节点,触发了扩容,此时当前的table不再是原来的oldTable了,
* 当本线程切换回来使用newTable计算hash槽也会不一致;或者是扩容时将当前桶置为ForwardingNode节点,
* (ForwardingNode是一个标记节点,hash值为MOVE的-1),此时也已经不是之前的桶节点了,会出现
* tabAt(tab, i) != f 不一致的情况
*/
if (tabAt(tab, i) == f) {
/**
* 如果大于0说明是一个普通节点,那么直接使用链表的插入方法
*/
if (fh >= 0) {
// 这个用来记录当前桶的节点数量
binCount = 1;
// 循环链表上的每个节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 如果onlyIfAbsent 是false 用传入的Value覆盖旧的值 跳出循环
if (!onlyIfAbsent)
e.val = value;
break;
}
/**
* 如果e的next节点为null 说明是最后一个节点 尾插法 插入最后
* 则将当前节点的next绑定为新节点,新节点的kv参数为传入的key和value
* 并且next为空
* e = e.next 如果不为null,那么 e 就会继续循环下去 知道找到最后一个节点
*/
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
// 如果是红黑树 则为2,后面讲为何是2
binCount = 2;
// 红黑树的逻辑不在此文中讨论
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// binCount 在链表插入初始设置为1,最后为插入前的链表的长度 红黑树插入设置为2
if (binCount != 0) {
// 如果链表的数量已经达到了转成红黑树阈值(默认为8)的时候,则转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果是更新了旧值,就直接返回,否则跳出循环进入addCount方法
if (oldVal != null)
return oldVal;
break;
}
}
}
// 流程到这里 就代表put成功,map的数量加1
// 在这里同样也有协助扩容的逻辑
addCount(1L, binCount);
return null;
}
接上putVal方法小结:
- 当hash冲突时,会给头节点加synchronized锁,阻塞其他put线程。插入时采用尾插法,插入后会记录当前桶的节点数binCount,如果binCount大于等于链表转换红黑树的阈值(默认为8)会转换为红黑树节点。
- 最后调用addCount方法增加计数器的值
在ConcurrentMapCache的get方法中实际上使用的computeIfAbsent方法,下面我们来看下computeIfAbsent方法
方法computeIfAbsent
computeIfAbsent方法和putVal方法很类似,这里只看不同的部分
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
// 这里 mappingFunction 和key都不能为null
if (key == null || mappingFunction == null)
throw new NullPointerException();
int h = spread(key.hashCode());
V val = null;
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
/**
* 在上面哈希方法文中可以知道 Hash值除了正数 负数有三种类型,其中RESERVED -3 代表预留节点
* ReservationNode 在初始化的时候会设置hash值为RESERVED
* 这里假定mappingFunction 是一个耗时操作,只会允许一个线程计算,这里在计算之前标记此位置是一个预留节点
* 所以ReservationNode只是一个占位符,在操作冲突时会阻塞其他线程插入节点
* 在加锁之后 会使用casTabAt 的方式操作 设置此处为预留节点 如果失败就说明有其他线程竞争成功去计算了,退出循环重试
*/
Node<K,V> r = new ReservationNode<K,V>();
synchronized (r) {
// 使用cas的方式设置预留节点 synchronized 代码块一般会带一个double check
if (casTabAt(tab, i, null, r)) {
// 这里binCount只是一个标志,如果cas成功 才为1 如果失败需要在外层循环重试
binCount = 1;
Node<K,V> node = null;
try {
/**
* 调用 mappingFunction.apply 计算value 如果不为空则会创建新的节点并赋值给node
* 最后在finally中更新 tab中i的值为node 如果计算的值为null那么不会更新总数
*/
if ((val = mappingFunction.apply(key)) != null)
node = new Node<K,V>(h, key, val, null);
} finally {
setTabAt(tab, i, node);
}
}
}
if (binCount != 0)
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
boolean added = false;
/**
* 假如 在计算节点ReservationNode 此时发生其他线程进来操作
* 那么 这里的锁的和上面ReservationNode节点是同一把锁
*/
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek; V ev;
if (e.hash == h &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
val = e.val;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
/**
* 与putVal方法类似 找到最后一个节点 并且计算这个值 如果不为空则将next设置为新节点
* added会作为是否成功新建节点的判断条件
*/
if ((val = mappingFunction.apply(key)) != null) {
added = true;
pred.next = new Node<K,V>(h, key, val, null);
}
break;
}
}
}
else if (f instanceof TreeBin) {
binCount = 2;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(h, key, null)) != null)
val = p.val;
/**
* 与putVal方法类似 找到最后一个节点 并且计算这个值 如果不为空则将next设置为新节点
* added会作为是否成功新建节点的判断条件
*/
else if ((val = mappingFunction.apply(key)) != null) {
added = true;
t.putTreeVal(h, key, val);
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果未新增 则说明已经存在相同key的值 直接返回即可
if (!added)
return val;
break;
}
}
}
/**
* 这个val会作为新增的判断条件 mappingFunction会将计算的值赋值给val
* val = mappingFunction.apply(key)
* 只会在不为空时新增计数器数量
*/
if (val != null)
addCount(1L, binCount);
return val;
}
分析完putVal方法再来看computeIfAbsent方法就很容易理解了,这里变更的点就是在判断需要操作的节点时新增了ReservationNode节点,并且操作时会加synchronized关键字,阻塞同桶其他线程操作,计算完成后,会在finally中设置回新节点,如果mappingFunction计算的值不为空,还会新增计数器的值。
多线程扩容
扩容前置判断
先打个预防针,ConcurrentHashMap的扩容方法是比较复杂的,也是极富有学习价值的。顺便套一句话:面试并不一定会问到,但是仍然值得好好研究。虽然你我可能没有太多机会实现这样的逻辑,但是了解大师的设计思路,对于自己的能力来说也是一种提升。
addCount方法在putVal成功后执行,并且x为新增元素个数,默认情况只会是1了,check方法是当前桶的元素数量,大于0才会走判断扩容逻辑。
private final void addCount(long x, int check) {
// 对于计算总数的方法我们先折叠起来
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 上面代码 折叠 等下看
s = sumCount();
}
// check不是负数, 就帮助扩容, 在clear 方法中会设置为 -1
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
/**
* s 记录了当前map中所有节点的数量 ,这里的计算逻辑 比较直接,
* 如果table不为空,且容量小于最大值,当前节点的数量大于扩容的阈值时进行扩容
*/
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
/**
* 这个函数的返回值记录了当前数组长度的标志位,在下面有讲
* 同一个长度的数组,计算的rs值一定是相同的
*/
int rs = resizeStamp(n);
/**
* 如果sc小于0 说明已经有其他线程开始扩容了, 那么此处的逻辑可以理解为helpTransfer 的判断条件了
* 判断当前是否需要线程协助扩容
* 正常情况下 sc的值为大于扩容的阈值 肯定是大于0的
*/
if (sc < 0) {
/**
* 这里有五个与操作 有一个则退出协助扩容逻辑
* (sc >>> RESIZE_STAMP_SHIFT) != rs rs是根据数组长度算出来的,sc在下面一个if就是第一个
* 线程进来的时候高16位是rs的值,那么这里理论上还原也是相同的,如果不相同那么只能说明
* 扩容时数组的长度变了,其他线程触发了第二次扩容
* sc == rs + 1 这里算是一个BUG,实际上,应该是 sc == (rs << RESIZE_STAMP_SHIFT) + 1
* 因为sc的值是rs左移16位 出来的,低16位表示线程扩容的状态,在下面那个if 就是就是第一个
* 线程进来的时候低16位是2, +1 = + 2 - 1, 这里的减1来自最后一个线程检查,下面会说
* sc == rs + MAX_RESIZERS 这里和上面逻辑类似,判断是否还需要线程扩容,MAX_RESIZERS是最大扩容的线程数量,
* 实际上也是一个BUG,应该是 sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
* nextTable == null 说明已经扩容完成 这里不需要在协助扩容了
* transferIndex 说明bound区间已经都分配完了,那么也不需要扩容了 (下面有说)
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
/**
* 走到这里 说明需要线程协助扩容的,使用cas的方式 将sizeCtl+1,然后进入
* transfer方法帮忙做迁移
* 这里也可以了解到,sizeCtl 的高16位表示的是哈希表数组的标志位,唯一长度的哈希表只有一个标志
* 低16位 - 1 后表示正在扩容的线程数
*/
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
/**
* 如果是第一个扩容线程进来,会来到这里
* 此处将sizeCtl的值设置为rs的左移16位 + 2,从resizeStamp函数 可以知道 返回值只是长度的一个标志位
* 并且只在低16位有值,第15位位1,这样操作后仍然是一个负值,符合上面sc < 0的协助扩容判断
*
* +2 低 16位 为二进制的 10
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
// 重新计算下总数,下次循环判断是否需要扩容
s = sumCount();
}
}
}
其中resizeStamp方法如下
static final int resizeStamp(int n) {
/**
* Integer.numberOfLeadingZeros(n) 返回的是该数二进制表示情况下,最高位前面1的个数,比如
* 128表示为 0000 0000 0000 0000 0000 0000 1000 0000 ,返回24,即最高位1前面有24个0
* Integer.numberOfLeadingZeros(n) 返回的最大值是32,最小值为0
* 用二进制表示就是
* 32 0000 0000 0000 0000 0000 0000 0010 0000
* 0 0000 0000 0000 0000 0000 0000 0000 0000
* RESIZE_STAMP_BITS 是 16,1 左移 15 位就是
* 32768 0000 0000 0000 0000 1000 0000 0000 0000
* 取或运算 就是 任何一个1 为1 都为1
* 那么 resizeStamp 的值区间为 32768 ~ 32800
* 32800 0000 0000 0000 0000 1000 0000 0010 0000
*
* 可以看出 返回值就是高16位全是0,低16位包含了哈希表数组的长度信息
*/
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
这里我们画张图简单了解下这个值的运用,理解这个函数对理解哈希表当前的状态sizeCtl也是有帮助的
我们理解了resizeStamp方法,其实就很好理解addCount方法的扩容部分了,下面我们小结一下:
- 扩容的部分首先会判断sumCount()即当前哈希表元素数是否大于sizeCtl(扩容阈值),如果是true代表需要扩容
- resizeStamp会根据当前数组长度计算一个16位的唯一标志位,如果是第一个扩容线程,会将sizeCtl的高16位设置成它,低16位设置成 线程数 + 1(因为 第一个线程线程为2)
- 如果根据sizeCtl判断(小于0则代表已经有线程正在扩容),是第二个或者N个扩容线程,会将sizeCtl的低16位+1,并尝试协助扩容
协助扩容helpTransfer
在上面put方法中在判断是MOVE节点时,有一个帮助扩容方法helpTransfer。理解了addCount方法再来理解helpTransfer就很容易了,咱们来看下这里面的实现:
// 这个方法 主要是用来判断是否需要协助扩容的
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 如果哈希表不为空,且当前节点为ForwardingNode节点说明当前桶正在迁移中
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 这个函数在下面有解释,实际上就是把长度信息封装到低16位了
int rs = resizeStamp(tab.length);
/**
* 既然是协助,那么这里的代码,肯定是在判断其他线程正在扩容了
* nextTab == nextTable 新数组不能为空 为空代表结束了
* sc = sizeCtl 小于0 说明扩容中
*/
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
/**
* 这里的判断和上面那个是否需要协助扩容逻辑一致
* (sc >>> RESIZE_STAMP_SHIFT) != rs rs是根据数组长度算出来的,sc在下面一个if就是第一个
* 线程进来的时候高16位是rs的值,那么这里理论上还原也是相同的,如果不相同那么只能说明
* 扩容时数组的长度变了,其他线程触发了第二次扩容
* sc == rs + 1 这里算是一个BUG,实际上,应该是 sc == (rs << RESIZE_STAMP_SHIFT) + 1
* 因为sc的值是rs左移16位 出来的,低16位表示线程扩容的状态,在下面那个if 就是就是第一个
* 线程进来的时候低16位是2, +1 = + 2 - 1, 这里的减1来自最后一个线程检查,下面会说
* sc == rs + MAX_RESIZERS 这里和上面逻辑类似,判断是否还需要线程扩容,MAX_RESIZERS是最大扩容的线程数量,
* 实际上也是一个BUG,应该是 sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
* transferIndex 说明bound区间已经都分配完了,那么也不需要扩容了 (下面有说)
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
/**
* 走到这里 说明需要线程协助扩容的,使用cas的方式 将sizeCtl+1,然后进入
* transfer方法帮忙做迁移
* 这里也可以了解到,sizeCtl 的高16位表示的是哈希表数组的标志位,唯一长度的哈希表只有一个标志
* 低16位 - 1 后表示正在扩容的线程数
*/
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
// 扩容成功 返回新的tab 哈希表
return nextTab;
}
return table;
}
协助扩容的逻辑和addCount后半部分类似,这里小节下协助扩容的逻辑:
- 因为是协助,首先需要判断是否已经有其他线程在扩容:
- sizeCtl必须要小于0,高16位存储resizeStamp长度的标志位,低16位存储扩容线程数 + 1(第一个为2)
- 判断是否是正在扩容的标志是nextTable,即nextTable只存在于扩容中 其他时间都为null
- 判断是否需要当前线程去帮助扩容:
- (sc >>> RESIZE_STAMP_SHIFT) != rs rs是根据数组长度算出来的,sc在下面一个if就是第一个线程进来的时候高16位是rs的值,那么这里理论上还原也是相同的,如果不相同那么只能说明扩容时数组的长度变了,其他线程触发了第二次扩容
- sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS (原来的判断是sc == rs + MAX_RESIZERS)判断是否还需要线程扩容,MAX_RESIZERS是最大扩容的线程数量
- transferIndex 说明bound区间(扩容部分讲)已经都分配完了,那么也不需要扩容了
- 如果需要则把sizeCtl的低16位+1,标志正在扩容线程数,调用transfer方法进行协助扩容。
这里说说上面的BUG:
- sc == rs + 1判断 是否是最后一个线程检查,实际上这里应该永远不可能等于的,因为之前我们也知道sc存放分为两部分:高16位为rs的值,低16位为扩容线程数+1,那么这里就应该是sc == (rs << RESIZE_STAMP_SHIFT) + 1而不是直接判断,如果直接判断rs是永远不会大于65536的(1 <<< 16)因为他只有低16位有值,那么sc都是负数了,怎么会相等呢?
- sc == rs + MAX_RESIZERS 判断是否还需要线程扩容,MAX_RESIZERS是最大扩容的线程数量,和第一个同理,rs如果没有左移16位最大值肯定小于65536,MAX_RESIZERS的值为65535,它俩加起来就是131,071,这个和负数溢出还差十万八千里呢... 所以,这两个检查相当于没有,永远不可能成立,当然,有理有据在OpenJDK的bug提交记录上可以看到如下的Bug记录:JDK-8214427
由上可以看到,这个bug在Java 12及之后的版本修复了,所以下面来看一下这块改成了什么。JDK12对比
从上面可以看出,这两个判断条件已经修复了。在判断前int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT 对rs做了左移16位操作,此时判断sc == rs + MAX_RESIZERS 和 sc == rs + 1才是有意义的。顺便看到删除了JDK8 中(sc >>> RESIZE_STAMP_SHIFT) != rs这个判断,这个是判断此时是否是原哈希表的,如果协助扩容时发生了二次扩容,这里resizeStamp会生成不同的标志位,网上也搜索了相关资料,没有找到合理的解释,这里有知道缘由的可以在下方留言告诉我。
这里说到协助,扯点题外话:我们不妨想想,我们一直在寻找限制多线程的方法:加锁阻塞、自旋,因为扩容时需要对内部数据迁移,是一个耗时操作,但如果此时仍然选择阻塞或者自旋其实都是性能上的损失,大师的思想就是:其他线程不会被阻塞或者自旋,而是去帮助扩容,拥抱多线程,而不是拒绝它,最大程度的发挥多核CPU的优势。
什么时候扩容
了解扩容方法之前,我们先看下扩容的前置条件,什么时候扩容?
- 当put元素达到扩容阈值时,会在addCount中判断,调用transfer方法扩容
- 触发tryPresize操作,有两种情况下会触发tryPresize方法:
- putAll方法在put 元素之前会调用tryPresize传入Map的长度,将数组的长度扩容至大于传入map长度的最小2次幂
- 在转换某个桶节点为红黑树时,会判断当前数组的长度是否小于MIN_TREEIFY_CAPACITY(默认是64),如果小于这个长度,那么会尝试用tryPresize方法扩大数组的长度为当前的2倍,并触发transfer方法,重新调整节点的位置
尝试调整方法tryPresize
private final void tryPresize(int size) {
/**
* 判断传入的size 是否到最大长度,如果是则取最大值
* 否则按照 扩容阈值 的方式计算它的最小2次幂
* 即和构造函数一样 n >= 1.5 * size + 1
*/
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// 注意这里是一个死循环
while ((sc = sizeCtl) >= 0) {
/**
* 下面的判断和初始化table 逻辑类似 初始化,将sc更新为sizeCtl -1 表示正在初始化
* 这里有一个不起眼的判断 n = (sc > c) ? sc : c 实际上就是取数组长度的最大值
* 如果putAll方法传入的哈希表过小 这里是不能按照传入的来初始化table的
*/
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
// sc就是扩容的阈值,sizeCtl在不操作是存储的下次扩容的阈值
sizeCtl = sc;
}
}
}
/**
* 如果c小于sc 说明已经初始化且大小合适 不需要扩容 直接结束了
* 比如 传入的size是 32 经过tableSizeFor方法后是64
* 但实际上此时的sc可能其他线程操作了扩容 已经是96了 那么此时就不需要再继续扩容了
*/
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
/**
* 排除上面两个判断,这里只可能是sc < c了 代表已经初始化过,但是需要扩容的情况
*/
else if (tab == table) {
/**
* 这里的逻辑和helpTransfer方法类似 判断当前的 sc 是否小于0 如果小于0说明其他线程正在扩容 此线程可以尝试去帮忙
* 如果sc 大于0 此线程 尝试 抢占第一个扩容线程 如果成功后 会将sizeCtl 设置为(rs << RESIZE_STAMP_SHIFT) + 2
*/
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
// 这里的判断条件和helpTransfer一致 就不再赘述
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果可以帮助扩容,那么将sizeCtl的低16位+1 即扩容线程数 +1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
这里小结一下扩容的逻辑
- 根据传入的size计算最小2次幂,如果大于最大值的一半,则设置最大值MAXIMUM_CAPACITY(默认为1 << 30)
- 判断是否是初始化,如果是初始化,那么使用传入的size的最小2次幂和当前构造函数设置的扩容阈值比较,去一个max作为数组的初始长度
- 判断是否需要扩容,即扩容阈值小于当前传入的size的最小2次幂,这里的判断逻辑和helpTransfer中类似,判断当前的 sc是否小于0 如果小于0说明其他线程正在扩容 此线程可以尝试去帮忙,如果sc大于0 此线程 尝试 抢占第一个扩容线程 如果成功后 会将sizeCtl 设置为(rs << RESIZE_STAMP_SHIFT) + 2
扩容方法transfer
前面铺垫还是挺多的,这里来详细看下扩容的核心逻辑。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
/**
* 这里在计算每个线程需要执行的桶个数,下面简称为bound区间
* NCPU 是cpu的虚拟核心数 如果8c16t那么就是16
*
* 如果 NCPU > 1 则每个bound区间的个数为 (n >>> 3) / NCPU 否则为n 就是不拆分
* 如果算出的bound区间太小 那么就使用 MIN_TRANSFER_STRIDE 默认是16
*/
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 这个nextTab是传入的 在外判断为第一个线程扩容时 为null
if (nextTab == null) { // initiating
try {
// 扩容的数组为2倍长度 设置给nextTab
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 到这里说明异常了 除了受检异常还有OOM异常,此时放弃扩容,并将sizeCtl设置为最大值
sizeCtl = Integer.MAX_VALUE;
return;
}
// nextTable是一个 volatile修饰的 节点 ,代表扩容后的哈希表
// 小Tip 这里为什么不需要cas的方式为nextTable赋值? nextTab 是外面传入的 在上个方法会判断扩容的状态
nextTable = nextTab;
// transferIndex 设置的值为原数组长度
transferIndex = n;
}
// nextn 存储了新数组的长度
int nextn = nextTab.length;
/**
* 这里第一次看到ForwardingNode 节点 他的Hash值为MOVED -1
* 并且存储了新哈希表的引用 只有空桶会用到
*/
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 这里声明了 两个变量 advance当前bound区间迁移是否完成 和 所有bound区间是否迁移完成
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 这里是一个死循环 i表示桶的位置,bound是当前线程区间
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
/**
* i是桶的下标,从原数组长度减到-1 表示迁移完成一个桶 就下一个
* [bound,transferIndex] 是每个线程的扩容执行区间,transferIndex在第一次进来时会初始化成原数组的长度n
*
* --i >= bound 如果线程刚刚分配完还没干活,那么i就在上面区间内
* finishing 等下说
*/
if (--i >= bound || finishing)
// 不需要分配
advance = false;
// transferIndex小于0则代表分配完成
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
// 不需要分配
advance = false;
}
/**
* nextIndex - stride 就是这个线程区间的下标 nextIndex是当前未分配的位置,stride就是bound区间的大小
* cas 成功 更新transferIndex的值为 nextBound的值
* 如果cas失败 那么就说明其他线程进来了 下次重试
*/
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// nextBound 就是nextIndex - stride 的值
bound = nextBound;
// 索引比长度 小1
i = nextIndex - 1;
// 已经分配好 不需要分配
advance = false;
}
}
/**
* 只有分配完成后advance为false 或者 不需要再次分配时 会来到这里
*
* i 是索引 小于0代表都迁移完成了
* n 是长度 i 理论上一直在减 是根本不会大于n的 除非Int数据溢出了,即 Integer.MIN_VALUE - 1 = Integer.MAX_VALUE
* 上面分配时cas的失败后会不断减小i的值
* i + n >= nextn 说实话 没看懂... nextn是新数组的长度 i + n 和 理论上也不会发生
*/
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// finishing完成时为true 这时更新nextTable为null 将nextTab赋值给table
nextTable = null;
table = nextTab;
// sizeCtl 的值会再次变为正数,扩容的阈值 为当前旧数组的长度 1.5倍 即新数组的0.75倍
// 可以看出 这里的负载因子 使用的是0.75 不会使用传入的值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
/**
* 难以理解的是这里需要进行一次二次检查 TODO
* 从addCount方法可知,扩容线程数存储在sizeCtl的低16位-1上 比如 2个线程 那么sizeCtl的低16位值为3
* 每个线程执行完成后 在这里会做一次 减 1
* resizeStamp(n) << 16 sizeCtl高16存储其实就是它
* 但只有第一个线程的值 sc 会等于 (rs << RESIZE_STAMP_SHIFT) + 2
* 这里就在验证当前线程是否是第一个线程 如果是 则需要做一次二次检查 如果不是 就退出了
*/
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// finishing只有这里会设置为 true
finishing = advance = true;
// 将i继续设置为原数组长度,重新跑一边全部的桶 如果没有做完就继续
// 想想 为什么要二次check?
i = n; // recheck before commit
}
}
/**
* 如果原数组 桶上这个位置没有值 说明不需要迁移 直接赋值ForwardingNode
* 赋值这个就是一个标志,如果是MOVE的,那么其他线程在put时就不会操作了
* 就会进到helpTransfer方法中帮助扩容
*/
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// 如果检测到这个位置已经有其他线程在迁移了 那么跳出循环重新分配
advance = true; // already processed
else {
/**
* 这里和putVal一样 锁住了桶的第一个节点 以防止其他线程操作
* 上面分配和扩容实际上是一个循环,如果其他线程分配失败 也会到这里来
*/
synchronized (f) {
// 锁中double check 是一个好习惯
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// fh 在putVal方法中已经有说 如果大于0说明是普通链表Node节点
if (fh >= 0) {
// 折叠 链表扩容的方法
/**
* 将advance设置为true 就会再次进入上面分配方法中 --i >= bound 判断是否分配完成
* 如果没有分配完成 会继续下一个桶
*/
advance = true;
}
// 红黑树节点 折叠
else if (f instanceof TreeBin) {
// advance true 与上面链表同理
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
分析完扩容的逻辑,我们再画张图好好理解下:
小结下扩容的逻辑流程为:
- 计算bound区间太小,即每个线程执行扩容的桶个数,最小为16
- 构造长度2倍的空数组,作为新的哈希表
- 每个线程从最后往前竞争自己的扩容区间,如果竞争成功就将transferIndex的值减bound区间数
- 线程在桶执行扩容逻辑之前会给第一个节点加锁,阻塞putVal线程新增数据。链表节点在原数组长度最高位位置的二进制值判断,为0则在当前桶下标i位置,为1在当前下标 i + n 位置处(这里采用头插法);红黑树在拆分后会判断节点数是否小于6,如果是会退化成链表,迁移部分本文不展开研究。当前桶迁移完成后,设置成ForwardingNode节点。
- 如果当前桶没有节点,那么就设置ForwardingNode标志正在扩容,让put的线程也进来帮助扩容
- 当前线程扩容完成后,如果是第一个线程,那么需要double check 下是否全部扩容完成,如果扩容完成返回新的nextTab,扩容结束
这里面还有链表和红黑树扩容,就是折叠的部分,虽然链表扩容的代码是单线程操作的,但里面的思想也是非常值得拜读的
链表扩容
和HashMap扩容类似,但会先计算是否原数组长度的二进制最高位的值是否全部一致的,如果一致说明不需要拆分直接全部迁移,只用修改头结点的位置即可,算是性能上做了小优化。
/**
* fh 与 n 求与 这里的n是原数组的长度 2的幂 除了最高位都是0
* 那么runBit 这个值是头节点与数组长度 取与 结果就是数组长度二进制最高位位置的值
* 如果是0 则代表桶的头节点f 扩容后仍然在原位置i 如果为1 则为 i + n 的位置
* runBit 这个值只可能 == n 或者为0
*/
int runBit = fh & n;
// 第一个桶节点
Node<K,V> lastRun = f;
/**
* 这段代码需要倒着看 if (b != runBit) { runBit = b } 这个判断是将循环到最新的哈希值赋值给runBit
* 而上面知道 p.hash & 得到的值只可能是 n 或者 0 简单说就是不是高位就是低位
* 是在检查倒数X个相同节点到头结点 是否同是高位 或者低位
* 举个栗子 假如链表算出来 依次是
* 节点 A B C D E F
* 哈希 n 0 0 n n n
* 那么 lastRun 就是 D节点,runBit就是n了
*
* 这样做有什么好处呢?HashMap中 是全部都与长度n计算了一遍最高位,但是如果全是高位或者低位
* 其实这里就不需要在进行计算了 直接将头结点赋值给对应的位置即可 ConcurrentHashMap这里就是一个优化检查
*/
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 如果最后一个变动的节点是0 那么 全都放入低位 为下面尾插法做准备
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 如果最后一个变动的节点是1 那么 全都放入高位 为下面尾插法做准备
else {
hn = lastRun;
ln = null;
}
/**
* 这里就开始尾插法的逻辑了 从头开始遍历 结束节点为lastRun
*/
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
/**
* 这里和上面计算逻辑一致 如果数组长度二进制最高位的值为1
* 头插法放入高位链表,如果是0放入低位链表
* 这里头插法的区别就是插入到前面,next的值为 上一次循环 链表的头
* 这样也说明了 ConcurrentHashMap的链表迁移是非稳定算法
* 不是正序也不是逆序
*/
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
/**
* 走过上面的逻辑 我们已经拆分了两个链表 ln低位链表 hn高位链表
* 将低位链表ln放在新哈希表的i 位置
* 将高位链表hn放在新哈希表的i + n 位置
*/
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
/**
* 旧数组的i位置已经迁移完毕,这时候需要设置此处为ForwardingNode节点
* 意思是不能在这个位置put元素了,别的线程也不会进来处理
*/
setTabAt(tab, i, fwd);
/**
* 将advance设置为true 就会再次进入上面分配方法中 --i >= bound 判断是否分配完成
* 如果没有分配完成 会继续下一个桶
*/
advance = true;
链表扩容部分我们画张图来帮助理解下:
小结下链表迁移的逻辑如下:
- 根据旧数组长度n对每个节点的哈希值取与来计算最后变更的节点lastRun,假如一个都不需要变动,那么头节点就是lastRun
- 设置lastRun的节点位置,如果最后变更的是节点哈希值与长度n取与是n,那么说明此哈希值在数组长度二进制最高位的值为1,放入高位链表,否则设置为低位链表
- 循环链表的每个不为lastRun的节点,判断哈希值在数组长度二进制最高位的值是否为1,如果为1,则用头插法设置到高位链表,为0会设置到低位链表的头节点,直到操作到最后一个变更的链表节点lastRun,这样也间接说明了ConcurrentHashMap的链表迁移是非稳定算法
- 拆分完成后得到高低位两条链表,将低位链表ln放在新哈希表的i 位置,将高位链表hn放在新哈希表的i + n 位置。并且将旧数组的i位置设置为ForwardingNode节点,标记此桶已经迁移完成。
参数小结
状态变量 sizeCtl
在ConcurrentHashMap中有一个比较重要,且出现频率很高的变量sizeCtl,它记录了哈希表操作的状态,我们这里小节一下它出现的含义:
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;
- ConcurrentHashMap未初始化时,它存储了哈希表需要创建的大小,默认为0
- ConcurrentHashMap初始化时,它为-1,表示正在初始化
- ConcurrentHashMap初始化后,它为下次扩容的阈值
- ConcurrentHashMap正在扩容时,它是一个负值,它的高16位存储了通过哈希表旧数组计算得到的标志位,低16位存储了 (1 + 正在扩容的线程数)(第一个线程为2,其他为1)
回归问题
分析完成源码后,再次回归问题,看下面四行代码
/**
* 容器启动方法
*/
public void start() {
System.out.println("============ start ============");
String mainConfig = getConfigFromEnv("mainConfig");
System.out.println("start up at mainConfig: " + mainConfig);
System.out.println("============= end =============");
}
再看下日志:
日志显示执行完start方法后就没有然后了,展开getConfigFromEnv方法
/**
* 优先从缓存取 取不到从其他配置服务获取
*/
private String getConfigFromEnv(String configKey) {
return cache.get(configKey, () -> getConfigByOtherConfig(configKey));
}
/**
* 模拟从其他配置服务拉取配置场景
*/
private String getConfigByOtherConfig(String configKey) {
String active = System.getenv(configKey);
/**
* 如果是生产 对加密的部分进行解密
*/
if (PRO_ENV.equals(getSpringActive())) {
System.out.println("decrypt config ...");
return active;
}
return active;
}
/**
* 优先从缓存取 取不到给dev
*/
private String getSpringActive() {
return cache.get("active", () -> "dev");
}
整理下,获取配置的值getConfigFromEnv方法主要做了这么个事:
- 从缓存中获取,使用ConcurrentMapCache提供的get方法,获取不到时使用Callable的valueLoader,跳转到方法getConfigByOtherConfig,表示获取不到从配置服务拉取配置。
- 这里getConfigByOtherConfig采用从系统中获取,无论是否拿到,如果是生产环境需要对参数进行解密(常规解密操作都发生在这里),这里显然不是,但是他需要获取Spring的Active,回到方法getSpringActive
- 获取active配置仍然是从缓存中,如果没有默认给dev。(幸好不需要再解密了,不然就套娃了)
再来细细看下ConcurrentMapCache的相关方法
public class ConcurrentMapCache {
private final String name;
private final boolean allowNullValues;
private final ConcurrentMap<Object, Object> store;
@SuppressWarnings("unchecked")
public <T> T get(Object key, Callable<T> valueLoader) {
Object storeValue = this.store.computeIfAbsent(key, k -> {
try {
T userValue = valueLoader.call();
if (userValue == null) {
if (this.allowNullValues) {
return NullValue.INSTANCE;
}
throw new IllegalArgumentException("Cache '" + getName() + "' is configured to not allow null values but null was provided");
}
return userValue;
} catch (Throwable ex) {
throw new RuntimeException();
}
});
if (this.allowNullValues && storeValue == NullValue.INSTANCE) {
return null;
}
return (T) storeValue;
}
public void put(Object key, @Nullable Object value) {
if (value == null) {
if (this.allowNullValues) {
value = NullValue.INSTANCE;
} else {
throw new IllegalArgumentException("Cache '" + getName() + "' is configured to not allow null values but null was provided");
}
}
this.store.put(key, value);
}
protected ConcurrentMapCache(String name, ConcurrentMap<Object, Object> store,
boolean allowNullValues) {
this.name = name;
this.store = store;
this.allowNullValues = allowNullValues;
}
public final String getName() {
return this.name;
}
}
小整理下就两个方法:get和put分别对应新增和查询:
- put方法 如果value为null,首先判断allowNullValues是否允许空值对象,如果允许则保存NullValue.INSTANCE作为ConcurrentMap的value保存起来,否则抛出异常。这样的好处是将null值和空缓存分离。
- get方法 调用 ConcurrentMap的computeIfAbsent方法,即如果没有则使用传入valueLoader计算得出value,如果为null,则判断是否allowNullValues,支持的话说明可能有NullValue.INSTANCE,此时仍然返回null
从上面看出,NullValue.INSTANCE是有好处的,因为如果为某个键的value为null值,那么保存NullValue.INSTANCE可以有效避免造成缓存穿透的问题,但是,这样也给ConcurrentMap带来了一个空值,这个空值会存在于哈希表中,触发存储、扩容等,当然这个和缓存穿透比起来带来的影响就小很多了。
解答疑问
这样看代码,不太容易串起来理解问题,我们同样,再画张图看看:
顺着这个图,把线撸顺了,再结合上面对ConcurrentHashMap的深入理解,我们小结一下:
- 初始化容器,ConcurrentHashMap中构造了 RESIZE_CAPACITY - 1 个元素,即这时再新增一个就会触发扩容。
- 从缓存中获取Config的值,当此时从ConcurrentHashMap中获取不到,ConcurrentHashMap则会调用传入的mappingFunction计算需要的值,但是,这个时候为了防止其他线程并发操作,ConcurrentHashMap设置了一个预留节点,并且使用了synchronized对其加锁,防止哈希冲突时其他线程并发计算。
- 在计算时,恰好不巧需要往容器内增加profile的值作为缓存,此时新增有两个问题:
- 假如哈希冲突相同,进入重入锁,执行putVal的插入逻辑,然后当前桶是负的哈希值(RESERVED -3),死循环(因为putVal就没有处理预留节点的逻辑)
- 假如哈希值不冲突,插入到其他桶节点,不会阻塞操作成功,进入addCount计数器+1,判断需要扩容,则进行扩容,分配线程bound一系列操作后,发现处理预留节点时(RESERVED -3),死循环(恰好transfer也没有处理预留节点的逻辑)
综上,咱们也了解了为什么这里会死循环,实际上,这里是用一个例子,模拟了两个computeIfAbsent在计算节点时会产生死循环的BUG,当然已经有大佬帮忙提了 JDK-8062841
这个bug已经在JDK9版本修复了,我们可以看下相关修改
在插入时(扩容时也有类似逻辑)会判断当前桶是否是预留节点,如果是则会抛出异常,不允许修改。
最佳实践
说了那么多,我们来想想,要怎么做才是最佳的?
- 导致死循环的根源是computeIfAbsent方法,因为在计算时设置的预留节点ReservationNode不是基本节点而是一个临时节点,无法进行同时新增或者扩容迁移,那么我们应该避免在computeIfAbsent的mappingFunction新增元素,即避免套娃使用
- 读懂ConcurrentHashMap的流程后,我们应该构造更好的哈希键,性能瓶颈在于线程竞争锁资源,减少锁的竞争,方能达到最佳性能。
- Cas是一种乐观锁,并发低的情况下,使用它可以降低性能消耗,但是并发很高会不断尝试,白白消耗资源也抢占不到资源,此时重试使用synchronized也是一种明智的方案
这里可以看出从JDK7到JDK8的优化是很精妙的,之前的Segment技术在扩容时其他线程对本Segment写是挂起等待的,从等待到协助扩容,让我们知道多线程竞争问题不是程序的瓶颈,要运用多线程首先学会去拥抱多线程,分段扩容技术是很优秀的思想,一个方法能够让多个线程同时协作完成的代码是很值得拜读的。
感谢你能看到这里,至此,这个BUG已经搞清楚了,顺便学习了ConcurrentHashMap的执行整体流程,也是收获颇丰的,相关知识点我已经写在小节里了。如果你有疑问,可以在下方留言,我看到便会回复。接下来,我会在下篇中整理多线程计数、查询、移除等方法完善ConcurrentHashMap的学习历程。
本文代码地址 github
参考
JDK 8 源码
https://my.oschina.net/qiketime/blog/4694727
https://segmentfault.com/a/1190000021237438
https://juejin.cn/post/6844904191077384200#heading-2
https://www.zhenchao.org/2016/08/16/java/segment-based-concurrent-hashmap/