
ConcurrentHashMap Source Code Analysis in JDK 7

1. Data Structure

In Java 7, ConcurrentHashMap is composed of multiple Segments, and each Segment is a structure similar to a HashMap, so each Segment's internal table can be resized on its own. The number of Segments, however, is fixed once the map is initialized; the default is 16, so you can think of ConcurrentHashMap as supporting at most 16 concurrent writer threads by default.
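
Roughly, the two-level layout looks like this (a simplified sketch standing in for the original figure):

```
ConcurrentHashMap
 └─ Segment[16] segments           // fixed size, each Segment extends ReentrantLock
     ├─ segments[0]  ─ HashEntry[] table ─ HashEntry → HashEntry → ... (linked chains)
     ├─ segments[1]  ─ HashEntry[] table ─ ...
     └─ segments[15] ─ ...
```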

2. Constructors

Core Fields

```java
/**
 * The default initial capacity for this table,
 * used when not otherwise specified in a constructor.
 */
// Default table capacity, 16 (the per-segment array, like HashMap's)
static final int DEFAULT_INITIAL_CAPACITY = 16;

/**
 * The default load factor for this table, used when not
 * otherwise specified in a constructor.
 */
// Default load factor, 0.75
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
 * The default concurrency level for this table, used when not
 * otherwise specified in a constructor.
 */
// Default concurrency level, 16
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
 * The minimum capacity for per-segment tables. Must be a power
 * of two, at least two to avoid immediate resizing on next use
 * after lazy construction.
 */
// Minimum per-segment table capacity: a power of two, at least 2,
// so a lazily constructed segment is not resized on its very next use
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

/**
 * The segments, each of which is a specialized hash table.
 */
// The segment array inside ConcurrentHashMap
final Segment<K,V>[] segments;

/**
 * Segments are specialized versions of hash tables. This
 * subclasses from ReentrantLock opportunistically, just to
 * simplify some locking and avoid separate construction.
 */
// A Segment is a specialized hash table. It extends ReentrantLock
// opportunistically, simply to simplify locking and avoid a separate lock object.
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    /**
     * The per-segment table. Elements are accessed via
     * entryAt/setEntryAt providing volatile semantics.
     */
    transient volatile HashEntry<K,V>[] table;

    /**
     * The number of elements. Accessed only either within locks
     * or among other volatile reads that maintain visibility.
     */
    transient int count;

    /**
     * The total number of mutative operations in this segment.
     * Even though this may overflows 32 bits, it provides
     * sufficient accuracy for stability checks in CHM isEmpty()
     * and size() methods. Accessed only either within locks or
     * among other volatile reads that maintain visibility.
     */
    transient int modCount;

    /**
     * The table is rehashed when its size exceeds this threshold.
     * (The value of this field is always <tt>(int)(capacity *
     * loadFactor)</tt>.)
     */
    transient int threshold;

    /**
     * The load factor for the hash table. Even though this value
     * is same for all segments, it is replicated to avoid needing
     * links to outer object.
     * @serial
     */
    final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
}

/**
 * ConcurrentHashMap list entry. Note that this is never exported
 * out as a user-visible Map.Entry.
 */
static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
}
```

To summarize the initialization logic of ConcurrentHashMap in Java 7:

  1. Validate the arguments.
  2. Check the concurrency level concurrencyLevel; if it exceeds the maximum, reset it to the maximum. The no-argument constructor's default is 16.
  3. Find the nearest power of two at or above concurrencyLevel and use it as the segment array size (ssize); the default is 16.
  4. Record the segmentShift offset: sshift is the N in ssize = 2^N, and segmentShift = 32 - sshift; it is used later to locate the segment during put. The default is 32 - 4 = 28.
  5. Record segmentMask, which defaults to ssize - 1 = 16 - 1 = 15.
  6. Initialize segments[0] with a default capacity of 2 and load factor 0.75, giving a resize threshold of 2 × 0.75 = 1.5, so the first resize happens when the second element is inserted.
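
As a quick sanity check of these defaults, here is a small standalone snippet that mirrors the constructor's sizing math (it re-implements the arithmetic rather than reading the JDK's private fields):

```java
// Re-computes the default sizing numbers listed above.
public class DefaultSizing {
    public static void main(String[] args) {
        int concurrencyLevel = 16, initialCapacity = 16;
        float loadFactor = 0.75f;

        // Smallest power of two >= concurrencyLevel, and its exponent
        int sshift = 0, ssize = 1;
        while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; }   // ssize=16, sshift=4

        int segmentShift = 32 - sshift;   // 28
        int segmentMask  = ssize - 1;     // 15

        // Per-segment capacity: initialCapacity spread across ssize segments,
        // rounded up to a power of two, at least MIN_SEGMENT_TABLE_CAPACITY (2)
        int c = initialCapacity / ssize;                // 1
        if (c * ssize < initialCapacity) ++c;
        int cap = 2;
        while (cap < c) cap <<= 1;                      // stays 2

        System.out.println(segmentShift + " " + segmentMask + " "
                + cap + " " + (int) (cap * loadFactor)); // prints: 28 15 2 1
    }
}
```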

ConcurrentHashMap()

```java
/**
 * Creates a new, empty map with a default initial capacity (16),
 * load factor (0.75) and concurrencyLevel (16).
 */
public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
```

ConcurrentHashMap(int initialCapacity)

```java
/**
 * Creates a new, empty map with the specified initial capacity,
 * and with default load factor (0.75) and concurrencyLevel (16).
 *
 * @param initialCapacity the initial capacity. The implementation
 * performs internal sizing to accommodate this many elements.
 * @throws IllegalArgumentException if the initial capacity of
 * elements is negative.
 */
public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
```

ConcurrentHashMap(int initialCapacity, float loadFactor)

```java
/**
 * Creates a new, empty map with the specified initial capacity
 * and load factor and with the default concurrencyLevel (16).
 *
 * @param initialCapacity The implementation performs internal
 * sizing to accommodate this many elements.
 * @param loadFactor the load factor threshold, used to control resizing.
 * Resizing may be performed when the average number of elements per
 * bin exceeds this threshold.
 * @throws IllegalArgumentException if the initial capacity of
 * elements is negative or the load factor is nonpositive
 *
 * @since 1.6
 */
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
```

ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)

```java
/**
 * Creates a new, empty map with the specified initial
 * capacity, load factor and concurrency level.
 *
 * @param initialCapacity the initial capacity. The implementation
 * performs internal sizing to accommodate this many elements.
 * @param loadFactor the load factor threshold, used to control resizing.
 * Resizing may be performed when the average number of elements per
 * bin exceeds this threshold.
 * @param concurrencyLevel the estimated number of concurrently
 * updating threads. The implementation performs internal sizing
 * to try to accommodate this many threads.
 * @throws IllegalArgumentException if the initial capacity is
 * negative or the load factor or concurrencyLevel are
 * nonpositive.
 */
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // Validate the arguments
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;

    // Find power-of-two sizes best matching arguments
    // sshift: the exponent such that ssize = 2^sshift (e.g. concurrencyLevel=16 → sshift=4)
    int sshift = 0;
    // ssize: the smallest power of two >= concurrencyLevel (e.g. concurrencyLevel=16 → ssize=16)
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        // shift left by one bit, i.e. ssize = ssize * 2
        ssize <<= 1;
    }
    // 32 - 4 = 28 (segment shift)
    this.segmentShift = 32 - sshift;
    // 16 - 1 = 15 (segment mask)
    this.segmentMask = ssize - 1;

    // Compute the minimum per-segment table length, rounded up to a power of two
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;

    // create segments and segments[0]
    // Create segments = new Segment[ssize] and initialize segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
```

Summary

  • Based on initialCapacity, loadFactor and concurrencyLevel, create the Segment array and initialize segments[0].

  • Compute the segment shift: this.segmentShift = 32 - sshift;

  • Compute the segment mask: this.segmentMask = ssize - 1;
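
For a non-default example (arguments chosen just for illustration), constructing with initialCapacity = 32, loadFactor = 0.75f and concurrencyLevel = 8 yields ssize = 8, sshift = 3, segmentShift = 29, segmentMask = 7, a per-segment capacity of 32 / 8 = 4, and a segments[0] threshold of (int)(4 × 0.75) = 3:

```java
import java.util.concurrent.ConcurrentHashMap;

public class SizingExample {
    public static void main(String[] args) {
        // By the sizing math above: ssize=8, sshift=3, segmentShift=29,
        // segmentMask=7, per-segment cap=4, segments[0] threshold=3.
        ConcurrentHashMap<String, Integer> map =
                new ConcurrentHashMap<String, Integer>(32, 0.75f, 8);
        map.put("a", 1);
        System.out.println(map.get("a")); // 1
    }
}
```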

ConcurrentHashMap(Map<? extends K, ? extends V> m)


```java
/**
 * Creates a new map with the same mappings as the given map.
 * The map is created with a capacity of 1.5 times the number
 * of mappings in the given map or 16 (whichever is greater),
 * and a default load factor (0.75) and concurrencyLevel (16).
 *
 * @param m the map
 */
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
                  DEFAULT_INITIAL_CAPACITY),
         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    putAll(m);
}
```

3. The put Method

put(K key, V value)

```java
/**
 * Maps the specified key to the specified value in this table.
 * Neither the key nor the value can be null.
 *
 * <p> The value can be retrieved by calling the <tt>get</tt> method
 * with a key that is equal to the original key.
 *
 * @param key key with which the specified value is to be associated
 * @param value value to be associated with the specified key
 * @return the previous value associated with <tt>key</tt>, or
 * <tt>null</tt> if there was no mapping for <tt>key</tt>
 * @throws NullPointerException if the specified key or value is null
 */
@SuppressWarnings("unchecked")
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        // A null value is rejected immediately
        throw new NullPointerException();
    // Compute the key's hash through a series of mixing steps
    // (to spread entries more evenly across the table)
    int hash = hash(key);
    // Compute the key's index into the Segment[] array
    int j = (hash >>> segmentShift) & segmentMask;
    // If the segment at index j has not been created yet, create it
    // via ensureSegment; otherwise put directly into it
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}
```

ensureSegment(int k)


```java
/**
 * Returns the segment for the given index, creating it and
 * recording in segment table (via CAS) if not already present.
 *
 * @param k the index
 * @return the segment
 */
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    // Raw offset of slot k within the array
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // Volatile-read the segment at that offset; proceed only if it is null
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // Use segments[0] as the prototype for the new segment
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;      // capacity of the prototype's table
        float lf = proto.loadFactor;       // load factor
        int threshold = (int)(cap * lf);   // resize threshold
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // Re-read the slot; another thread may have filled it meanwhile
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck
            // Build the Segment from the prototype's settings
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // Keep re-reading the slot; loop while it is still null
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                // Publish it with CAS; only one thread's write can win
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}
```

Summary

  • If the target slot segments[k] is null, build a new segment from the segments[0] prototype and install it at segments[k].
  • UNSAFE.compareAndSwapObject performs the installation atomically, so the operation is thread-safe (via CAS).
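
The same volatile-read / recheck / CAS-publish pattern can be sketched with AtomicReferenceArray instead of Unsafe (an illustrative analogue, not the JDK's actual code):

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

public class LazySlotInit {
    private final AtomicReferenceArray<Object> slots =
            new AtomicReferenceArray<Object>(16);

    Object ensureSlot(int k) {
        Object seg = slots.get(k);             // volatile read, like getObjectVolatile
        if (seg == null) {
            Object created = new Object();     // build the candidate outside the CAS
            // Loop: either our CAS wins, or another thread's value becomes visible.
            while ((seg = slots.get(k)) == null) {
                if (slots.compareAndSet(k, null, created)) {
                    seg = created;
                    break;
                }
            }
        }
        return seg;
    }
}
```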

4. Segment's Internal Methods

put(K key, int hash, V value, boolean onlyIfAbsent)

```java
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // tryLock() attempts to acquire the lock; on failure, fall back to scanAndLockForPut
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        // Index of this hash within the segment's table
        int index = (tab.length - 1) & hash;
        // Head node of the bucket at that index
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                // The key already exists
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    // Remember the current value as the old value
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // Unless onlyIfAbsent was requested, overwrite the existing value
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // Move on to the next node
                e = e.next;
            }
            else {
                if (node != null)
                    // Head insertion: link the prepared node in front of the bucket
                    node.setNext(first);
                else
                    // No prepared node: create one now
                    node = new HashEntry<K,V>(hash, key, value, first);
                // Number of key-value pairs + 1
                int c = count + 1;
                // If the count exceeds the threshold (and the table is below the maximum size)
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    // resize
                    rehash(node);
                else
                    // Threshold not exceeded: place node directly at its slot
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // Release the lock
        unlock();
    }
    // Return the previous value (null if the key was new)
    return oldValue;
}
```

scanAndLockForPut(K key, int hash, V value)

```java
/**
 * Scans for a node containing given key while trying to
 * acquire lock, creating and returning one if not found. Upon
 * return, guarantees that lock is held. UNlike in most
 * methods, calls to method equals are not screened: Since
 * traversal speed doesn't matter, we might as well help warm
 * up the associated code and accesses as well.
 *
 * @return a new node if key not found, else null
 */
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    // Gets the table entry for the given segment and hash
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    // Retry counter
    int retries = -1; // negative while locating node
    // Spin, repeatedly trying to acquire the lock
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    // Create the node ahead of time while waiting for the lock
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                // The chain already contains the key
                retries = 0;
            else
                // Move on to the next node
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            // Still no lock after MAX_SCAN_RETRIES attempts:
            // give up spinning and block until the lock is available
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            // Check whether the head node changed; if so, reset retries
            // and re-traverse from the new head
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
```

rehash(HashEntry<K,V> node)


```java
/**
 * Doubles size of table and repacks entries, also adding the
 * given node to new table
 */
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
    /*
     * Reclassify nodes in each list to new table. Because we
     * are using power-of-two expansion, the elements from
     * each bin must either stay at same index, or move with a
     * power of two offset. We eliminate unnecessary node
     * creation by catching cases where old nodes can be
     * reused because their next fields won't change.
     * Statistically, at the default threshold, only about
     * one-sixth of them need cloning when a table
     * doubles. The nodes they replace will be garbage
     * collectable as soon as they are no longer referenced by
     * any reader thread that may be in the midst of
     * concurrently traversing table. Entry accesses use plain
     * array indexing because they are followed by volatile
     * table write.
     */
    // Reference to the old table
    HashEntry<K,V>[] oldTable = table;
    // Length of the old table
    int oldCapacity = oldTable.length;
    // Length of the new table: double the old one
    int newCapacity = oldCapacity << 1;
    // Resize threshold of the new table
    threshold = (int)(newCapacity * loadFactor);
    // Create the new table
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    // Walk the old table
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // Index of this old entry within the new table
            int idx = e.hash & sizeMask;
            if (next == null) // Single node on list
                // A single node can be moved to the new table directly
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // Find the trailing run of nodes that all land in the same
                // new slot, so that run can be reused without cloning
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes (the ones in front of lastRun)
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    int nodeIndex = node.hash & sizeMask; // add the new node
    // Link in the new node (head insertion)
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
```

5. Flow Diagram


Reference: https://juejin.cn/post/6844903520957644808#heading-4
