集合类源码浅析のJDK1.8ConcurrentHashMap(上篇)
文章目录
- 前言
- 一、概述
- 二、CHM的属性
- 1、属性
- 三、新增方法
- 1、put
- 2、initTable
- 四、分段计数
- 1、addCount
- 2、fullAddCount
- 3、sumCount
- 总结
前言
本篇是JDK1.8的ConcurrentHashMap源码个人学习笔记,ConcurrentHashMap(笔记中简称CHM)是一种线程安全的HashMap,1.8中废弃了segment数组,而是对单个桶进行线程安全控制,并且引入了分段计数,多线程扩容的思想,限于篇幅,上篇重点记录增加元素以及分段计数的源码实现
。
一、概述
HashMap是线程不安全的,尤其是在1.7中的插入元素使用的头插法,会导致多线程下扩容死链的问题,使CPU直接100%。如果要保证线程安全,可以使用HashTable和CHM。相比于HashTable简单粗暴地在每个方法上加悲观锁,CHM的效率会更高一些,一方面在于,CHM是悲观锁与CAS操作相结合,并且锁的粒度更细,精确到每个桶下标。因为假设A,B两个线程同时要将自己的元素插入CHM,元素的位置又不在同一个桶下标,如果锁整个Table,无疑是一种性能上的浪费。
而计算Table中所有的桶中的元素,使用的是分段计数
的思想,即每个线程创建自己的累加数组
,累加数组是独立于Table的,也可以进行扩容,最终再进行合计(在JUC的LongAddr中使用的也是这样的思想)。扩容也是多线程参与的,每个线程参与一定步长
的桶的元素迁移。
二、CHM的属性
1、属性
因为CHM的属性和HashMap的一些是相同的,所以只介绍CHM特有的重要属性
//当某个节点被从一个桶迁移到另一个桶时,原位置的节点会被标记为 MOVED,以防止重复操作或访问已经“转发”的节点。
static final int MOVED = -1; // hash for forwarding nodes
//用于标识一个桶中的链表已经被转换成树形结构。
static final int TREEBIN = -2; // hash for roots of trees
//用于表示一个桶位置正在被暂时保留。它通常出现在扩容或重哈希过程中,表示某个位置正在等待数据填充或进一步处理。
static final int RESERVED = -3; // hash for transient reservations
//保留 int 类型哈希值的前 31 位,因为 int 类型的哈希值为 32 位,最高位通常用于标识负数(符号位),而 HASH_BITS 会屏蔽掉符号位,只保留 31 位有效的哈希值部分。
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
桶的容量,只有一个线程可以对其进行修改。
private transient volatile long baseCount;
用于指示哈希表的当前状态:
- 在创建CHM时,用来表示初始的哈希表大小(或容量)。
- sizeCtl 还用于控制何时触发扩容。
- sizeCtl 在负数时表示哈希表正在扩容中。
- sizeCtl 在正数值表示哈希表的初始容量或扩容的阈值。
private transient volatile int sizeCtl;
transferIndex 由一个线程初始化,并在扩容过程中逐渐递增。每个线程通过读取和更新 transferIndex,知道自己需要迁移哪些桶。
假设需要扩容,原来有 16 个桶,而扩容后会有 32 个桶。在迁移过程中,transferIndex 会从 16 开始,每个线程负责将数据从桶 16 到桶 31 迁移到新数组中的相应位置。迁移完成后,transferIndex 更新为 32,表示所有桶已完成迁移。
private transient volatile int transferIndex;
在扩容的过程中,cellsBusy 会作为一种标识,表示是否有线程正在修改当前的桶区域。值为 1 时,表示当前有线程正在执行某个桶的扩容任务;为 0 时,表示当前没有线程在处理这些任务。
private transient volatile int cellsBusy;
累加单元组成的数组
private transient volatile CounterCell[] counterCells;
三、新增方法
1、put
public V put(K key, V value) {
return putVal(key, value, false);
}
putVal是新增的主要逻辑,和HashMap的思路大致相同,也是分为了table为空,插入位置为链表的头节点,链表插入和红黑树插入的逻辑。区别在于:
- 如果发现数组正在扩容,该线程会帮助数组进行元素的迁移。
- 要插入位置的桶中头元素为空时,是通过CAS的操作保证插入元素的线程安全。
- 其他情况都是通过悲观锁锁定单个下标以保证线程安全。
final V putVal(K key, V value, boolean onlyIfAbsent) {
//和HashMap不同的是,CHM的键值强制不能为空,否则抛出异常
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
//binCount 记录桶中的元素数量
int binCount = 0;
//先将属性table赋值给tab 数组,然后无限循环直到满足条件退出
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//CHM的table为空,说明是第一次插入元素,就走初始化桶数组的逻辑
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//通过hash计算,将要插入位置的桶中头元素为空,说明该桶还没有元素
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//通过CAS的方式将k,v包装成node对象放入桶的头部位置
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
//结束
break; // no lock when adding to empty bin
}
//通过hash计算,将要插入位置的桶的hash值为-1,证明该数组正在扩容,就走帮助扩容的逻辑
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//某个桶下标已经存在元素
V oldVal = null;
//加悲观锁
synchronized (f) {
//f是某个桶的头节点。
if (tabAt(tab, i) == f) {
//该数组不是正在扩容
if (fh >= 0) {
//初始化该桶下标的元素为1
binCount = 1;
//遍历链表,循环一次binCount +1 记录该桶下标中元素的数量
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果是TreeBin的类型就走红黑树的逻辑
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//某个桶下标中元素的数量不为0
if (binCount != 0) {
//超过阈值就走转换为红黑树的逻辑
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//有key相同的就返回value
if (oldVal != null)
return oldVal;
break;
}
}
}
//多线程并发对table进行操作时,统计各自桶下标中元素的数量
addCount(1L, binCount);
return null;
}
并且CHM中的红黑树,被包装成了TreeBin
,是对HashMap的TreeNode
的二次包装:
具体为什么要这样设计,是因为对红黑树进行元素新增的过程是上锁的,并且锁的是桶下标的头节点,红黑树在插入的过程中,是有可能进行旋转的,也就是root节点会发生改变,例如A线程,进入了插入方法,获取到的锁是20的节点,插入了一个元素,导致了红黑树进行旋转(20不再是根节点),另一个B线程,也进入了插入方法,拿到的锁就不是同一个对象了。
使用了TreeBin后,是对TreeBin整个对象加锁,并不用关心内部的红黑树有没有进行翻转,根节点变不变化,多个线程拿到的都是同一个锁对象。
2、initTable
再来看下initTable()
方法:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//进入后会再次判断表是否为空,双重检查
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//线程A尝试将SIZECTL属性从0改成-1,成功了另一个线程在循环时会Thread.yield(); 让出cpu执行权
//保证表只被初始化一次
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//多次判断表是否为空
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
四、分段计数
1、addCount
private final void addCount(long x, int check) {
//as 累加数组
CounterCell[] as; long b, s;
//进入这个分支的条件:
//1、累加数组不为空(首先会将counterCells属性赋值给as)
if ((as = counterCells) != null ||
//或2、尝试CAS改变BASECOUNT失败,证明存在并发访问桶数组的情况,BASECOUNT只能被一个线程修改。
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
//a 累加单元
CounterCell a; long v; int m;
boolean uncontended = true;
//进入这个分支的条件:
//累加数组为空 或数组长度<0 或线程唯一的随机数和数组长度与运算得到的累加数组下标对应的累加单元为空
//或CAS尝试将线程唯一的随机数和数组长度与运算得到的累加数组下标对应的累加单元的 value属性+1失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
//统计数组中所有有元素桶中元素的总计
s = sumCount();
}
//....下半段是扩容的逻辑
}
CounterCell
是CHM的一个静态内部类,只有一个value属性:
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
ThreadLocalRandom.getProbe
是偏底层的方法,用于获取当前线程专属的随机数。
2、fullAddCount
核心思想在于多重检查+CAS操作:
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
//as 累加数组 a 累加单元
CounterCell[] as; CounterCell a; int n; long v;
//将counterCells属性赋值给as as的长度赋值给n 然后进行判断(无论if是否符合条件,都会进行赋值操作,源码里很多地方只是把赋值和比较放在了一个语句中)
//1、累加数组不为空就会进入该分支
if ((as = counterCells) != null && (n = as.length) > 0) {
//1.1、累加数组长度-1 和 线程的唯一标识 做与运算 得到的累加数组索引位置的元素为空
if ((a = as[(n - 1) & h]) == null) {
//判断是否有其他线程在同时操作
if (cellsBusy == 0) {
//创建一个累加单元,它的value属性为1 // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
//双重检查是否有其他线程在同时操作,并且通过CAS尝试标记当前线程正在进行操作
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
//再次检查累加数组是否不为空,并且即将插入的位置为空
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//将创建的累加单元放入累加数组
rs[j] = r;
created = true;
}
} finally {
//释放cellsBusy 标记
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//1.2、wasUncontended为false(某个线程第一次进该方法的wasUncontended默认就是false)
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//1.3、尝试CAS修改某个累加单元中value的值 + 1
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//1.4、累加数组发生了改变,或者长度大于cpu的最大核心数
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
//1.5、 collide为false时(collide默认就是false的)
else if (!collide)
collide = true;
//1.6、 cellsBusy 为0 并且 CAS将cellsBusy 设置为1 成功
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
//累加数组为空的情况
if (counterCells == as) {// Expand table unless stale
//对累加数组扩容,长度为原有的2倍
CounterCell[] rs = new CounterCell[n << 1];
//元素迁移
for (int i = 0; i < n; ++i)
rs[i] = as[i];
//重新将新累加数组赋值给counterCells 属性
counterCells = rs;
}
} finally {
//释放cellsBusy 标记
cellsBusy = 0;
}
collide = false;
//继续循环
continue; // Retry with expanded table
}
//重新计算进入该方法线程的唯一标识,避免每次都对同一个下标进行操作
h = ThreadLocalRandom.advanceProbe(h);
}
//2、累加数组为空,但是CAS设置CELLSBUSY标记为1成功(保证累加数组属性只会被一个线程初始化)
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
//再次判断 // Initialize table
if (counterCells == as) {
//就会初始化一个累加数组 长度为2
CounterCell[] rs = new CounterCell[2];
//并且给其中某个索引创建累加单元 value为1
rs[h & 1] = new CounterCell(x);
//将该累加数组赋值给属性counterCells
counterCells = rs;
//标记已经初始化
init = true;
}
} finally {
//最终将CELLSBUSY标记设置为0
cellsBusy = 0;
}
if (init)
break;
}
//3、累加数组为空,并且CELLSBUSY标记已经为1
//说明累加数组也无法操作了,就再次尝试修改BASECOUNT的值。
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
如果新进来一个线程,累加数组为空,并且CAS设置CELLSBUSY标记为1成功,自然会创建一个新的累加数组,并且创建一个累加单元放入其中。
假设累加数组中某个累加单元已经有了元素,另一个线程也要尝试操作同样的位置,第一次循环
会将wasUncontended 标记设置为true,然后h = ThreadLocalRandom.advanceProbe(h);
重新计算当前线程的唯一标识,目的就是为了避免第二次循环
重复竞争相同的位置。
假设第二次循环
计算出位置的累加单元也有了元素,则会尝试将该累加单元的value
属性+1。如果+1操作依旧失败,就会将collide 设置为true,然后h = ThreadLocalRandom.advanceProbe(h);
再次重新计算当前线程的唯一标识。
如果多次通过h = ThreadLocalRandom.advanceProbe(h);
重新计算当前线程的唯一标识,依旧无法创建新的累加单元,或者对某个累加单元的value
属性+1,说明多线程竞争激烈,就会尝试扩容。
个人理解wasUncontended 标记和collide 标记是起到一个缓冲
的作用,目的是多次h = ThreadLocalRandom.advanceProbe(h);
重新计算当前线程的唯一标识。
3、sumCount
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
//只有一个线程能操作CHM的baseCount属性
long sum = baseCount;
//累加数组不为空
if (as != null) {
//循环累加数组
for (int i = 0; i < as.length; ++i) {
//累加数组的某个累加单元也不为空
if ((a = as[i]) != null)
//进行累加
sum += a.value;
}
}
//最终的总数 = baseCount + 累加数组中所有不为空的累加单元中value的总计
//sum的作用是判断是否需要扩容
return sum;
}
总结
在CHM中,锁的粒度是桶数组的单个下标。并且在插入元素,累加计数,初始化table中,多次利用了CAS+双重检查的模式,保证线程安全。
下一篇:集合类源码浅析のJDK1.8ConcurrentHashMap(下篇)