简介
JDK1.7版本的ConcurrentHashMap
是根据分段锁实现的,分段的意思体现在初始化时的Segment数组上,而且是不支持扩容的,所以并发性能还是会有一定的限制。而JDK1.8版本的ConcurrentHashMap
采用了和HashMap类似的实现机制:数组+链表+红黑树,通过自旋+synchronized
锁+CAS+volatile
的方式来实现保证数据的一致性。
结构分析
重要的内部类
Node
Node类实现了Map.Entry接口,主要存放key/value键值对,链表结构具有next域。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
// volatile 修饰,保证可见性
volatile V val;
volatile Node<K,V> next;
...
}
TreeNode
树化后红黑树存储节点,继承于承载数据的Node类。
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
...
}
ForwardingNode
代表特殊节点,hash值为-1(MOVED),在扩容迁移数据时插入到原数组中,表示正在迁移数据。
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
// MOVED: hash值,-1。
super(MOVED, null, null, null);
this.nextTable = tab;
}
...
}
TreeBin
TreeBin不是红黑树的存储节点,TreeBin通过root属性维护红黑树的根结点,first属性指向TreeNode链表的头结点。
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter; // 当前使用这棵红黑树的线程
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
}
重要属性
// 最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认容量by:https://jingling.im
private static final int DEFAULT_CAPACITY = 16;
// 可能的最大(非 2 的幂)数组大小。 toArray 和相关方法需要。
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 默认并发级别
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 默认加载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转换成树的阈值,该值必须大于 2,并且应至少为 8
static final int TREEIFY_THRESHOLD = 8;
// 树退化成链表时的计数值
static final int UNTREEIFY_THRESHOLD = 6;
// 链表树化时,最小的链表表容量
static final int MIN_TREEIFY_CAPACITY = 64;
// 最小转移容量
private static final int MIN_TRANSFER_STRIDE = 16;
// sizeCtl 中用于生成标记的位数
private static int RESIZE_STAMP_BITS = 16;
// sizeCtl 中用于生成标记的位数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// sizeCtl 中记录大小标记的位移位。
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
static final int MOVED = -1; //表示正在转移
static final int TREEBIN = -2; //表示已经转换成树
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 装载Node的数组,第一次插入数据时延迟初始化
transient **volatile** Node<K,V>[] table;
// 扩容时使用,平时为null,只有在扩容的时候才为非null
private transient **volatile** Node<K,V>[] nextTable;
// 基本计数器值,主要在没有争用时使用(通过CAS的方式修改)
private transient **volatile** long baseCount;
// 表初始化和调整大小控制;
// 如果为负,则表正在初始化或调整大小。
// 当 table 为 null 时,保存创建时使用的初始表大小,或默认为 0。
// 初始化后,保存下一个要调整表格大小的元素计数值。
private transient **volatile** int sizeCtl;
// 调整大小时要拆分的下一个表索引
private transient **volatile** int transferIndex;
方法分析
构造方法
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel)
initialCapacity = concurrencyLevel; // 使用默认的并发级别
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
方法小结:
tableSizeFor
方法最后会返回一个2的幂的数作为最后的cap。- 构造方法没有初始化数组,具体是在put第一个元素的时候初始化。
- sizeCtl 的计算结果要保证是2的幂(计算方式是1.5*initialCapacity+1的结果向上取第一个2的幂,如果入参initialCapacity为10,sizeCtl就是16,如果入参initialCapacity为17,最后sizeCtl就是32),初始化数组之前表示的是容量的意思。
put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 参数校验,不允许key/value为null
if (key == null || value == null) throw new NullPointerException();
// 1. 计算hash值
int hash = spread(key.hashCode());
int binCount = 0; // 用于记录冲突链表的大小
// 思考,这里为什么要自旋?
// 第一次put数据时要先初始化数组,
for (Node<K,V>[] tab = table;;) { // tab 为数组引用
Node<K,V> f; // 索引位的节点
int n, i, fh; // n:数组的长度;i:索引(计算方法 (n - 1) & hash);fh: f节点的hash
if (tab == null || (n = tab.length) == 0) // 数组还没初始化
tab = initTable(); // 第一次插入数据时延迟初始化,自旋继续
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 当put进来,计算的目标索引位置没有元素时,直接插入
// 通过cas的方式修改,直接将put的值放入到指定数组的位置
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
// 完成put,退出整个for循环,退出方法
break; // no lock when adding to empty bin
} else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // 如果正在扩容,帮助数据迁移,并返回扩容后的新数组
else { // 到这里,说明是hash冲突了,数组tab所在的索引位置已经有数据了;下面的方法实现就需要把新put的值加入到链表或者树中
V oldVal = null;
synchronized (f) { // 同步锁f
if (tabAt(tab, i) == f) { // 再次检查是否是同一个节点,有没有被修改
if (fh >= 0) {
binCount = 1;
// e f 是冲突链表的头结点,这里遍历
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 目标hash,key都一致,满足条件就替换旧值by:https://jingling.im
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 遍历到了尾节点了
// new 一个node节点,插入到链表的尾巴上
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 冲突的位置已经不是链表了,是树
Node<K,V> p; // 接收插入树中成功的目标节点
binCount = 2; //树,直接设置成2
// putTreeVal:查找或添加一个节点兵返回
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value; // 替换旧值
}
}
}
}
if (binCount != 0) {
// 上面put的时候如果已经是树了,binCount会被直接设置成2
if (binCount >= TREEIFY_THRESHOLD) // 至少是8,才去尝试转换成为红黑树
treeifyBin(tab, i); // 里面还有条件检查,不一定马上转成树
if (oldVal != null)
return oldVal;
break; // 退出整个for自旋
}
}
}
// 元素数量加1,并判断是否达到扩容门槛
addCount(1L, binCount);
return null;
}
put方法小结:
- ConcurrentHashMap 不允许key/value 为null。
- 在第一次插入数据时才初始化数组
table
。 - put时如果正在扩容(要插入位置的节点hash值为-1),当前线程会加入到帮组扩容迁移数据(
helpTransfer
),并返回扩容后的新数组,然后继续执行put逻辑。 - 如果put时,目标位置为空时,直接插入,不要锁。
- 如果put时,目标位置不为空时,使用
synchronized
锁住一个桶,然后决定是替换旧值或是加入到链表或树中。替换旧值不会触发扩容,直接返回旧值。 - 如果冲突的位置链表长度大于等于8时,会尝试链表树化(
treeifyBin
:实际还需要数组长度至少为64才会树化 )。 - 添加计数(
addCount
),检查是否需要扩容,退出。
初始化数组:initTable
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { // 自旋,判断表是否有初始化
if ((sc = sizeCtl) < 0) // sizeCtl为负数,则表正在被其他线程初始化或调整大小。
Thread.yield(); // 暂停当前正在执行的线程, 让出CPU的执行权执行其他线程,但是如果没有同等优先级的线程或者更高优先级的线程, 该方法将不起作用
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // cas 修改为-1,表示当前线程正在初始化table
try { // by:https://jingling.im
if ((tab = table) == null || tab.length == 0) { // 再次检查table没有被初始化
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt; // 赋值初始化好的table
sc = n - (n >>> 2); // 计算实际上等同于 0.75 * n
}
} finally {
sizeCtl = sc; // 表初始化好后,sizeCtl 可以理解为扩容的阈值
}
break;
}
}
return tab; // 返回初始化好的table
}
initTable方法小结:
- 初始化数组一般是在第一次put数据的时候完成的。
- 使用
volatile
+CAS+自旋+双重检查的方式来控制sizeCtl
,sizeCtl
小于0表示有线程正在初始化数组。 - 未初始化前sizeCtl表示容量,初始化后
sizeCtl
被表示为下次要扩容的阈值(0.75*n)
添加计数:addCount
put一个新的元素后,会进入新增计数方法,入参check
参数小于0时不会做扩容检查。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;// b:基础计数,by:https://jingling.im
// 两个条件:1.counterCells 不为空 2.cas修改baseCount失败 (或的关系,不是与的关系)
// cas修改失败说明存在竞争关系,需要用到counterCells记录
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//s:基础加上x后的计数
CounterCell a; long v; int m;
boolean uncontended = true; //无竞争
// 1. as == null:说明counterCells数组还没有初始化
// 2. (m = as.length - 1) < 0:counterCells数组长度为0
// 3. 当前线程所在的位置段为null
// 4. cas 修改当前线程段的值失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 完成了CounterCell数组初始化等工作,计数加入到CounterCell 数组
fullAddCount(x, uncontended);
return;
}
if (check <= 1) // 链表长度小于等与1,不看扩容,直接退出
return;
s = sumCount(); // 重新计算元素个数
}
if (check >= 0) { // 主要看这里(counterCells为null,cas修改baseCount成功 )
Node<K,V>[] tab, nt; int n, sc;
// 自旋,判断达到扩容门槛
// 第一次自旋的时候sizeCtl表示的还是扩容门槛的意思,s表示的再次put一个元素后的值
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) { // sc<0 说明正在扩容中
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
// 扩容完成,退出自旋
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt); // 其他线程正在扩容,去帮助扩容,nt是已经扩容后的数组了
} else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
// 正常第一个线程触发扩容时会进入到这里
// 进入迁移元素
transfer(tab, null);
s = sumCount(); // 重新计算元素个数
}
}
}
addCount方法小结:
- 当没有冲突时,直接cas修改
basecount
完成计数更新。 - 自旋,判断是否达到扩容门槛
sizeCtl
。 - 如果达到扩容门槛,调用
transfer
方法进行扩容和转移数据操作。 - 如果发现已经有其他线程在扩容了,则当前线程加入迁移数据中(
transfer(tab, nt)
)。
扩容转移:transfer
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride; // n:原数组的长度
// 保证stride 最小16,步长,将n按照stride大小切分成N段
// 当原数组n为512,CPU核数为2,stride 会变为32
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try { // nextTab 为 null,先进行一次初始化
@SuppressWarnings("unchecked")
// 新的数组大小是原来的2倍 n<<1
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE; // 防止扩容时内存溢出
return;
}
nextTable = nextTab;
transferIndex = n; // n是原数组的长度大小
}
int nextn = nextTab.length; // 扩容后数组的大小
// ForwardingNode 翻译过来就是正在被迁移的 Node 构造方法会把hash设置成-1(MOVED)
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 循环 ,bound是边界值,i会倒序遍历数组
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) { // 自旋
int nextIndex, nextBound;
if (--i >= bound || finishing) // 第一次这个if肯定不成立,i每次自减1
advance = false; // 退出自旋(这里不是马上退出,下面advance 还有可能被设置成true)
else if ((nextIndex = transferIndex) <= 0) { // 第一次while自旋是不会进入到分支里面的,进入到这里说明倒序遍历到0了,已经结束了
// transferIndex 在上面被赋值成n,也就是原数组的长度大小
i = -1;
advance = false; // 退出自旋
} else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - **stride** : 0))) {
// 这里cas 修改transferIndex的值成功
// nextBound = (nextIndex > stride ? nextIndex - stride : 0) 是神马意思呢?
// 如果原数组长度为512,这里nextBound就是481
bound = nextBound; // 遍历完一个stride 单位后,继续下一个边界计算
i = nextIndex - 1; // i变为原数组长度-1,也就是原数组最后一个元素
advance = false; // 退出自旋
}
}
// 第一次自旋进入到下面时,i表示的是原数组的倒序索引(也就是最后一个元素的索引)
if (i < 0 || i >= n || i + n >= nextn) { // 第一次不会进第一个if分支,先看if后面的分支逻辑,最后走这个分支
int sc;
if (finishing) {
nextTable = null; // 协助扩容的table置为null
table = nextTab; // 更新扩容后的table
sizeCtl = (n << 1) - (n >>> 1); // 相当于 n*0.75,作为下次扩容判断的阈值
return; // 退出
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 最后面的synchronized 部分全部走完了,会进入到这儿
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true; // 标记finishing 为true ,下面把i重新设置为原数组大小
// i被重新设置成n后,会再次检查原数组是否已经迁移完成;也就是下面的(fh = f.hash) == MOVED;检查完后,再进入到上面finishint = true的逻辑,最后退出
i = n; // recheck before commit by:https://jingling.im
}
} else if ((f = tabAt(tab, i)) == null) // tab是原数组,原数组第i个元素是空的?
// 如果桶中无数据,直接放入ForwardingNode节点标记该位置已经迁移
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED) // 桶中有数据,但是hash值表示已经迁移过了(节点已经被设置过ForwardingNode)
advance = true; // already processed
else {
// 锁原数组的那个桶 table[i]
synchronized (f) {
// 再次校验节点是否有变化,防止在锁之前已经被其他线程迁移
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) { // fh是hash值,只要不是负数(MOVED),就表示还没被迁移
// 运行位,和计算索引类似,但是有区别(索引计算是: (n-1) & hash)
int runBit = fh & n;
// lastRun 和jdk1.7的类似
Node<K,V> lastRun = f;
// 遍历链表
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) { // 直到找到最后一个不一样的
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
} else {
hn = lastRun;
ln = null;
}
// 遍历链表,拆分成两个链表(倒序)
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0) // 放入到低位链表(倒序)
ln = new Node<K,V>(ph, pk, pv, ln);
else // 放入到高位链表(倒序)
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 放入到新数组的原位置
setTabAt(nextTab, i, ln);
// 放入到新数组的新位置
setTabAt(nextTab, i + n, hn);
// 标记原数组的i位置已迁移
setTabAt(tab, i, fwd);
advance = true; // 继续自旋处理下一个i--
}
else if (f instanceof TreeBin) { //原数组i位置的元素是树节点
// 大致过程也是拆分成两棵树
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
// lc hc 是count数
int lc = 0, hc = 0;
// 遍历整棵树
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
if ((h & n) == 0) { // 根据计算,拆分成两棵不同的树
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 分化后的树,节点数小于等于6时,则将树链表化
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln); // 设置到低位
setTabAt(nextTab, i + n, hn); // 设置到高位
setTabAt(tab, i, fwd); // 标记原数组该位置已迁移
advance = true;
}
}
}
}
}
}
transfer方法小结:
- 每次调用transfer方法会按照
stride
大小的纬度迁移数组数据(stride:
默认16,和数组长度以及CPU数有),按数组索引倒序迁移。 - transfer 方法完成了数组扩容*2的过程,通过
volatile
修饰的nextTable
数组是否能null来验证是否已经有线程扩容了数组。 - 将原数组中的节点迁移到新数组后会将原数组中的位置设置成一个特殊的节点
ForwardingNode
,其hash值为-1,迁移时通过synchronized
的方式来锁住迁移的桶。 - 当迁移时桶中冲突的链表或树会被拆分成两个新的链表(会被倒序)或树,然后分别放入到低位(原数组的索引位i)和高位(新数组的索引位i+n)。
- 第一次迁移完后会做一次对原数组的检查,再次遍历原数组检查数组是否完成迁移(hash为-1)
- 最后会将
sizeCtl
设置为下一次扩容检测阈值。
helpTransfer
当put数据时,发现计算出来的索引位置上的节点hash值为-1时,会进入到该方法。
// 进入该方法,说明是有扩容在发生的
// table是原数组,f是索引位hash为-1的节点
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc; // nextTab:预扩容的数组(要扩容后的新数组)
if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// if 就是检查是否在扩容
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { // 检查是否正在扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
// transferIndex <= 0 说明就扩容完成了
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);// 加入到扩容
break;// 扩容完成,退出自旋by:https://jingling.im
}
}
return nextTab;
}
return table; //进入方法发现已经完成扩容了,直接返回新数组
}
helpTransfer方法小结:
- 当put时,如果原节点的hash值为-1,说明正在扩容,线程会进入到该方法,确定是否要加入到扩容。
- 扩容完成后会返回扩容后的新数组,这样put方法继续执行之后的插入值逻辑。
链表树化:treeifyBin
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; // 数组索引位置的元素
int n, sc; // n:数组的长度,
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 数组的长度小于64
tryPresize(n << 1); // 尝试扩容( << 1 等效于 *2),不执行树化
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 需要同时满足 8 + 64的条件
synchronized (b) { // 锁指定的node节点
if (tabAt(tab, index) == b) { // 获得锁后再次检查索引位置的节点是否有变
TreeNode<K,V> hd = null, tl = null;
// 开始遍历链表,转换成树节点链表(还是一个链表关系)
for (Node<K,V> e = b; e != null; e = e.next) {
// 构造树节点,这里还不是树形的
TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 将树链表树化设置到数组中的相应位置,hash值是-2
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
方法小结:
- 在put元素的时候,如果冲突链表超过一定长度(至少为
8
),会触发尝试树化的操作。 - 实际还需要数组的长度至少达到了
64
才会执行树化,如果达不到会尝试扩容。 - 需要同时满足 8 + 64的条件才会树化。by:https://jingling.im
- 树化的时候会锁住数组中整个冲突的节点(链表)。
- 树化会先将原Node链表转换成TreeNode链表,然后再转换成TreeBin节点并构造一颗红黑树设置到原数组的索引位置。
树化前尝试扩容: tryPresize
private final void tryPresize(int size) {
// put时这里的size入参已经是原table的长度*2了
// c: 扩容的值,防止为了超过最大值,保证是2的幂
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
int sc;
// sizeCtl:在表初始化好后,存储的值就是扩容的阈值
while ((sc = sizeCtl) >= 0) { //sizeCtl要么是扩容阈值,要么是初始化容量
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) { // 这里是防止table还没有初始化,putAll方法会直接调这里,和initTable逻辑一致
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) { // 初始化表
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
} else if (c <= sc || n >= MAXIMUM_CAPACITY) // 没有达到扩容的阈值或者已经达到最大不能再扩容了
break; // 退出; by:https://jingling.im
else if (tab == table) { // 这里再次检查,是否已经被其他线程扩容,表已经发生变化?
// resizeStamp(n): return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
// Integer.numberOfLeadingZeros(n): 会返回从高位(总32位)数一共有多少个0,直到遇到1,比如numberOfLeadingZeros(n) 返回26
// 1 << (RESIZE_STAMP_BITS - 1) : 1 左位移 15 位,相当于2的15次方
// | 相当于把两个结果加起来
int rs = resizeStamp(n); // rs源码没注释,具体什么作用暂时不懂
if (sc < 0) {// 说明有其他线程已经正在扩容
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0) // 满足条件退出,这条件啥意思?
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt); //加入到扩容by:https://jingling.im
} else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null); // 扩容,迁移数据
}
}
}
tryPresize方法小结:
有两种情况会进入到该方法:
- 直接调用
putAll
方法。 putAll时该方法会先初始化表,和initTable逻辑一致。 - 当put元素后,冲突节点的链表长度≥8,<64会进入到该方法。
- 最后调用
transfer
方法开始扩容或者加入到扩容。
链表树化:treeBin
通过TreeBin的构造方法,将TreeNode链表转换成红黑树;
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b; // treeBin的first节点指向TreeNode链表的头结点
TreeNode<K,V> r = null;// 树化后的根节点root
// 开始遍历TreeNode链表
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next; // 遍历当前节点的下一个节点
x.left = x.right = null; // 保证左右节点为null
if (r == null) { // 确定根节点
x.parent = null;
x.red = false;
r = x;
}
else {
// 进入到下面,x是从TreeNode链表的第二个节点开始的
K k = x.key;
int h = x.hash; // 当前节点的key和hash值
Class<?> kc = null;
// 遍历以r为根节点的红黑树
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
// 比较确定方向by:https://jingling.im
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null && (kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
// comparableClassFor 返回key的class类型(没有实现Comparable接口返回null)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
// 根据上面确认下来的dir,判断是要加入到树的左子树还是右子树? 并且为null的子树
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp; // 关联父节点
if (dir <= 0)
xp.left = x; // 加入到左子树
else
xp.right = x; 加入到右子树
r = balanceInsertion(r, x); // 平衡,返回根节点
break;
}
// 子树非null,继续遍历
}
}
}
this.root = r;
assert checkInvariants(root); // 递归检查
}
TreeBin方法小结:
- 初始化TreeBin节点,hash值为-1。by:https://jingling.im
- TreeBin节点的first属性指向TreeNode链表的头结点。
- TreeBin节点的root属性记录整颗红黑树根节点。
- 加入到右子树后会平衡整棵树。
Get方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算hash值
int h = spread(key.hashCode());
// (n - 1) & h : 计算hash所在的索引
// 数组初始化了,并且所在索引有值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) { // 目标hash值一致
if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 目标key值一致
return e.val;
}
else if (eh < 0) // 扩容迁移中by:https://jingling.im
// 1.ForwardingNode.find
// 2.TreeBin.find
// 3.TreeNode.find
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // 链表,遍历
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get方法小结:
- 先计算hash值
- 根据hash值计算索引
- 如果索引位置为null,直接返回null
- 如果该位置上的节点hash值、key值都一致,返回找到的节点
- 如果该位置节点hash值为-1,说明正在扩容,调用find方法查找
- 如果该位置节点是链表,则遍历链表查找
其他
红黑树的性质
红黑树是每个节点都带有颜色属性的二叉查找树,颜色为红色或黑色。在二叉查找树强制一般要求以外,对于任何有效的红黑树我们增加了如下的额外要求:
- 节点是红色或黑色。
- 根是黑色。
- 所有叶子都是黑色(叶子是NIL节点)。
- 每个红色节点必须有两个黑色的子节点。(从每个叶子到根的所有路径上不能有两个连续的红色节点。)
- 从任一节点到其每个叶子的所有简单路径都包含相同数目的黑色节点。
为什么用红黑树而不用AVL
AVL树的定义:AVL树
- AVL 是 更加严格平衡,任一节点对应的两棵子树的最大高度差为1。
- AVL 树的旋转比红黑树的旋转更难实现和调试。
- AVL 树的查找速度更快,添加/删除速度较慢。
- 密集插入型任务推荐使用红黑树。
总结
- ConcurrentHashMap 1.8 版本使用的是数组+链表+红黑树数的结构。
- ConcurrentHashMap 使用了volatile+CAS+synchronized的方式保证数据的原子性和可见性。