博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ConcurrentHashMap分析(一)整体结构
阅读量:3933 次
发布时间:2019-05-23

本文共 12912 字,大约阅读时间需要 43 分钟。

前言

对于ConcurrentHashMap的分析,网上已经有很多完善的资料,对于它的源码分析对于巩固和提升关于Java并发思想以及Map集合的思考很有帮助。此文是作者自己关于ConcurrentHashMap的分析和总结。由于本人水平有限,分析过程中可能存在纰漏和错误,希望大家可以指出,一起学习,一起进步。

对于此次分析希望能达到以下目的:

  1. 了解ConcurrentHashMap对于并发的优化方法。
  2. 了解并发措施和Map集合的结合。

ConcurrentHashMap结构

下面是ConcurrentHashMap的类继承关系图:1600838485715

对于ConcurrentHashMap继承了AbstractMap我们并不奇怪,因为这是每个Map集合都要继承的。倒是这个ConcurrentMap好像是个新鲜玩意,从名字中我们可以推测它里面包括Map集合的并发操作。而ConcurrentMap注释也说明了,它是一个提高线程安全和原子性保证的接口。

ConcurrentHashMap的代码加上注释有6000+行,如果直接从源码入手可能会猝死。所以我们先从它的整体架构,再深入其中的关键方法。我们先来看看它的内部结构。

1600862074854

节点种类

从上图中我们可以看到ConcurrentHashMap有五种节点,存储在一个table里,它们分别为:NodeTreeBinTreeNodeForwardingNodeReservationNode,对于NodeTreeNode我们可以理解,毕竟一个是链表节点,一个是树节点。可是为什么树的根节点是TreeBin而不是TreeNode,另外ForwardingNodeReservationNode又是什么呢?

上图中树的红黑节点不是随便标注的,其实ConcurrentHashMap的树形结构就是红黑树,如果了解了红黑树的同学肯定会对它左旋、右旋的平衡操作的复杂性记忆深刻。所以这里用TreeBin充当代理节点来进行这些操作,而TreeNode节点仅有查找方法。

这里也许会有人疑惑,为什么不用现成的TreeMap来代替TreeBin

TreeMap在调用put存放数据数据时,会默认对键进行比较排序,如果给TreeMap传入了Comparator或者键本身实现了Comparable接口,就会按照自定义规则进行排序。而ConcurrentHashMap并不需要这个功能,而且TreeMap在put方法中,会强转键为Comparable类型:1600841664308

那么问题就来了,以下代码就会出现错误:1600841768708

调用Double.compareTo时,会对入参Integer进行转型,但是Double和Integer不属于父子类关系,所以会报错类型转换异常。

关于ForwardingNode 节点,它的注释写道:

A node inserted at head of bins during transfer operations.

在转换操作时插入到书头部的一个节点,应该是和ConcurrentHashMap的扩容缩容有关系,等到后面讲到时再具体分析

对于 ReservationNode类型的节点根据它的注释判断出它是起到一个类似占位的作用,好像现在没有什么和它有关系,暂时就先放着。

/** * A place-holder node used in computeIfAbsent and compute. * 用于 computeIfAbsent 和 compute 的占位节点 */

我们还要先了解ConcurrentHashMao的常量和变量字段,便于在后面分析代码时更好地理解。

常量

/** * 容量的最大值 */private static final int MAXIMUM_CAPACITY = 1 << 30;/** * 容量的默认值 */private static final int DEFAULT_CAPACITY = 16;/** * 数组的最大容量,这个在ArrayList里也有,大部分人赞同的答案是: * 减少一些机器发生内存溢出的可能性【https://www.zhihu.com/question/27999759】 */static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;/** * 这个哈希表默认的并发级别,用于兼容以前的版本 */private static final int DEFAULT_CONCURRENCY_LEVEL = 16;/** * 负载因子 */private static final float LOAD_FACTOR = 0.75f;/** * 链表转换为树的阈值 */static final int TREEIFY_THRESHOLD = 8;/** * 树转换为链表的阈值 */static final int UNTREEIFY_THRESHOLD = 6;/** * 在链表转换为树之前,当前槽节点的数量必须大于这个值 */static final int MIN_TREEIFY_CAPACITY = 64;/** * 在树转换为链表之前,当前槽节点的数量必须不大于这个值 */private static final int MIN_TRANSFER_STRIDE = 16;/** * 用于在扩容时生成随机数的种子 */private static final int RESIZE_STAMP_BITS = 16;/** * 并行进行扩容的最大线程数 */private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;/** * 记录sizeCtl大小所需要进行的偏移位数【现在没看懂什么意思】 */private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;static final int MOVED     = -1; // 标识 ForwardingNode 节点static final int TREEBIN   = -2; // 标识红黑树根节点static final int RESERVED  = -3; // 标识 ReservationNode 节点static final int HASH_BITS = 0x7fffffff; // 标识普通节点/** CPU的核心数量,用于扩容时使用 */static final int NCPU = Runtime.getRuntime().availableProcessors();

变量

/** * Node 数组 */transient volatile Node
[] table;/** * 扩容过程中使用的 Node 数组,采用渐进式数据迁移的方式 */private transient volatile Node
[] nextTable;/** * 基础计数器,主要在无线程竞争的条件下使用,在 table 初始化时也被用来做 fallback 操作,通过 CAS 进行更新 */private transient volatile long baseCount;/** * table 初始化和扩容的状态标识: * >0:数组初始化后的容量 * 0:默认初始值 * -1:单线程扩容 * -1(1 + nThread):多线程扩容 */private transient volatile int sizeCtl;/** * 扩容时另一个表的下标 */private transient volatile int transferIndex;/** * 在扩容或创建 CounterCells 时的自旋锁. */private transient volatile int cellsBusy;/** * CounterCell 数组,用于热点数据分段计算,跟 LongAddr 的 Cell 差不多 */private transient volatile CounterCell[] counterCells;

操作分析

put

put系列方法都是调用了putVal方法,我们来看下它的源码:

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException(); // 根据 key 计算出 hash 值 int hash = spread(key.hashCode()); int binCount = 0; for (Node
[] tab = table;;) {
Node
f; int n, i, fh; K fk; V fv; // 如果数组为空就先初始化数组 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 下标的计算方式:(table.length - 1) & key.hashCode // 如果计算出的下标的槽的元素为空,直接放入然后返回 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node
(hash, key, value))) break; // no lock when adding to empty bin } // 判断槽的状态是否为 MOVED,即该节点是一个 Fowarding 节点, // 正在进行扩容操作,尝试协助数据迁移。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 无锁状态下节点首节点的 hash 是否和要添加的元素的 hash 一样, // 一样就直接设置值就可以了 else if (onlyIfAbsent && fh == hash && ((fk = f.key) == key || (fk != null && key.equals(fk))) && (fv = f.val) != null) return fv; else {
V oldVal = null; synchronized (f) {
// 判断一下 f 节点是不是首节点 if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1; // 遍历链表,如果链表中存在和 key 的 hash 值相同的元素,替换该节点的 value 值 for (Node
e = f;; ++binCount) {
K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node
pred = e; if ((e = e.next) == null) {
pred.next = new Node
(hash, key, value); break; } } } // 开始遍历树节点了 else if (f instanceof TreeBin) { Node
p; binCount = 2; // 将 key-value 值放入红黑树中 if ((p = ((TreeBin
)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } // 前面写了,ReservationNode 节点是一个占位的辅助节点而已,所以这里直接抛出异常 else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } } // 判断要不要链表转红黑树 if (binCount != 0) { // TREEIFY_THRESHOLD = 8 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null;}

虽然putVal的方法很长,但是实际上只是对各种情况的判断而已。我们先来看看treeifyBin方法。

private final void treeifyBin(Node
[] tab, int index) {
Node
b; int n; if (tab != null) {
// MIN_TREEIFY_CAPACITY=64 // 当 table 容量小于64时,只是对 table 进行扩容 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode
hd = null, tl = null; // 链表转换成红黑树 for (Node
e = b; e != null; e = e.next) {
TreeNode
p = new TreeNode
(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } // 包装成 TreeBin 类型,并放入 table[index] 中 setTabAt(tab, index, new TreeBin
(hd)); } } } }}

对于其中的initTablehelpTransferputTreeVal方法留到后面再说。

get

我们来看看get方法的源码:

public V get(Object key) {
Node
[] tab; Node
e, p; int n, eh; K ek; // 计算 hash 值 int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
// 首节点对应上了就直接返回 if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 从红黑树/ForwardingNode/ReservationNode开始找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 从链表开始找 while ((e = e.next) != null) {
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null;}

对于链表是从头开始查找这是没什么可说的,但是红黑树的插入删除会涉及整个结构的调整,我们来看下红黑树的find的代码:

final Node
find(int h, Object k) {
if (k != null) {
for (Node
e = first; e != null; ) {
int s; K ek; // 用链表的方式查找元素,有两种情况: // 1. 有线程正持有写锁 // 2. 有线程等待获取写锁 if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; e = e.next; } // 读线程+1,同时更新读状态 else if (U.compareAndSetInt(this, LOCKSTATE, s, s + READER)) {
TreeNode
r, p; try {
p = ((r = root) == null ? null : r.findTreeNode(h, k, null)); } finally {
Thread w; // 如果当前线程是最后一个读线程,且有写线程因为读锁而阻塞,则告诉写线程可以尝试获取写锁了 if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER|WAITER) && (w = waiter) != null) LockSupport.unpark(w); } return p; } } } return null; }

ConcurrentHashMap运用了类似读写锁的方式,当有线程修改红黑树时,读线程采用链表的方式进行查找。

ForwardingNode 同样重写了 Node 的 find 方法,我们来看下它是怎么实现的:

Node
find(int h, Object k) {
outer: for (Node
[] tab = nextTable;;) {
Node
e; int n; // 键为空,table 为空,键对应的槽位空都是直接返回 if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) {
int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) {
// 如果 e 节点还是 ForwardingNode 节点,那么就去新的用于迁移数据的 table 找 if (e instanceof ForwardingNode) {
tab = ((ForwardingNode
)e).nextTable; // 从最外部循环再次开始 continue outer; } else // 这个方法就是调用 TreeBin 节点的 find 方法 return e.find(h, k); } // 查到最后了还没有找到那就直接返回 null if ((e = e.next) == null) return null; } } }}

ReservationNode 节点是占位节点,本身没有数据,所以就直接返回 null 了:

Node
find(int h, Object k) {
return null;}

size

分析了getput方法后,前面部分的常量和变量已经用到了,但是还有个CounterCell数组没有看到在哪里使用过。前面说它类似于LongAddrCell,那么我们就来看看对于热点数据 ConcurrentHashMap 是怎么处理的,其中的主要应用从 size 方法开始:

public int size() {
long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);}

这里没什么好说的,就是获取元素总数,然后处理一下整型越界的情况再返回。我们把关注点放在sumCount方法:

final long sumCount() {
CounterCell[] cs = counterCells; long sum = baseCount; if (cs != null) {
for (CounterCell c : cs) if (c != null) sum += c.value; } return sum;}

我们来对比一下 LongAddersum 方法:

public long sum() {
Cell[] cs = cells; long sum = base; if (cs != null) {
for (Cell c : cs) if (c != null) sum += c.value; } return sum;}

是不是感觉就是copy了一份?很显然它们都是使用了分段计数的方法,我们来看看 CounterCell 这个槽对象在并发冲突时的处理方法。还记得我们在分析get方法时最后调用了一个addCount(1L, binCount);吗,它的处理逻辑可以在addCount 里看到:【后半部分涉及到扩容,这是个很大的知识点,所以需要另开一文,因此现在只对前面分析】

CounterCell[] cs; long b, s;if ((cs = counterCells) != null ||    // 如果 CAS 更新失败,说明出现并发竞争,那么就将计数器累加到 CounterCell 数组    !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell c; long v; int m; boolean uncontended = true; if (cs == null || (m = cs.length - 1) < 0 || // 根据线程的 random 计算槽的索引 (c = cs[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
// 如果还是更新失败则执行该方法 fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount();}

addCount前半部分的处理逻辑为:如果counterCells为空,说明之前没有出现并发冲突,那么就可以直接将值加到baseCount上。否则就尝试更新counterCells[i]中的值,如果还是更新失败,那么说明槽也有并发冲突,那么就需要对槽进行扩容,调用方法fullAddCount,该方法的逻辑跟LongAdderlongAccumulate差不多一样,扩容成功后再次更新值。

总结

以上就是这篇文章的分析,主要通过 ConcurrentHashMap 的整体结构入手,先是分析了各个节点,以及各常量和变量的作用,然后通过了解主要的三个方法getputsize对其主要的处理逻辑有了一定的了解。后面会以 ConcurrentHashMap 的扩容入手,通过方法一步步感受 ConcurrentHashMap 那优美的并发处理逻辑。

转载地址:http://jjqgn.baihongyu.com/

你可能感兴趣的文章
js 表单验证
查看>>
怎么找到适合自己的工作
查看>>
CGI脚本
查看>>
nginx的fix_pathinfo漏洞
查看>>
php-cgi占用cpu资源过高的解决方法
查看>>
php-fpm.conf 相关参数
查看>>
nginx 内部结构分析
查看>>
utuntu常用配置
查看>>
GIT简介
查看>>
GIT客户端
查看>>
GIT系统安装
查看>>
GIT命令行应用
查看>>
php编程技巧
查看>>
款免费的PHP加速器:APC、eAccelerator、XCache比较
查看>>
Nginx 压力测试 /webbench
查看>>
ubuntu访问windows共享文件夹
查看>>
ubuntu用户和用户组管理
查看>>
ubuntu网络配置
查看>>
linux 下 apache php-cgi 安装及配置
查看>>
git 传输
查看>>