「JUC高併發容器」JDK1.8版本ConcurrentHashMap的核心原理與原始碼

目錄

原理解析

JDK 1。8版本ConcurrentHashMap的主要成員屬性

hash儲存圖

put()方法

get()方法

1、原理解析

JDK 1。8版本的ConcurrentHashMap中透過一個Node[]陣列table來儲存新增到雜湊表中的桶,而在同一個Bucket位置是透過連結串列和紅黑樹的形式來儲存的。但是陣列table是懶載入的,只有在第一次新增元素的時候才會初始化。

2、JDK 1。8版本ConcurrentHashMap的主要成員屬性

private static final int MAXIMUM_CAPACITY = 1 << 30; /** * The default initial table capacity。 Must be a power of 2 * (i。e。, at least 1) and at most MAXIMUM_CAPACITY。 */ private static final int DEFAULT_CAPACITY = 16; /** * The largest possible (non-power of two) array size。 * Needed by toArray and related methods。 */ static final int MAX_ARRAY_SIZE = Integer。MAX_VALUE - 8; /** * The default concurrency level for this table。 Unused but * defined for compatibility with previous versions of this class。 */ private static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** * The load factor for this table。 Overrides of this value in * constructors affect only the initial table capacity。 The * actual floating point value isn‘t normally used —— it is * simpler to use expressions such as {@code n - (n >>> 2)} for * the associated resizing threshold。 */ private static final float LOAD_FACTOR = 0。75f; /** * The bin count threshold for using a tree rather than list for a * bin。 Bins are converted to trees when adding an element to a * bin with at least this many nodes。 The value must be greater * than 2, and should be at least 8 to mesh with assumptions in * tree removal about conversion back to plain bins upon * shrinkage。 */ static final int TREEIFY_THRESHOLD = 8; /** * The bin count threshold for untreeifying a (split) bin during a * resize operation。 Should be less than TREEIFY_THRESHOLD, and at * most 6 to mesh with shrinkage detection under removal。 */ static final int UNTREEIFY_THRESHOLD = 6; /** * The smallest table capacity for which bins may be treeified。 * (Otherwise the table is resized if too many nodes in a bin。) * The value should be at least 4 * TREEIFY_THRESHOLD to avoid * conflicts between resizing and treeification thresholds。 */ static final int MIN_TREEIFY_CAPACITY = 64; /** * Minimum number of rebinnings per transfer step。 Ranges are * subdivided to allow multiple resizer threads。 This value * serves as a lower bound to avoid resizers encountering * excessive memory contention。 The value should be at least * DEFAULT_CAPACITY。 */ private static final int MIN_TRANSFER_STRIDE = 16; /** * The number of bits used for generation stamp in sizeCtl。 * Must be at least 6 for 32bit arrays。 */ private static int RESIZE_STAMP_BITS = 16; /** * The maximum number of threads that can help resize。 * Must fit in 32 - RESIZE_STAMP_BITS bits。 */ private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; /** * The bit shift for recording size stamp in sizeCtl。 */ private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; /* * Encodings for Node hash fields。 See above for explanation。 */ static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

3、hash儲存圖

「JUC高併發容器」JDK1.8版本ConcurrentHashMap的核心原理與原始碼

4、put()方法

//設定元素public V put(K key, V value) { return putVal(key, value, false);}

/** * @onlyIfAbsent 表示是否只在元素不存在的時候新增,元素存在則不新增 * 此方法是一個泛型方法,K,V為泛型引數,返回值也是範型V */final V putVal(K key, V value, boolean onlyIfAbsent) { // 不允許key或者value為null if (key == null || value == null) throw new NullPointerException(); // 計算key的雜湊值 int hash = spread(key。hashCode()); int binCount = 0; //自旋:併發情況下,保證執行緒安全。table是桶陣列,存放每個桶,為2的平方,懶載入初始化。 for (Node[] tab = table;;) { Node f; int n, i, fh; //懶載入初始化 if (tab == null || (n = tab。length) == 0) tab = initTable(); //透過hash與節點陣列與操作獲得偏移量,判斷此偏移量是否有值 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //沒有值,則進行CAS新增 if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin } //節點移動或者擴容 else if ((fh = f。hash) == MOVED) tab = helpTransfer(tab, f); //新增節點 else { V oldVal = null; //為什麼不用ReentrantLock顯式鎖呢?如果為每一個桶都建立一個ReentrantLock例項,就會帶來大量的記憶體消耗, //反過來,使用CAS自旋(簡單輕量級鎖)、synchronized偏向鎖或輕量級鎖,記憶體消耗的增加會微乎其微。 synchronized (f) { if (tabAt(tab, i) == f) { //連結串列賦值 if (fh >= 0) { binCount = 1; for (Node e = f;; ++binCount) { K ek; //透過相同的hash找到節點並且賦值,然後退出迴圈 if (e。hash == hash && ((ek = e。key) == key || (ek != null && key。equals(ek)))) { oldVal = e。val; //如果有此引數,則只在不存在的時候新增 if (!onlyIfAbsent) e。val = value; break; } Node pred = e; //走到這裡,表示該空間已被佔用,這時node形成連結串列依次往後查詢,有空則賦值 if ((e = e。next) == null) { pred。next = new Node(hash, key, value, null); break; } } } //如果節點已經從連結串列節點轉化成紅黑樹節點,則透過新增紅黑樹節點的方式賦值 else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f)。putTreeVal(hash, key, value)) != null) { oldVal = p。val; if (!onlyIfAbsent) p。val = value; } } } } //判斷節點元素超過8則從連結串列轉換成紅黑樹 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } }}addCount(1L, binCount);return null;}

spread()方法

考慮到2進位制32位帶符號的int表值範圍從-2147483648到2147483648,前後加起來大概40億的對映空間。一個40億長度的陣列,記憶體是放不下的!

所以雜湊值一般只會用到後面的位,但是如果只取到最後幾位的話,碰撞會很嚴重。於是就有了“擾動函式”——右位移16位,正好是32bit的一半,自己的高半區和低半區做異或,就是為了混合原始雜湊碼的高位和低位,以此來加大低位的隨機性。而且混合後的低位摻雜了高位的部分特徵,這樣高位的資訊也被變相保留下來。

static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS;}

initTable()初始化

private final Node[] initTable() { Node[] tab; int sc; while ((tab = table) == null || tab。length == 0) { //如果其他節點正在初始化或者擴容情況下,讓出CPU時間片,繼續自旋 if ((sc = sizeCtl) < 0) Thread。yield(); // lost initialization race; just spin //透過CAS保證只有一個執行緒將sc狀態設定為-1 else if (U。compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //如果是陣列為空,則初始化節點陣列 if ((tab = table) == null || tab。length == 0) { //預設容量16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings(“unchecked”) Node[] nt = (Node[])new Node<?,?>[n]; table = tab = nt; //n無符號右移2位,相當於/4,記錄sc=3/4的容量 sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }

tabAt()查詢在記憶體地址中節點是否存在

/* * Volatile access methods are used for table elements as well as * elements of in-progress next table while resizing。 All uses of * the tab arguments must be null checked by callers。 All callers * also paranoically precheck that tab’s length is not zero (or an * equivalent check), thus ensuring that any index argument taking * the form of a hash value anded with (length - 1) is a valid * index。 Note that, to be correct wrt arbitrary concurrency * errors by users, these checks must operate on local variables, * which accounts for some odd-looking inline assignments below。 * Note that calls to setTabAt always occur within locked regions, * and so in principle require only release ordering, not * full volatile semantics, but are currently coded as volatile * writes to be conservative。 */ @SuppressWarnings(“unchecked”) static final Node tabAt(Node[] tab, int i) { return (Node)U。getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }

helpTransfer()判斷節點移動或者擴容

ConcurrentHashMap鬼斧神工,併發新增元素時,如果正在擴容,其他執行緒會幫助擴容,也就是多執行緒擴容。

第一次新增元素時,預設初期長度為16,當往table中繼續新增元素時,透過Hash值跟陣列長度取餘來決定放在陣列的哪個Bucket位置,如果出現放在同一個位置,就優先以連結串列的形式存放,在同一個位置的個數達到了8個以上,如果陣列的長度還小於64,就會擴容陣列。如果陣列的長度大於等於64,就會將該節點的連結串列轉換成樹。

透過擴容陣列的方式來把這些節點分散開。然後將這些元素複製到擴容後的新陣列中,同一個Bucket中的元素透過Hash值的陣列長度位來重新確定位置,可能還是放在原來的位置,也可能放到新的位置。而且,在擴容完成之後,如果之前某個節點是樹,但是現在該節點的“Key-Value對”數又小於等於6個,就會將該樹轉為連結串列。

什麼時候擴容呢?當前容量超過閾值,也就是連結串列中的元素個數超過預設設定(8個)時,如果陣列table的大小還未超過64,此時就進行陣列的擴容,如果超過就將連結串列轉化成紅黑樹。

final Node[] helpTransfer(Node[] tab, Node f) { Node[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode)f)。nextTable) != null) { int rs = resizeStamp(tab。length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //CAS獲得容量的控制權,則進行節點擴容轉移 if (U。compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab;}return table;}

transfer()將節點移動或者複製到新的節點陣列中

table資料轉移到nextTable。擴容操作的核心在於資料的轉移,把舊陣列中的資料遷移到新的陣列。ConcurrentHashMap精華的部分是它可以利用多執行緒來進行協同擴容,簡單來說,它把table陣列當作多個執行緒之間共享的任務佇列,然後透過維護一個指標來劃分每個執行緒鎖負責的區間,每個執行緒透過區間逆向遍歷來實現擴容,一個已經遷移完的Bucket會被替換為一個ForwardingNode節點,標記當前Bucket已經被其他執行緒遷移完了。

private final void transfer(Node[] tab, Node[] nextTab) { int n = tab。length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings(“unchecked”) Node[] nt = (Node[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer。MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab。length; ForwardingNode fwd = new ForwardingNode(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node f; int fh; while (advance) { int nextIndex, nextBound; if (——i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U。compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U。compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f。hash) == MOVED) advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { Node ln, hn; if (fh >= 0) { int runBit = fh & n; Node lastRun = f; for (Node p = f。next; p != null; p = p。next) { int b = p。hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p。next) { int ph = p。hash; K pk = p。key; V pv = p。val; if ((ph & n) == 0) ln = new Node(ph, pk, pv, ln); else hn = new Node(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin t = (TreeBin)f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t。first; e != null; e = e。next) { int h = e。hash; TreeNode p = new TreeNode (h, e。key, e。val, null, null); if ((h & n) == 0) { if ((p。prev = loTail) == null) lo = p; else loTail。next = p; loTail = p; ++lc; } else { if ((p。prev = hiTail) == null) hi = p; else hiTail。next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }

putTreeVal()紅黑樹新增節點

final TreeNode putTreeVal(int h, K k, V v) { Class<?> kc = null; boolean searched = false; for (TreeNode p = root;;) { int dir, ph; K pk; if (p == null) { first = root = new TreeNode(h, k, v, null, null); break; } else if ((ph = p。hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((pk = p。key) == k || (pk != null && k。equals(pk))) return p; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) { if (!searched) { TreeNode q, ch; searched = true; if (((ch = p。left) != null && (q = ch。findTreeNode(h, k, kc)) != null) || ((ch = p。right) != null && (q = ch。findTreeNode(h, k, kc)) != null)) return q; } dir = tieBreakOrder(k, pk); } TreeNode xp = p; if ((p = (dir <= 0) ? p。left : p。right) == null) { TreeNode x, f = first; first = x = new TreeNode(h, k, v, f, xp); if (f != null) f。prev = x; if (dir <= 0) xp。left = x; else xp。right = x; if (!xp。red) x。red = true; else { lockRoot(); try { root = balanceInsertion(root, x); } finally { unlockRoot(); } } break; } } assert checkInvariants(root); return null;}

treeifyBin()將連結串列節點轉化成紅黑樹

private final void treeifyBin(Node[] tab, int index) { Node b; int n, sc; if (tab != null) { if ((n = tab。length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b。hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode hd = null, tl = null; for (Node e = b; e != null; e = e。next) { TreeNode p = new TreeNode(e。hash, e。key, e。val, null, null); if ((p。prev = tl) == null) hd = p; else tl。next = p; tl = p; } setTabAt(tab, index, new TreeBin(hd)); } } } } }

addCount()設定元素成功計數

/** * 如果表太小並且沒有準備好調整和初始化時,新增到count計數。 * 如果已經在調整大小,在工作可用的情況下幫助執行轉移。 * 在轉移後重新檢查佔用情況,看看是否需要再次調整,因為調整是滯後的增加。 */private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U。compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as。length - 1) < 0 || (a = as[ThreadLocalRandom。getProbe() & m]) == null || !(uncontended = U。compareAndSwapLong(a, CELLVALUE, v = a。value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { Node[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab。length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U。compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U。compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } }}

5、get()方法,獲得元素

public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; //計算元素key的hash儲存位置 int h = spread(key。hashCode()); if ((tab = table) != null && (n = tab。length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //查詢當前節點 if ((eh = e。hash) == h) { if ((ek = e。key) == key || (ek != null && key。equals(ek))) return e。val; } //從當前節點的中找到元素 else if (eh < 0) return (p = e。find(h, key)) != null ? p。val : null; //從當前節點的下一個節點查詢元素 while ((e = e。next) != null) { if (e。hash == h && ((ek = e。key) == key || (ek != null && key。equals(ek)))) return e。val; } } return null;}

find()查詢節點種元素值

Node find(int h, Object k) { Node e = this; if (k != null) { do { K ek; if (e。hash == h && ((ek = e。key) == k || (ek != null && k。equals(ek)))) return e; } while ((e = e。next) != null); } return null;}

參考資料

JDK1。8ConcurrentHashMap原始碼

《Java高併發核心程式設計。卷2,多執行緒、鎖、JMM、JUC、高併發設計模式》