沧澜的博客

芝兰生于幽谷,不以无人而不芳


  • 首页

  • 归档

  • 分类

  • 标签

  • 搜索
软件思想 SpringBoot 领域驱动设计 算法 中间件 计算机网络 MySQL 数据库 javascript 极客时间 分布式架构 Jenkins JVM 多线程 Java基础 CentOS安装 编译OpenJDK 持续集成 杂谈

从死循环BUG来聊聊ConcurrentHashMap的执行内幕(上)

发表于 2021-05-29 | 分类于 多线程 | 1 | 阅读次数 1236

JDK版本 8u251

前几日遇到的一个问题:一个SpringBoot项目凭空“起不来”了,日志刚刚输出完一个加载数据库驱动的配置,检查数据库用DataGrip能正常连接。很纳闷,偏偏这个时候就自己的环境起不起来其他人都可以正常启动(后来定位到和每个机器的环境变量参数数量有关)。

那咱们只能撸堆栈看看线程在干什么了,先定位到主线程在RUNNABLE,说明没有死锁,并且不是WAITING,说明没有等待数据库连接等。 image.png

咦,一看是ConcurrentHashMap的transfer方法,以前听说过HashMap多线程死循环,但一想咱们这个是单线程而且是ConcurrentHashMap八竿子打不着边。只能一行行撸源码研究技术了,看看咱们到底哪里用错了,顺便深入学习下Lock Free的内幕。

问题复现


下面就用几行代码简单复现当时的项目场景:

public class SpringContext {
    // 此处方便模拟扩容 设置容量参数为32
    private static final int INITIAL_CAPACITY = 32;
    private static final String PRO_ENV = "pro";
    private final ConcurrentMap<Object, Object> store = new ConcurrentHashMap<>(INITIAL_CAPACITY);
    // 模拟 Environment 环境配置本地缓存
    private final ConcurrentMapCache cache = new ConcurrentMapCache("cache", store, true);

    // 扩容时容量阈值
    private static final int RESIZE_CAPACITY = INITIAL_CAPACITY + (INITIAL_CAPACITY >>> 1);

    // 启动时 假设环境参数有这些 RESIZE_CAPACITY - 1 个
    {
        for (int i = 0; i < RESIZE_CAPACITY - 1; i++) {
            cache.put("cacheConfig " + i, "cacheValue");
        }
    }

    public static void main(String[] args) {
        new SpringContext().start();
    }

    /**
     * 容器启动方法
     */
    public void start() {
        System.out.println("============ start ============");

        String mainConfig = getConfigFromEnv("mainConfig");

        System.out.println("start up at mainConfig: " + mainConfig);

        System.out.println("============= end =============");
    }

    /**
     * 优先从缓存取 取不到从其他配置服务获取
     */
    private String getConfigFromEnv(String configKey) {
        return cache.get(configKey, () -> getConfigByOtherConfig(configKey));
    }

    /**
     * 模拟从其他配置服务拉取配置场景
     */
    private String getConfigByOtherConfig(String configKey) {
        String active = System.getenv(configKey);

        /**
         * 如果是生产 对加密的部分进行解密
         */
        if (PRO_ENV.equals(getSpringActive())) {
            System.out.println("decrypt config ...");
            return active;
        }

        return active;
    }

    /**
     * 优先从缓存取 取不到给dev
     */
    private String getSpringActive() {
        return cache.get("active", () -> "dev");
    }
}

用几句话来简单解释下这段代码在干什么:

  1. 声明了2个成员变量:
    1. 本地缓存store,数据结构类型为ConcurrentHashMap,初始容量为32
    2. 本地缓存ConcurrentMapCache,封装了上面的ConcurrentHashMap,设置了一个缓存名字cache,基本没做啥事。
  2. 给SpringContext增加了一个构造代码块,这块代码在实例化对象时会运行,这里模拟容器初始化启动给本地缓存 (扩容阈值-1) 个配置。(后面会讲为什么要 RESIZE_CAPACITY -1 个)
  3. 程序入口,启动容器,调用start方法:
    1. 记录启动日志
    2. 从环境中获取核心配置mainConfig
    3. 使用配置,然后容器启动完成

常规操作,一路看下来,似乎没啥问题,逻辑也说得通,取不到本地缓存的配置从配置服务查,取到放进缓存。接下来我们启动看看? image.png

不出意外的卡住了...堆栈显示就是卡在ConcurrentMapCache缓存那里了,业务逻辑完全没问题,难不成JDK也有BUG?(想快速查看原因,滑到最下直接查看结论即可。)

image.png

源码解析


ConcurrentHashMap数据结构


image.png

图片来源:https://www.zhenchao.org/2019/01/31/java/cas-based-concurrent-hashmap/

哈希表(Hash Table)又叫散列表,是一个方便快速查询的数据结构,上面是一个存储Node节点的数组,Node节点存储的可以是链表也可以是红黑树。查询时需要根据每个键的哈希值,对数组的长度取余,定位到是哪一个Node节点位置(也叫桶节点),如果Node里面存在与查询的键相同的键,那么就是要寻找的值,否则为找不到。 这样的好处是,理想情况下查询是一个O(n)的复杂度,因为哈希冲突(相同的桶节点),我们需要引入Node下面的节点,这些节点存在就是为了在哈希冲突时仍有不错的查询效率,当然,链表在过长的情况下,效率较低,所以在HashMap的Node节点元素数量大于等于8个时会转化为红黑树,但是维护红黑树的自平衡也需要成本,所以在扩容后判断桶节点数小于等于6时又会再次退化为链表。

HashMap虽然高效,但是很遗憾他不是线程安全的数据结构,所以面对高并发场景Doug Lea大神为我们提供了ConcurrentHashMap这个工具类,一个并发版本的HashMap。数据结构和HashMap是一致的,只是在各种方法上支持了并发调用。


细心的你一定发现了,本地缓存初始化时,指定了32个初始容量,从HashMap的扩容规则可知加载到0.75个容量时会触发扩容,ConcurrentMapCache也是套用这个规则,我们在构造代码块设置了 32 + 16 - 1 = 47个元素,嗯?没搞错...32容量的扩容阈值48?

那咱们来打开构造方法看看:

ConcurrentHashMap构造方法


    public ConcurrentHashMap(int initialCapacity) {
	// 如果小于0 抛出异常
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
	// MAXIMUM_CAPACITY 就是Integer的最大正整数,如果过大 设置 MAXIMUM_CAPACITY
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
	/* 这里有点意思,tableSizeFor 求大于initialCapacity的最小2次幂,
	那么initialCapacity* 1.5+1后的最小2次幂(+1针对乘1.5会有小数点,
	所以+1把小数点给抹平),这和HashMap中的实现有所不同(HashMap中是
	tableSizeFor(initialCapacity))
	*/
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        // 这个值是用来表示ConcurrentHashMap数组的状态的,几种情况后面逐渐讲解
	// 这里的值为数组的容量
	this.sizeCtl = cap;
    }

从这个构造函数设置cap值处发现端倪:构造函数想表达的initialCapacity参数,实际上不是容量,而是扩容阈值,HashMap的加载因子是0.75,实际上容量就是 扩容阈值 * 1.33333,和1.5也差不多,毕竟0.75是除法,而1.5是位运算,效率上也高好多。回到刚刚的问题,传入来是32,那么32*1.5+1=49,然后对49求最小2次幂不就64了?想想假如是22,本身不需要扩容,但经过计算22 * 1.5 + 1 = 34 ,最终会使用64,相当于扩容了一倍,而 22 / 0.75 + 1 = 30, 实际上还是应该取32,严格点,这就是一个BUG,并且有人已经提过了 JDK-8202422

image.png

从上面可以抓取到几个信息:这个bug从Java 8开始就已经有了,已经在Java 11.0.1中修复了,下面是ConcurrentHashMap的新构造方法

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

Java11中这里被修复成了(long)initialCapacity / loadFactor) 根据加载因子来做除法,最后咱们来分析下为什么用1.5倍呢?实际上,我们再细看下ConcurrentHashMap的构造方法代码,除了全参构造中使用了计算以外,连变量都没有保存,并且在扩容时,扩容阈值的计算使用了sizeCtl = (n << 1) - (n >>> 1) (在后面会看到),相当于0.75倍新容量,也就是说,Doug Lea老爷子不希望我们自己去设置这个加载因子的值,淡化加载因子的概念,就算是我们需要设置,也只建议设置0.75了。

接着我们来看下sizeCtl这个参数,这个参数主要标识着哈希表的状态,从构造方法看出,设置了容量的大小,这个参数设计的很精妙,在后面我们再统一解析。

可以看出ConcurrentHashMap是懒加载(lazy-load 形式)的,因为在new ConcurrentHashMap时只做了一些容量大小的赋值等,很大程度上减小了初始化的开销。

咱们回到主题,现在传入的是32,咱们实际上容量是64,所以扩容的阈值就是48了,理论上我们在初始化时设置了47个,如果再加载一个元素,那么就会触发扩容,咱们带着疑问顺道来学习下put方法

    public V put(K key, V value) {
        return putVal(key, value, false);
    }

put方法将调用转发给putVal方法:

新增方法-putVal


final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
	// 计算Hash值 实际上使用的Key的hashCode方法
        int hash = spread(key.hashCode());
	// binCount表示添加前节点对应的桶上面的节点数
        int binCount = 0;
	// 轮训 如果第一次是初始化,第二次会开始添加节点值 第N次就是多线程竞争了,
	// 在后面添加同一个桶的方法时 会不断尝试
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
		// 这里判断 如果tab为空 实际上就是懒加载场景,延迟初始化 然后下次循环 此处不为空
                tab = initTable();
		// tabAt 方法 是Unsafe类中通过volatile方式获得指定地址所对应的值,
		// 这里的偏移量是 通过 ( 数组长度 - 1 ) & hash值快速计算得到,这里的
		// 原理就:对一个2的n次幂的值 快速求模 = (n - 1)与hash值,这样可以快
		// 速定位到桶的位置,和HashMap一致 
		// 如果为null 说明还没有初始化过,则直接创建一个新的节点
		// 并且将该桶的第一个节点赋值给f
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
		// 使用cas的方式为这个位置赋值,如果为null说明还没有其他线程更新过
		// 如果不为null 就会进入下次循环判断了
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
	    // 如果桶第一个节点Hash值为ForwardingNode节点,ForwardingNode节点的Hash
	    // 值为MOVE -1,意思是当前正在迁移,如果是这样的情况则这个线程也去帮助扩容
	    // 扩容完成后会更新tab,跳出本次循环下次插入
            else if ((fh = f.hash) == MOVED)
		// 这里帮助扩容的方法放到后面解析
                tab = helpTransfer(tab, f);
            else {
                // 如果hash冲突,则尝试遍历数插入
		// 暂时折叠起来 后面详细看
        }
	// 流程到这里 就代表put成功,map的数量加1
        // 在这里同样也有协助扩容的逻辑
        addCount(1L, binCount);
        return null;
    }

整个putVal的逻辑还是比较清晰的,我们再画张图理一下

image

小结下putVal方法的流程:

  1. 验证key和value都不能为空,如果为空抛出异常
  2. 使用spread方法计算哈希值
  3. 如果是第一次put元素,那么会进行初始化(构造函数懒加载延迟初始化)
  4. 如果计算出当前桶还没有初始化过,会直接用当前key和value创建新的头节点
  5. 如果判断出当前桶的Hash值为MOVE,那么说明已经有其他线程已经在扩容,那么此线程也会去尝试帮助扩容
  6. 上面的条件都不满足时,说明是Hash冲突,需要在当前桶位置插入新的节点(未完,在put节点方法继续)

当然还牵扯出很多知识点:

哈希方法spread


    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

这里拿到hash值后,(因为int是32位)这里就是将hash值得高16位和低16位进行异或计算,细细分析下为什么要异或计算呢?因为hash值在后面会与哈希表数组的长度的-1进行取与计算,这样做的好处是可以快速计算 hash值 % 数组长度,毕竟位运算的效率是要远高于取模这种除法计算的,但要求就是长度必须要是2的幂;拿出高16位进行异或运算,主要是降低在长度小于16次幂(65536长度,大部分情况不会在一个哈希表中存储6w多个元素吧)的情况下hash冲突,因为高16位在这个长度下取模运算时是用不到的,取一个异或“扰动函数”正好能降低哈希冲突的概率

举个极端点例子,假设现在哈希表的长度是16,那么他的某个Key的哈希为65536,那么他的二进制分布如下,而我们的长度为16,那么-1就是15,与15取与,其实就是只看后四位有几个1,而65536和很多16的倍数一样,后四位都是0这样冲突就很多了,取一个异或“扰动函数”,能有效将高16位的哈希值“映射”到低16位上,降低哈希冲突。

image.png

这里再来看下最后一步 & HASH_BITS,HASH_BITS的值为Integer的最大值,那么我们可以知道,如果是正数,与这个数取与是完全没有影响的,因为除以最大的正整数的模肯定还是自己了,但是如果是负数,那么经过这样取与操作之后就变成了正数,Integer的MAX值除了最高为0以外都是1,那么与负数最高为1相与,只剩下正数部分了,换句话说,这个& HASH_BITS 是在保证Hash值不能为负数。因为在ConcurrentHashMap中,负数有特殊的含义:

  • MOVED = -1; // hash for forwarding nodes 代表正在迁移
  • TREEBIN = -2; // hash for roots of trees 代表当前节点是红黑树节点
  • RESERVED = -3; // hash for transient reservations 代表当前节点是预留节点,用于临时存储的计算节点

搞清楚这里之后,那么我们在下面判断节点类型时就可以用hash>0来确定是普通还是其他节点,是负数说明可能是红黑树或者预留节点或迁移节点了。

桶数组的初始化


    // 这个方法只作为初始化用
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        // 这里将tab设置给tab,方法内部操作是tab
        // 可以这样直接操作的变量,必须要是volatile修饰的,以确保内存可见性
        while ((tab = table) == null || tab.length == 0) {
            // 如果sc 小于0 说明其他线程已经在扩容了
            // sizeCtl肯定也必须是volatile修饰的
            if ((sc = sizeCtl) < 0) // A
                // yield 释放资源调度 给其他线程机会 空转
                Thread.yield(); // lost initialization race; just spin
                // 这里利用CAS的方式将sizeCtl的值更新成-1 标识当前线程需要去初始化了
                // 如果竞争失败,就会在上面if中让出资源
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // 这里我们来思考下 为什么要double check 经过CAS之后肯定只有一个线程
                    // 假设第一个线程刚刚执行到代码A处,并且还没有赋值完成,切换到第二个线程,正在这里初始化,
                    // 判断不为空,创建新的Node,还没有来得及执行finally代码,切换到第一个线程
                    // 那么他也会cas成功,如果这里不再次check 就会创建两次,虽然容量是一致的,实际上也浪费的性能
                    // 所以这里二次验证,就是为了避免高并发场景下,哈希表被重复实例化的情况
                    if ((tab = table) == null || tab.length == 0) {
                        // 无参构造时 就是默认值,此时使用默认大小 16
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // 初始化完成后,这个sc就是下次需要扩容阈值了,0.75倍的容量,并且不可更改
                        // 所以从这里也可以看出,ConcurrentHashMap没有加载因子的概念,无论是否设置
                        // 都会使用0.75
                        sc = n - (n >>> 2);
                    }
                } finally {
                    /**
                     * 放在finally中的就代表是一定需要执行的,如果在创建哈希表数组时发生OOM异常等异常,那么
                     * 此时sizeCtl就是-1 一直空转了,需要在finally中释放状态,代表初始化失败,给其他线程
                     * 初始化的机会
                     */
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

哈希表实际上就是存储不同类型Node的数组,这里的Node 类型可以是链表,也可以是红黑树,都是解决在Hash冲突的情况下元素查询问题

小结下initTable方法:

  1. 判断是否有其他线程初始化,如果有则当前线程会放弃竞争资源,空转
  2. 如果是第一个线程进来,会使用cas的方式将sizeCtl的值修改为-1(sizeCtl状态表示下面统一总结),cas成功后会进行初始化,长度为sizeCtl之前初始化时的值,接着会设置扩容阈值为table长度的0.75倍到sizeCtl上。
  3. 初始化完成,返回table

初始化桶数组的逻辑很像double check的单例模式,只是锁的验证方式换成了CAS自旋锁,这里我也写了基于CAS实现的Lock Free单例模式。代码放在了github上。

为桶内添加第一个元素


这里先来看一下putVal的第四个逻辑判断,当hash冲突时给桶内加元素,和HashMap逻辑类似,不同的是在桶上有锁,这里也诠释了JDK8中的 ConcurrentHashMap 中锁的粒度是在桶级别

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        // 计算Hash值 实际上使用的Key的hashCode方法
	int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();  // 折叠
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 折叠
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f); // 协助扩容 折叠 
            else {
                // 这里说明 桶内已经有至少一个节点
                V oldVal = null;
                // 这里给桶的第一个节点加了锁,对链表或者红黑树是没有使用lock free编程的
                synchronized (f) {
                    /**
                     * 这里又是一个double check 因为synchronized是阻塞的,如果在synchronized期间,
                     * 其他线程插入了该桶的节点,触发了扩容,此时当前的table不再是原来的oldTable了,
                     * 当本线程切换回来使用newTable计算hash槽也会不一致;或者是扩容时将当前桶置为ForwardingNode节点,
                     * (ForwardingNode是一个标记节点,hash值为MOVE的-1),此时也已经不是之前的桶节点了,会出现
                     * tabAt(tab, i) != f 不一致的情况
                     */
                    if (tabAt(tab, i) == f) {
                        /**
                         * 如果大于0说明是一个普通节点,那么直接使用链表的插入方法
                         */
                        if (fh >= 0) {
                            // 这个用来记录当前桶的节点数量
                            binCount = 1;
                            // 循环链表上的每个节点
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    // 如果onlyIfAbsent 是false 用传入的Value覆盖旧的值 跳出循环
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                /**
                                 * 如果e的next节点为null 说明是最后一个节点 尾插法 插入最后
                                 *  则将当前节点的next绑定为新节点,新节点的kv参数为传入的key和value
                                 *  并且next为空
                                 * e = e.next 如果不为null,那么 e 就会继续循环下去 知道找到最后一个节点
                                 */
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                            value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            // 如果是红黑树 则为2,后面讲为何是2
                            binCount = 2;
                            // 红黑树的逻辑不在此文中讨论
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                    value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // binCount 在链表插入初始设置为1,最后为插入前的链表的长度 红黑树插入设置为2
                if (binCount != 0) {
                    // 如果链表的数量已经达到了转成红黑树阈值(默认为8)的时候,则转换为红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    // 如果是更新了旧值,就直接返回,否则跳出循环进入addCount方法
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
	// 流程到这里 就代表put成功,map的数量加1
        // 在这里同样也有协助扩容的逻辑
        addCount(1L, binCount);
        return null;
    }

接上putVal方法小结:

  1. 当hash冲突时,会给头节点加synchronized锁,阻塞其他put线程。插入时采用尾插法,插入后会记录当前桶的节点数binCount,如果binCount大于等于链表转换红黑树的阈值(默认为8)会转换为红黑树节点。
  2. 最后调用addCount方法增加计数器的值

在ConcurrentMapCache的get方法中实际上使用的computeIfAbsent方法,下面我们来看下computeIfAbsent方法

方法computeIfAbsent


computeIfAbsent方法和putVal方法很类似,这里只看不同的部分

    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
        // 这里 mappingFunction 和key都不能为null
        if (key == null || mappingFunction == null)
            throw new NullPointerException();
        int h = spread(key.hashCode());
        V val = null;
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
                /**
                 * 在上面哈希方法文中可以知道 Hash值除了正数 负数有三种类型,其中RESERVED -3 代表预留节点
                 *  ReservationNode 在初始化的时候会设置hash值为RESERVED
                 * 这里假定mappingFunction 是一个耗时操作,只会允许一个线程计算,这里在计算之前标记此位置是一个预留节点
                 *  所以ReservationNode只是一个占位符,在操作冲突时会阻塞其他线程插入节点
                 *  在加锁之后 会使用casTabAt 的方式操作 设置此处为预留节点 如果失败就说明有其他线程竞争成功去计算了,退出循环重试
                 */
                Node<K,V> r = new ReservationNode<K,V>();
                synchronized (r) {
                    // 使用cas的方式设置预留节点 synchronized 代码块一般会带一个double check
                    if (casTabAt(tab, i, null, r)) {
                        // 这里binCount只是一个标志,如果cas成功 才为1 如果失败需要在外层循环重试
                        binCount = 1;
                        Node<K,V> node = null;
                        try {
                            /**
                             * 调用 mappingFunction.apply 计算value 如果不为空则会创建新的节点并赋值给node
                             *  最后在finally中更新 tab中i的值为node 如果计算的值为null那么不会更新总数
                             */
                            if ((val = mappingFunction.apply(key)) != null)
                                node = new Node<K,V>(h, key, val, null);
                        } finally {
                            setTabAt(tab, i, node);
                        }
                    }
                }
                if (binCount != 0)
                    break;
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                boolean added = false;
                /**
                 * 假如 在计算节点ReservationNode 此时发生其他线程进来操作
                 *  那么 这里的锁的和上面ReservationNode节点是同一把锁
                 */
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek; V ev;
                                if (e.hash == h &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    val = e.val;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    /**
                                     * 与putVal方法类似 找到最后一个节点 并且计算这个值 如果不为空则将next设置为新节点
                                     *  added会作为是否成功新建节点的判断条件
                                     */
                                    if ((val = mappingFunction.apply(key)) != null) {
                                        added = true;
                                        pred.next = new Node<K,V>(h, key, val, null);
                                    }
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            binCount = 2;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                    (p = r.findTreeNode(h, key, null)) != null)
                                val = p.val;
                            /**
                             * 与putVal方法类似 找到最后一个节点 并且计算这个值 如果不为空则将next设置为新节点
                             *  added会作为是否成功新建节点的判断条件
                             */
                            else if ((val = mappingFunction.apply(key)) != null) {
                                added = true;
                                t.putTreeVal(h, key, val);
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    // 如果未新增 则说明已经存在相同key的值 直接返回即可
                    if (!added)
                        return val;
                    break;
                }
            }
        }
        /**
         *  这个val会作为新增的判断条件 mappingFunction会将计算的值赋值给val
         *    val = mappingFunction.apply(key)
         *   只会在不为空时新增计数器数量
         */
        if (val != null)
            addCount(1L, binCount);
        return val;
    }

分析完putVal方法再来看computeIfAbsent方法就很容易理解了,这里变更的点就是在判断需要操作的节点时新增了ReservationNode节点,并且操作时会加synchronized关键字,阻塞同桶其他线程操作,计算完成后,会在finally中设置回新节点,如果mappingFunction计算的值不为空,还会新增计数器的值。

多线程扩容

扩容前置判断


先打个预防针,ConcurrentHashMap的扩容方法是比较复杂的,也是极富有学习价值的。顺便套一句话:面试并不一定会问到,但是仍然值得好好研究。虽然你我可能没有太多机会实现这样的逻辑,但是了解大师的设计思路,对于自己的能力来说也是一种提升。

addCount方法在putVal成功后执行,并且x为新增元素个数,默认情况只会是1了,check方法是当前桶的元素数量,大于0才会走判断扩容逻辑。

    private final void addCount(long x, int check) {
	// 对于计算总数的方法我们先折叠起来
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            // 上面代码 折叠 等下看  
            s = sumCount();
        }
        // check不是负数, 就帮助扩容, 在clear 方法中会设置为 -1
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            /**
             * s 记录了当前map中所有节点的数量 ,这里的计算逻辑 比较直接,
             * 如果table不为空,且容量小于最大值,当前节点的数量大于扩容的阈值时进行扩容
             */
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                    (n = tab.length) < MAXIMUM_CAPACITY) {
                /**
                 * 这个函数的返回值记录了当前数组长度的标志位,在下面有讲
                 *  同一个长度的数组,计算的rs值一定是相同的
                 */
                int rs = resizeStamp(n);
                /**
                 * 如果sc小于0 说明已经有其他线程开始扩容了, 那么此处的逻辑可以理解为helpTransfer 的判断条件了
                 *  判断当前是否需要线程协助扩容
                 * 正常情况下 sc的值为大于扩容的阈值 肯定是大于0的
                 */
                if (sc < 0) {
                    /**
                     * 这里有五个与操作 有一个则退出协助扩容逻辑
                     *  (sc >>> RESIZE_STAMP_SHIFT) != rs  rs是根据数组长度算出来的,sc在下面一个if就是第一个
                     *  线程进来的时候高16位是rs的值,那么这里理论上还原也是相同的,如果不相同那么只能说明
                     *  扩容时数组的长度变了,其他线程触发了第二次扩容
                     *  sc == rs + 1 这里算是一个BUG,实际上,应该是 sc == (rs << RESIZE_STAMP_SHIFT) + 1
                     *  因为sc的值是rs左移16位 出来的,低16位表示线程扩容的状态,在下面那个if 就是就是第一个
                     *  线程进来的时候低16位是2, +1 = + 2 - 1, 这里的减1来自最后一个线程检查,下面会说
                     *  sc == rs + MAX_RESIZERS 这里和上面逻辑类似,判断是否还需要线程扩容,MAX_RESIZERS是最大扩容的线程数量,
                     *  实际上也是一个BUG,应该是  sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
                     *  nextTable == null 说明已经扩容完成 这里不需要在协助扩容了
                     *  transferIndex 说明bound区间已经都分配完了,那么也不需要扩容了 (下面有说)
                     */
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    /**
                     * 走到这里 说明需要线程协助扩容的,使用cas的方式 将sizeCtl+1,然后进入
                     *  transfer方法帮忙做迁移
                     *  这里也可以了解到,sizeCtl 的高16位表示的是哈希表数组的标志位,唯一长度的哈希表只有一个标志
                     *   低16位 - 1 后表示正在扩容的线程数
                     */
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                /**
                 * 如果是第一个扩容线程进来,会来到这里
                 *  此处将sizeCtl的值设置为rs的左移16位 + 2,从resizeStamp函数 可以知道 返回值只是长度的一个标志位
                 *  并且只在低16位有值,第15位位1,这样操作后仍然是一个负值,符合上面sc < 0的协助扩容判断
                 *
                 *  +2  低 16位 为二进制的 10
                 */
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);

                // 重新计算下总数,下次循环判断是否需要扩容
                s = sumCount();
            }
        }
    }

其中resizeStamp方法如下

    static final int resizeStamp(int n) {
        /**
         * Integer.numberOfLeadingZeros(n) 返回的是该数二进制表示情况下,最高位前面1的个数,比如
         * 128表示为 0000 0000 0000 0000 0000 0000 1000 0000 ,返回24,即最高位1前面有24个0
         * Integer.numberOfLeadingZeros(n) 返回的最大值是32,最小值为0
         * 用二进制表示就是
         *  32   0000 0000 0000 0000 0000 0000 0010 0000
         *  0    0000 0000 0000 0000 0000 0000 0000 0000
         * RESIZE_STAMP_BITS 是 16,1 左移 15 位就是
         * 32768 0000 0000 0000 0000 1000 0000 0000 0000
         * 取或运算 就是 任何一个1 为1 都为1
         * 那么 resizeStamp 的值区间为 32768 ~ 32800
         * 32800 0000 0000 0000 0000 1000 0000 0010 0000
         *
         * 可以看出 返回值就是高16位全是0,低16位包含了哈希表数组的长度信息
         */
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

这里我们画张图简单了解下这个值的运用,理解这个函数对理解哈希表当前的状态sizeCtl也是有帮助的

image.png

我们理解了resizeStamp方法,其实就很好理解addCount方法的扩容部分了,下面我们小结一下:

  1. 扩容的部分首先会判断sumCount()即当前哈希表元素数是否大于sizeCtl(扩容阈值),如果是true代表需要扩容
  2. resizeStamp会根据当前数组长度计算一个16位的唯一标志位,如果是第一个扩容线程,会将sizeCtl的高16位设置成它,低16位设置成 线程数 + 1(因为 第一个线程线程为2)
  3. 如果根据sizeCtl判断(小于0则代表已经有线程正在扩容),是第二个或者N个扩容线程,会将sizeCtl的低16位+1,并尝试协助扩容

协助扩容helpTransfer


在上面put方法中在判断是MOVE节点时,有一个帮助扩容方法helpTransfer。理解了addCount方法再来理解helpTransfer就很容易了,咱们来看下这里面的实现:

    // 这个方法 主要是用来判断是否需要协助扩容的
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        // 如果哈希表不为空,且当前节点为ForwardingNode节点说明当前桶正在迁移中
        if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            // 这个函数在下面有解释,实际上就是把长度信息封装到低16位了
            int rs = resizeStamp(tab.length);
            /**
             * 既然是协助,那么这里的代码,肯定是在判断其他线程正在扩容了
             *  nextTab == nextTable 新数组不能为空 为空代表结束了
             *  sc = sizeCtl 小于0 说明扩容中
             */
            while (nextTab == nextTable && table == tab &&
                    (sc = sizeCtl) < 0) {
                /**
                 * 这里的判断和上面那个是否需要协助扩容逻辑一致
                 *  (sc >>> RESIZE_STAMP_SHIFT) != rs  rs是根据数组长度算出来的,sc在下面一个if就是第一个
                 *  线程进来的时候高16位是rs的值,那么这里理论上还原也是相同的,如果不相同那么只能说明
                 *  扩容时数组的长度变了,其他线程触发了第二次扩容
                 *  sc == rs + 1 这里算是一个BUG,实际上,应该是 sc == (rs << RESIZE_STAMP_SHIFT) + 1
                 *  因为sc的值是rs左移16位 出来的,低16位表示线程扩容的状态,在下面那个if 就是就是第一个
                 *  线程进来的时候低16位是2, +1 = + 2 - 1, 这里的减1来自最后一个线程检查,下面会说
                 *  sc == rs + MAX_RESIZERS 这里和上面逻辑类似,判断是否还需要线程扩容,MAX_RESIZERS是最大扩容的线程数量,
                 *  实际上也是一个BUG,应该是  sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
                 *  transferIndex 说明bound区间已经都分配完了,那么也不需要扩容了 (下面有说)
                 */
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                /**
                 * 走到这里 说明需要线程协助扩容的,使用cas的方式 将sizeCtl+1,然后进入
                 *  transfer方法帮忙做迁移
                 *  这里也可以了解到,sizeCtl 的高16位表示的是哈希表数组的标志位,唯一长度的哈希表只有一个标志
                 *   低16位 - 1 后表示正在扩容的线程数
                 */
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            // 扩容成功 返回新的tab 哈希表
            return nextTab;
        }
        return table;
    }

协助扩容的逻辑和addCount后半部分类似,这里小节下协助扩容的逻辑:

  1. 因为是协助,首先需要判断是否已经有其他线程在扩容:
    1. sizeCtl必须要小于0,高16位存储resizeStamp长度的标志位,低16位存储扩容线程数 + 1(第一个为2)
    2. 判断是否是正在扩容的标志是nextTable,即nextTable只存在于扩容中 其他时间都为null
  2. 判断是否需要当前线程去帮助扩容:
    1. (sc >>> RESIZE_STAMP_SHIFT) != rs rs是根据数组长度算出来的,sc在下面一个if就是第一个线程进来的时候高16位是rs的值,那么这里理论上还原也是相同的,如果不相同那么只能说明扩容时数组的长度变了,其他线程触发了第二次扩容
    2. sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS (原来的判断是sc == rs + MAX_RESIZERS)判断是否还需要线程扩容,MAX_RESIZERS是最大扩容的线程数量
    3. transferIndex 说明bound区间(扩容部分讲)已经都分配完了,那么也不需要扩容了
  3. 如果需要则把sizeCtl的低16位+1,标志正在扩容线程数,调用transfer方法进行协助扩容。

这里说说上面的BUG:

  1. sc == rs + 1判断 是否是最后一个线程检查,实际上这里应该永远不可能等于的,因为之前我们也知道sc存放分为两部分:高16位为rs的值,低16位为扩容线程数+1,那么这里就应该是sc == (rs << RESIZE_STAMP_SHIFT) + 1而不是直接判断,如果直接判断rs是永远不会大于65536的(1 <<< 16)因为他只有低16位有值,那么sc都是负数了,怎么会相等呢?
  2. sc == rs + MAX_RESIZERS 判断是否还需要线程扩容,MAX_RESIZERS是最大扩容的线程数量,和第一个同理,rs如果没有左移16位最大值肯定小于65536,MAX_RESIZERS的值为65535,它俩加起来就是131,071,这个和负数溢出还差十万八千里呢... 所以,这两个检查相当于没有,永远不可能成立,当然,有理有据在OpenJDK的bug提交记录上可以看到如下的Bug记录:JDK-8214427

image.png

由上可以看到,这个bug在Java 12及之后的版本修复了,所以下面来看一下这块改成了什么。JDK12对比

image.png

从上面可以看出,这两个判断条件已经修复了。在判断前int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT 对rs做了左移16位操作,此时判断sc == rs + MAX_RESIZERS 和 sc == rs + 1才是有意义的。顺便看到删除了JDK8 中(sc >>> RESIZE_STAMP_SHIFT) != rs这个判断,这个是判断此时是否是原哈希表的,如果协助扩容时发生了二次扩容,这里resizeStamp会生成不同的标志位,网上也搜索了相关资料,没有找到合理的解释,这里有知道缘由的可以在下方留言告诉我。


这里说到协助,扯点题外话:我们不妨想想,我们一直在寻找限制多线程的方法:加锁阻塞、自旋,因为扩容时需要对内部数据迁移,是一个耗时操作,但如果此时仍然选择阻塞或者自旋其实都是性能上的损失,大师的思想就是:其他线程不会被阻塞或者自旋,而是去帮助扩容,拥抱多线程,而不是拒绝它,最大程度的发挥多核CPU的优势。


什么时候扩容

了解扩容方法之前,我们先看下扩容的前置条件,什么时候扩容?

  1. 当put元素达到扩容阈值时,会在addCount中判断,调用transfer方法扩容
  2. 触发tryPresize操作,有两种情况下会触发tryPresize方法:
    1. putAll方法在put 元素之前会调用tryPresize传入Map的长度,将数组的长度扩容至大于传入map长度的最小2次幂
    2. 在转换某个桶节点为红黑树时,会判断当前数组的长度是否小于MIN_TREEIFY_CAPACITY(默认是64),如果小于这个长度,那么会尝试用tryPresize方法扩大数组的长度为当前的2倍,并触发transfer方法,重新调整节点的位置

尝试调整方法tryPresize


    private final void tryPresize(int size) {
        /**
         * 判断传入的size 是否到最大长度,如果是则取最大值
         *  否则按照 扩容阈值 的方式计算它的最小2次幂
         *  即和构造函数一样 n >= 1.5 * size + 1
         */
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
                tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        // 注意这里是一个死循环
        while ((sc = sizeCtl) >= 0) {
            /**
             * 下面的判断和初始化table 逻辑类似 初始化,将sc更新为sizeCtl -1 表示正在初始化
             *  这里有一个不起眼的判断 n = (sc > c) ? sc : c 实际上就是取数组长度的最大值
             *   如果putAll方法传入的哈希表过小 这里是不能按照传入的来初始化table的
             */
            Node<K,V>[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        // sc就是扩容的阈值,sizeCtl在不操作是存储的下次扩容的阈值
                        sizeCtl = sc;
                    }
                }
            }
            /**
             * 如果c小于sc 说明已经初始化且大小合适 不需要扩容 直接结束了
             *  比如 传入的size是 32 经过tableSizeFor方法后是64
             *  但实际上此时的sc可能其他线程操作了扩容 已经是96了 那么此时就不需要再继续扩容了
             */
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            /**
             * 排除上面两个判断,这里只可能是sc < c了 代表已经初始化过,但是需要扩容的情况
             */
            else if (tab == table) {
                /**
                 * 这里的逻辑和helpTransfer方法类似 判断当前的 sc 是否小于0 如果小于0说明其他线程正在扩容 此线程可以尝试去帮忙
                 *  如果sc 大于0 此线程 尝试 抢占第一个扩容线程 如果成功后 会将sizeCtl 设置为(rs << RESIZE_STAMP_SHIFT) + 2
                 */
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    //  这里的判断条件和helpTransfer一致 就不再赘述
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    // 如果可以帮助扩容,那么将sizeCtl的低16位+1 即扩容线程数 +1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

这里小结一下扩容的逻辑

  1. 根据传入的size计算最小2次幂,如果大于最大值的一半,则设置最大值MAXIMUM_CAPACITY(默认为1 << 30)
  2. 判断是否是初始化,如果是初始化,那么使用传入的size的最小2次幂和当前构造函数设置的扩容阈值比较,去一个max作为数组的初始长度
  3. 判断是否需要扩容,即扩容阈值小于当前传入的size的最小2次幂,这里的判断逻辑和helpTransfer中类似,判断当前的 sc是否小于0 如果小于0说明其他线程正在扩容 此线程可以尝试去帮忙,如果sc大于0 此线程 尝试 抢占第一个扩容线程 如果成功后 会将sizeCtl 设置为(rs << RESIZE_STAMP_SHIFT) + 2

扩容方法transfer


前面铺垫还是挺多的,这里来详细看下扩容的核心逻辑。

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        /**
         * 这里在计算每个线程需要执行的桶个数,下面简称为bound区间
         *  NCPU 是cpu的虚拟核心数 如果8c16t那么就是16
         *
         *  如果 NCPU > 1 则每个bound区间的个数为  (n >>> 3) / NCPU  否则为n 就是不拆分
         *   如果算出的bound区间太小 那么就使用 MIN_TRANSFER_STRIDE 默认是16
         */
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        // 这个nextTab是传入的 在外判断为第一个线程扩容时 为null
        if (nextTab == null) {            // initiating
            try {
                // 扩容的数组为2倍长度 设置给nextTab
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                // 到这里说明异常了 除了受检异常还有OOM异常,此时放弃扩容,并将sizeCtl设置为最大值
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            // nextTable是一个 volatile修饰的 节点 ,代表扩容后的哈希表
            // 小Tip  这里为什么不需要cas的方式为nextTable赋值? nextTab 是外面传入的 在上个方法会判断扩容的状态
            nextTable = nextTab;
            // transferIndex 设置的值为原数组长度
            transferIndex = n;
        }
        // nextn 存储了新数组的长度
        int nextn = nextTab.length;
        /**
         * 这里第一次看到ForwardingNode 节点 他的Hash值为MOVED -1
         *  并且存储了新哈希表的引用 只有空桶会用到
         */
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // 这里声明了 两个变量  advance当前bound区间迁移是否完成 和 所有bound区间是否迁移完成
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        // 这里是一个死循环 i表示桶的位置,bound是当前线程区间
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                /**
                 * i是桶的下标,从原数组长度减到-1 表示迁移完成一个桶 就下一个
                 * [bound,transferIndex] 是每个线程的扩容执行区间,transferIndex在第一次进来时会初始化成原数组的长度n
                 *
                 * --i >= bound 如果线程刚刚分配完还没干活,那么i就在上面区间内
                 * finishing 等下说
                 */
                if (--i >= bound || finishing)
                    // 不需要分配
                    advance = false;
                // transferIndex小于0则代表分配完成
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    // 不需要分配
                    advance = false;
                }
                /**
                 * nextIndex - stride 就是这个线程区间的下标 nextIndex是当前未分配的位置,stride就是bound区间的大小
                 * cas 成功 更新transferIndex的值为 nextBound的值
                 * 如果cas失败 那么就说明其他线程进来了 下次重试
                 */
                else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    // nextBound 就是nextIndex - stride 的值
                    bound = nextBound;
                    // 索引比长度 小1
                    i = nextIndex - 1;
                    // 已经分配好 不需要分配
                    advance = false;
                }
            }
            /**
             * 只有分配完成后advance为false 或者 不需要再次分配时  会来到这里
             *
             *  i 是索引 小于0代表都迁移完成了
             *  n 是长度 i 理论上一直在减 是根本不会大于n的 除非Int数据溢出了,即 Integer.MIN_VALUE - 1 = Integer.MAX_VALUE
             *  上面分配时cas的失败后会不断减小i的值
             *  i + n >= nextn 说实话 没看懂... nextn是新数组的长度 i + n 和  理论上也不会发生
             */
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    // finishing完成时为true  这时更新nextTable为null 将nextTab赋值给table
                    nextTable = null;
                    table = nextTab;
                    // sizeCtl 的值会再次变为正数,扩容的阈值 为当前旧数组的长度 1.5倍 即新数组的0.75倍
                    // 可以看出 这里的负载因子 使用的是0.75 不会使用传入的值
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                /**
                 * 难以理解的是这里需要进行一次二次检查 TODO
                 *  从addCount方法可知,扩容线程数存储在sizeCtl的低16位-1上 比如 2个线程 那么sizeCtl的低16位值为3
                 *  每个线程执行完成后 在这里会做一次 减 1
                 *  resizeStamp(n) << 16 sizeCtl高16存储其实就是它
                 *  但只有第一个线程的值 sc 会等于 (rs << RESIZE_STAMP_SHIFT) + 2
                 *  这里就在验证当前线程是否是第一个线程 如果是 则需要做一次二次检查 如果不是 就退出了
                 */
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    // finishing只有这里会设置为 true
                    finishing = advance = true;
                    // 将i继续设置为原数组长度,重新跑一边全部的桶 如果没有做完就继续
                    // 想想 为什么要二次check?
                    i = n; // recheck before commit
                }
            }
            /**
             *  如果原数组 桶上这个位置没有值 说明不需要迁移 直接赋值ForwardingNode
             *   赋值这个就是一个标志,如果是MOVE的,那么其他线程在put时就不会操作了
             *   就会进到helpTransfer方法中帮助扩容
             */
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                // 如果检测到这个位置已经有其他线程在迁移了 那么跳出循环重新分配
                advance = true; // already processed
            else {
                /**
                 * 这里和putVal一样 锁住了桶的第一个节点 以防止其他线程操作
                 *  上面分配和扩容实际上是一个循环,如果其他线程分配失败 也会到这里来
                 */
                synchronized (f) {
                    // 锁中double check 是一个好习惯
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        // fh 在putVal方法中已经有说 如果大于0说明是普通链表Node节点
                        if (fh >= 0) {
                            // 折叠 链表扩容的方法
                            /**
                             * 将advance设置为true 就会再次进入上面分配方法中 --i >= bound 判断是否分配完成
                             *  如果没有分配完成 会继续下一个桶
                             */
                            advance = true;
                        }
                        // 红黑树节点 折叠
                        else if (f instanceof TreeBin) {
                            // advance  true 与上面链表同理
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

分析完扩容的逻辑,我们再画张图好好理解下:

image.png

小结下扩容的逻辑流程为:

  1. 计算bound区间太小,即每个线程执行扩容的桶个数,最小为16
  2. 构造长度2倍的空数组,作为新的哈希表
  3. 每个线程从最后往前竞争自己的扩容区间,如果竞争成功就将transferIndex的值减bound区间数
  4. 线程在桶执行扩容逻辑之前会给第一个节点加锁,阻塞putVal线程新增数据。链表节点在原数组长度最高位位置的二进制值判断,为0则在当前桶下标i位置,为1在当前下标 i + n 位置处(这里采用头插法);红黑树在拆分后会判断节点数是否小于6,如果是会退化成链表,迁移部分本文不展开研究。当前桶迁移完成后,设置成ForwardingNode节点。
  5. 如果当前桶没有节点,那么就设置ForwardingNode标志正在扩容,让put的线程也进来帮助扩容
  6. 当前线程扩容完成后,如果是第一个线程,那么需要double check 下是否全部扩容完成,如果扩容完成返回新的nextTab,扩容结束

这里面还有链表和红黑树扩容,就是折叠的部分,虽然链表扩容的代码是单线程操作的,但里面的思想也是非常值得拜读的

链表扩容


和HashMap扩容类似,但会先计算是否原数组长度的二进制最高位的值是否全部一致的,如果一致说明不需要拆分直接全部迁移,只用修改头结点的位置即可,算是性能上做了小优化。

    /**
     * fh 与 n 求与 这里的n是原数组的长度 2的幂 除了最高位都是0
     * 那么runBit 这个值是头节点与数组长度 取与 结果就是数组长度二进制最高位位置的值
     * 如果是0 则代表桶的头节点f 扩容后仍然在原位置i 如果为1 则为 i + n 的位置
     * runBit 这个值只可能 == n 或者为0
     */
    int runBit = fh & n;
    // 第一个桶节点
    Node<K,V> lastRun = f;
    /**
     * 这段代码需要倒着看  if (b != runBit) {  runBit = b } 这个判断是将循环到最新的哈希值赋值给runBit
     *  而上面知道 p.hash & 得到的值只可能是 n 或者 0 简单说就是不是高位就是低位
     *  是在检查倒数X个相同节点到头结点 是否同是高位 或者低位
     * 举个栗子 假如链表算出来 依次是
     *  节点 A B C D E F
     *  哈希 n 0 0 n n n
     *  那么 lastRun 就是 D节点,runBit就是n了
     *
     *  这样做有什么好处呢?HashMap中 是全部都与长度n计算了一遍最高位,但是如果全是高位或者低位
     *  其实这里就不需要在进行计算了 直接将头结点赋值给对应的位置即可 ConcurrentHashMap这里就是一个优化检查
     */
    for (Node<K,V> p = f.next; p != null; p = p.next) {
        int b = p.hash & n;
        if (b != runBit) {
            runBit = b;
            lastRun = p;
        }
    }
    // 如果最后一个变动的节点是0 那么 全都放入低位 为下面尾插法做准备
    if (runBit == 0) {
        ln = lastRun;
        hn = null;
    }
    // 如果最后一个变动的节点是1 那么 全都放入高位 为下面尾插法做准备
    else {
        hn = lastRun;
        ln = null;
    }
    /**
     * 这里就开始尾插法的逻辑了 从头开始遍历 结束节点为lastRun
     */
    for (Node<K,V> p = f; p != lastRun; p = p.next) {
        int ph = p.hash; K pk = p.key; V pv = p.val;
        /**
         * 这里和上面计算逻辑一致 如果数组长度二进制最高位的值为1
         *  头插法放入高位链表,如果是0放入低位链表
         *  这里头插法的区别就是插入到前面,next的值为 上一次循环 链表的头
         * 这样也说明了 ConcurrentHashMap的链表迁移是非稳定算法
         * 不是正序也不是逆序
         */
        if ((ph & n) == 0)
            ln = new Node<K,V>(ph, pk, pv, ln);
        else
            hn = new Node<K,V>(ph, pk, pv, hn);
    }
    /**
     * 走过上面的逻辑 我们已经拆分了两个链表 ln低位链表 hn高位链表
     *  将低位链表ln放在新哈希表的i 位置
     *  将高位链表hn放在新哈希表的i + n 位置
     */
    setTabAt(nextTab, i, ln);
    setTabAt(nextTab, i + n, hn);
    /**
     * 旧数组的i位置已经迁移完毕,这时候需要设置此处为ForwardingNode节点
     * 意思是不能在这个位置put元素了,别的线程也不会进来处理
     */
    setTabAt(tab, i, fwd);
    /**
     * 将advance设置为true 就会再次进入上面分配方法中 --i >= bound 判断是否分配完成
     *  如果没有分配完成 会继续下一个桶
     */
    advance = true;

链表扩容部分我们画张图来帮助理解下:

image.png

小结下链表迁移的逻辑如下:

  1. 根据旧数组长度n对每个节点的哈希值取与来计算最后变更的节点lastRun,假如一个都不需要变动,那么头节点就是lastRun
  2. 设置lastRun的节点位置,如果最后变更的是节点哈希值与长度n取与是n,那么说明此哈希值在数组长度二进制最高位的值为1,放入高位链表,否则设置为低位链表
  3. 循环链表的每个不为lastRun的节点,判断哈希值在数组长度二进制最高位的值是否为1,如果为1,则用头插法设置到高位链表,为0会设置到低位链表的头节点,直到操作到最后一个变更的链表节点lastRun,这样也间接说明了ConcurrentHashMap的链表迁移是非稳定算法
  4. 拆分完成后得到高低位两条链表,将低位链表ln放在新哈希表的i 位置,将高位链表hn放在新哈希表的i + n 位置。并且将旧数组的i位置设置为ForwardingNode节点,标记此桶已经迁移完成。

参数小结

状态变量 sizeCtl


在ConcurrentHashMap中有一个比较重要,且出现频率很高的变量sizeCtl,它记录了哈希表操作的状态,我们这里小节一下它出现的含义:

    /**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     */
    private transient volatile int sizeCtl;
  1. ConcurrentHashMap未初始化时,它存储了哈希表需要创建的大小,默认为0
  2. ConcurrentHashMap初始化时,它为-1,表示正在初始化
  3. ConcurrentHashMap初始化后,它为下次扩容的阈值
  4. ConcurrentHashMap正在扩容时,它是一个负值,它的高16位存储了通过哈希表旧数组计算得到的标志位,低16位存储了 (1 + 正在扩容的线程数)(第一个线程为2,其他为1)

回归问题

分析完成源码后,再次回归问题,看下面四行代码

    /**
     * 容器启动方法
     */
    public void start() {
        System.out.println("============ start ============");

        String mainConfig = getConfigFromEnv("mainConfig");

        System.out.println("start up at mainConfig: " + mainConfig);

        System.out.println("============= end =============");
    }

再看下日志:

image.png

日志显示执行完start方法后就没有然后了,展开getConfigFromEnv方法


    /**
     * 优先从缓存取 取不到从其他配置服务获取
     */
    private String getConfigFromEnv(String configKey) {
        return cache.get(configKey, () -> getConfigByOtherConfig(configKey));
    }

    /**
     * 模拟从其他配置服务拉取配置场景
     */
    private String getConfigByOtherConfig(String configKey) {
        String active = System.getenv(configKey);

        /**
         * 如果是生产 对加密的部分进行解密
         */
        if (PRO_ENV.equals(getSpringActive())) {
            System.out.println("decrypt config ...");
            return active;
        }

        return active;
    }

    /**
     * 优先从缓存取 取不到给dev
     */
    private String getSpringActive() {
        return cache.get("active", () -> "dev");
    }

整理下,获取配置的值getConfigFromEnv方法主要做了这么个事:

  1. 从缓存中获取,使用ConcurrentMapCache提供的get方法,获取不到时使用Callable的valueLoader,跳转到方法getConfigByOtherConfig,表示获取不到从配置服务拉取配置。
  2. 这里getConfigByOtherConfig采用从系统中获取,无论是否拿到,如果是生产环境需要对参数进行解密(常规解密操作都发生在这里),这里显然不是,但是他需要获取Spring的Active,回到方法getSpringActive
  3. 获取active配置仍然是从缓存中,如果没有默认给dev。(幸好不需要再解密了,不然就套娃了)

再来细细看下ConcurrentMapCache的相关方法

public class ConcurrentMapCache {
    private final String name;
    private final boolean allowNullValues;
    private final ConcurrentMap<Object, Object> store;

    @SuppressWarnings("unchecked")
    public <T> T get(Object key, Callable<T> valueLoader) {
        Object storeValue = this.store.computeIfAbsent(key, k -> {
            try {
                T userValue = valueLoader.call();
                if (userValue == null) {
                    if (this.allowNullValues) {
                        return NullValue.INSTANCE;
                    }
                    throw new IllegalArgumentException("Cache '" + getName() + "' is configured to not allow null values but null was provided");
                }
                return userValue;
            } catch (Throwable ex) {
                throw new RuntimeException();
            }
        });
        if (this.allowNullValues && storeValue == NullValue.INSTANCE) {
            return null;
        }
        return (T) storeValue;
    }

    public void put(Object key, @Nullable Object value) {
        if (value == null) {
            if (this.allowNullValues) {
                value = NullValue.INSTANCE;
            } else {
                throw new IllegalArgumentException("Cache '" + getName() + "' is configured to not allow null values but null was provided");
            }
        }
        this.store.put(key, value);
    }

    protected ConcurrentMapCache(String name, ConcurrentMap<Object, Object> store,
                                 boolean allowNullValues) {
        this.name = name;
        this.store = store;
        this.allowNullValues = allowNullValues;
    }

    public final String getName() {
        return this.name;
    }
}

小整理下就两个方法:get和put分别对应新增和查询:

  1. put方法 如果value为null,首先判断allowNullValues是否允许空值对象,如果允许则保存NullValue.INSTANCE作为ConcurrentMap的value保存起来,否则抛出异常。这样的好处是将null值和空缓存分离。
  2. get方法 调用 ConcurrentMap的computeIfAbsent方法,即如果没有则使用传入valueLoader计算得出value,如果为null,则判断是否allowNullValues,支持的话说明可能有NullValue.INSTANCE,此时仍然返回null

从上面看出,NullValue.INSTANCE是有好处的,因为如果为某个键的value为null值,那么保存NullValue.INSTANCE可以有效避免造成缓存穿透的问题,但是,这样也给ConcurrentMap带来了一个空值,这个空值会存在于哈希表中,触发存储、扩容等,当然这个和缓存穿透比起来带来的影响就小很多了。

解答疑问


这样看代码,不太容易串起来理解问题,我们同样,再画张图看看:

image.png

顺着这个图,把线撸顺了,再结合上面对ConcurrentHashMap的深入理解,我们小结一下:

  1. 初始化容器,ConcurrentHashMap中构造了 RESIZE_CAPACITY - 1 个元素,即这时再新增一个就会触发扩容。
  2. 从缓存中获取Config的值,当此时从ConcurrentHashMap中获取不到,ConcurrentHashMap则会调用传入的mappingFunction计算需要的值,但是,这个时候为了防止其他线程并发操作,ConcurrentHashMap设置了一个预留节点,并且使用了synchronized对其加锁,防止哈希冲突时其他线程并发计算。
  3. 在计算时,恰好不巧需要往容器内增加profile的值作为缓存,此时新增有两个问题:
    1. 假如哈希冲突相同,进入重入锁,执行putVal的插入逻辑,然后当前桶是负的哈希值(RESERVED -3),死循环(因为putVal就没有处理预留节点的逻辑)
    2. 假如哈希值不冲突,插入到其他桶节点,不会阻塞操作成功,进入addCount计数器+1,判断需要扩容,则进行扩容,分配线程bound一系列操作后,发现处理预留节点时(RESERVED -3),死循环(恰好transfer也没有处理预留节点的逻辑)

综上,咱们也了解了为什么这里会死循环,实际上,这里是用一个例子,模拟了两个computeIfAbsent在计算节点时会产生死循环的BUG,当然已经有大佬帮忙提了 JDK-8062841

image.png

这个bug已经在JDK9版本修复了,我们可以看下相关修改

image.png

在插入时(扩容时也有类似逻辑)会判断当前桶是否是预留节点,如果是则会抛出异常,不允许修改。


最佳实践


说了那么多,我们来想想,要怎么做才是最佳的?

  1. 导致死循环的根源是computeIfAbsent方法,因为在计算时设置的预留节点ReservationNode不是基本节点而是一个临时节点,无法进行同时新增或者扩容迁移,那么我们应该避免在computeIfAbsent的mappingFunction新增元素,即避免套娃使用
  2. 读懂ConcurrentHashMap的流程后,我们应该构造更好的哈希键,性能瓶颈在于线程竞争锁资源,减少锁的竞争,方能达到最佳性能。
  3. Cas是一种乐观锁,并发低的情况下,使用它可以降低性能消耗,但是并发很高会不断尝试,白白消耗资源也抢占不到资源,此时重试使用synchronized也是一种明智的方案

这里可以看出从JDK7到JDK8的优化是很精妙的,之前的Segment技术在扩容时其他线程对本Segment写是挂起等待的,从等待到协助扩容,让我们知道多线程竞争问题不是程序的瓶颈,要运用多线程首先学会去拥抱多线程,分段扩容技术是很优秀的思想,一个方法能够让多个线程同时协作完成的代码是很值得拜读的。


感谢你能看到这里,至此,这个BUG已经搞清楚了,顺便学习了ConcurrentHashMap的执行整体流程,也是收获颇丰的,相关知识点我已经写在小节里了。如果你有疑问,可以在下方留言,我看到便会回复。接下来,我会在下篇中整理多线程计数、查询、移除等方法完善ConcurrentHashMap的学习历程。

本文代码地址 github

参考

JDK 8 源码
https://my.oschina.net/qiketime/blog/4694727
https://segmentfault.com/a/1190000021237438
https://juejin.cn/post/6844904191077384200#heading-2
https://www.zhenchao.org/2016/08/16/java/segment-based-concurrent-hashmap/

  • 本文作者: 沧澜
  • 本文链接: https://www.meetkiki.com/archives/从死循环BUG来聊聊ConcurrentHashMap的执行内幕(上)
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 软件思想 # SpringBoot # 领域驱动设计 # 算法 # 中间件 # 计算机网络 # MySQL # 数据库 # javascript # 极客时间 # 分布式架构 # Jenkins # JVM # 多线程 # Java基础 # CentOS安装 # 编译OpenJDK # 持续集成 # 杂谈
深入理解计算机网络IO(中)——IO多路复用
聊聊ConcurrentHashMap的执行内幕(下)
  • 文章目录
  • 站点概览
沧澜

沧澜

芝兰生于幽谷,不以无人而不芳
君子修身养德,不以穷困而改志

75 日志
19 分类
19 标签
RSS
Creative Commons
0%
© 2019 — 2022 蜀ICP备19039166号
由 Halo 强力驱动
|
主题 - NexT.Mist v5.1.4