使用Java如何實現ConcurrentHashMap#put并發容器?針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
jdk1.7.0_79
HashMap可以說是每個Java程序員用的最多的數據結構之一了,無處不見它的身影。關于HashMap,通常也能說出它不是線程安全的。這篇文章要提到的是在多線程并發環境下的HashMap——ConcurrentHashMap,顯然它必然是線程安全的,同樣我們不可避免的要討論散列表,以及它是如何實現線程安全的,它的效率又是怎樣的,因為對于映射容器還有一個Hashtable也是線程安全的但它似乎只出現在筆試、面試題里,在現實編碼中它已經基本被遺棄。
關于HashMap的線程不安全,在多線程并發環境下它所帶來的影響絕不僅僅是出現臟數據等數據不一致的情況,嚴重的是它有可能帶來程序死循環,這可能有點不可思議,但確實在不久前的項目里同事有遇到了CPU100%滿負荷運行,分析結果是在多線程環境下HashMap導致程序死循環。對于Hashtable,查看其源碼可知,Hashtable保證線程安全的方式就是利用synchronized關鍵字,這樣會導致效率低下,但對于ConcurrentHashMap則采用了不同的線程安全保證方式——分段鎖。它不像Hashtable那樣將整個table鎖住而是將數組元素分段加鎖,如果線程1訪問的元素在分段segment1,而線程2訪問的元素在分段segment2,則它們互不影響可以同時進行操作。如果合理的進行分段就是其關鍵問題。
ConcurrentHashMap和HashMap的結果基本一致,同樣也是Entry作為存放數據的對象,另外一個就是上面提到的分段鎖——Segment。它繼承自ReentrantLock(關于ReentrantLock,可參考《5.Lock接口及其實現ReentrantLock》),故它具有ReentrantLock一切特性——可重入,獨占等。
ConcurrentHashMap的結構圖如下所示:

可以看到相比較于HashMap,ConcurrentHashMap在Entry數組之上是Segment,這個就是我們上面提到的分段鎖,合理的確定分段數就能更好的提高并發效率,我們來看ConcurrentHashMap是如何確定分段數的。
ConcurrentHashMap的初始化時通過其構造函數public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)完成的,若在不指定各參數的情況下,初始容量initialCapacity=DAFAULT_INITIAL_CAPACITY=16,負載因子loadFactor=DEFAULT_LOAD_FACTOR=0.75f,并發等級concurrencyLevel=DEFAULT_CONCURRENCY_LEVEL=16,前兩者和HashMap相同。至于負載因子表示一個散列表的空間的使用程度,initialCapacity(總容量) * loadFactor(負載因子) = 數據量,有此公式可知,若負載因子越大,則散列表的裝填程度越高,也就是能容納更多的元素,但這樣元素就多,鏈表就大,此時索引效率就會降低。若負載因子越小,則相反,索引效率就會高,換來的代價就是浪費的空間越多。并發等級它表示估計最多有多少個線程來共同修改這個Map,稍后可以看到它和segment數組相關,segment數組的長度就是通過concurrencyLevel計算得出。
//以默認參數為例initalCapacity=16,loadFactor=0.75,concurrencyLevel=16
public ConcurrentHashMap(int initalCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;//segment數組長度
while (ssize < concurrencyLevel) {
++sshift;
ssize <= 1;
}//經過ssize左移4位后,ssize=16,ssift=4
/*segmentShift用于參與散列運算的位數,segmentMask是散列運算的掩碼,這里有關的散列函數運算和HashMap有類似之處*/
this.segmentShift = 32 – ssift;//段偏移量segmentShift=28
this.segmentMask = ssize – 1;//段掩碼segmentMask=15(1111)
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;//c = 1
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;//MIN_SEGMENT_TABLE_CAPACITY=2
while (cap < c)//cap = 2, c = 1,false
cap <<= 1;//cap是segment里HashEntry數組的長度,最小為2
/*創建segments數組和segment[0]*/
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[]) new HashEntry[cap]);//參數意為:負載因子=1,數據容量=(int)(2 * 0.75)=1,總容量=2,故每個Segment的HashEntry總容量為2,實際數據容量為1
Segment<K,V> ss = (Segment<K,V>[])new Segment[ssize];//segments數組大小為16
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}以上就是整個初始化過程,主要是初始化segments的長度大小以及通過負載因子確定每個Segment的容量大小。確定好Segment過后,接下來的重點就是如何準確定位Segment。定位Segment的方法就是通過散列函數來定位,先通過hash方法對元素進行二次散列,這個算法較為復雜,其目的只有一個——減少散列沖突,使元素能均勻分布在不同的Segment上,提高容器的存取效率。
我們通過最直觀最常用的put方法來觀察ConcurrentHashMap是如何通過key值計算hash值在定位到Segment的:
//ConcurrentHashMap#put
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);//根據散列函數,計算出key值的散列值
int j = (hash >>> segmentShift) & segmentMask;//這個操作就是定位Segment的數組下標,jdk1.7之前是segmentFor返回Segment,1.7之后直接就取消了這個方法,直接計算數組下標,然后通過偏移量底層操作獲取Segment
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);//通過便宜量定位不到就調用ensureSegment方法定位Segment
return s.put(key, hash, value, false);
}Segment.put方法就是將鍵、值構造為Entry節點加入到對應的Segment段里,如果段中已經有元素(即表示兩個key鍵值的hash值重復)則將最新加入的放到鏈表的頭),整個過程必然是加鎖安全的。
不妨繼續深入Segment.put方法:
//Segment#put
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);//非阻塞獲取鎖,獲取成功node=null,失敗
V oldValue;
try {
HashEntry<K,V>[] tab = table;//Segment對應的HashEntry數組長度
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);//獲取HashEntry數組的第一個值
for (HashEntry<K,V> e = first;;) {
if (e != null) {//HashEntry數組已經存在值
K k;
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {//key值和hash值都相等,則直接替換舊值
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;//不是同一個值則繼續遍歷,直到找到相等的key值或者為null的HashEntry數組元素
}
else {//HashEntry數組中的某個位置元素為null
if (node != null)
node.setNext(first);//將新加入節點(key)的next引用指向HashEntry數組第一個元素
else//已經獲取到了Segment鎖
node = new HashEntry<K,V>(hash, key, value, first)
int c = count + 1;
if (c > threshold && tab.lenth < MAXIUM_CAPACITY)//插入前先判斷是否擴容,ConcurrentHashMap擴容與HashMap不同,ConcurrentHashMap只擴Segment的容量,HashMap則是整個擴容
rehash(node);
else
setEntryAt(tab, index, node);//設置為頭節點
++modCount;//總容量
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}上面大致就是ConcurrentHashMap加入一個元素的過程,需要明白的就是ConcurrentHashMap分段鎖的概念。在JDK1.6中定位Segment較為簡單,直接計算出Segment數組下標后就返回具體的Segment,而JDK1.7則通過偏移量來計算,算出為空時,還有一次檢查獲取Segment,猜測是1.7使用底層native是為了提高效率,JDK1.8的ConcurrentHashMap又有不同,暫未深入研究,它的數據結果似乎變成了紅黑樹。
有關ConcurrentHashMap的get方法不再分析,過程總結為一句話:根據key值計算出hash值,根據hash值計算出對應的Segment,再在Segment下的HashEntry鏈表遍歷查找。
關于使用Java如何實現ConcurrentHashMap#put并發容器問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。