阿里一面面經詳解:kafka訊息重複消費場景,你怎麼看待996?

1。怎麼解決訊息佇列重複消費

2。MQ為什麼能單機抗很高的併發量

3。Netty裡序列化的方式

4。如果說想提高效能 用什麼序列化方案?

5。Netty執行緒池:nioeventloopgroup 序列無鎖化 thread和selector的封裝

6。執行緒池的設定引數

7。執行緒數量怎麼定的:n+1 和 2*n+1

8。有沒有測過不同執行緒數量對於效能的影響

9。你的web專案有幾個表 分別是什麼。

10。分散式事務實現方法?

11。update是原子性的麼?

12。update會不會死鎖

13。hashmap會不會死鎖

14。concurrentHashMap:介紹一波1。7和1。8的結構

15。建索引的規範或者標準:

16。mysql的檔案系統,是怎麼找資料的?

17。資料量特別大的時候 mysql會怎麼去做

28。mysql是怎麼跟外部建立連線的

19。redis單一get和set操作是不是原子的

20。為什麼redis沒有做成多執行緒的

21。你對996怎麼看?你對沒有技術含量的工作怎麼看

1。

怎麼解決訊息佇列重複消費

kafka訊息重複消費場景

kafka實際上有個offset的概念,就是每個訊息寫進去,都有一個offset,代表他的序號,然後consumer消費了資料之後,每隔一段時間,會把自己消費過的訊息的offset提交一下,代表我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的offset來繼續消費吧。

但是凡事總有意外,比如我們之前生產經常遇到的,就是你有時候重啟系統,看你怎麼重啟了,如果碰到點著急的,直接kill程序了,再重啟。這會導致consumer有些訊息處理了,但是沒來得及提交offset,尷尬了。重啟之後,少數訊息會再次消費一次。

阿里一面面經詳解:kafka訊息重複消費場景,你怎麼看待996?

如何保證訊息重複消費後的冪等性

其實重複消費不可怕,可怕的是你沒考慮到重複消費之後,怎麼保證冪等性。

阿里一面面經詳解:kafka訊息重複消費場景,你怎麼看待996?

舉個例子吧。假設你有個系統,消費一條往資料庫裡插入一條,要是你一個訊息重複兩次,你不就插入了兩條,這資料不就錯了?但是你要是消費到第二次的時候,自己判斷一下已經消費過了,直接扔了,不就保留了一條資料?

一條資料重複出現兩次,資料庫裡就只有一條資料,這就保證了系統的冪等性

冪等性,我通俗點說,就一個數據,或者一個請求,給你重複來多次,你得確保對應的資料是不會改變的,不能出錯。

那所以第二個問題來了,怎麼保證訊息佇列消費的冪等性?

其實還是得結合業務來思考,我這裡給幾個思路:

(1)比如你拿個資料要寫庫,你先根據主鍵查一下,如果這資料都有了,你就別插入了,update一下好吧

(2)比如你是寫redis,那沒問題了,反正每次都是set,天然冪等性

(3)比如你不是上面兩個場景,那做的稍微複雜一點,你需要讓生產者傳送每條資料的時候,裡面加一個全域性唯一的id,類似訂單id之類的東西,然後你這裡消費到了之後,先根據這個id去比如redis裡查一下,之前消費過嗎?如果沒有消費過,你就處理,然後這個id寫redis。如果消費過了,那你就別處理了,保證別重複處理相同的訊息即可。

還有比如基於資料庫的唯一鍵來保證重複資料不會重複插入多條,我們之前線上系統就有這個問題,就是拿到資料的時候,每次重啟可能會有重複,因為kafka消費者還沒來得及提交offset,重複資料拿到了以後我們插入的時候,因為有唯一鍵約束了,所以重複資料只會插入報錯,不會導致資料庫中出現髒資料

13。

hashmap會不會死鎖

1、首先我們需要簡單地瞭解一下HashMap資料結構

HashMap通常會用一個指標陣列(假設為table[])來做分散所有的key,當一個key被加入時,會透過Hash算

法透過key算出這個陣列的下標i,然後就把這個插到table[i]中,如果有兩個不同的key被算了。

但有時候兩個key算出的下標會是一個i,那麼就叫衝突,又叫碰撞,這樣會在table[i]上形成一個連結串列。所以

如果連結串列過多或過長,查詢演算法則會變成低效能的連結串列遍歷,這是Hash表的缺陷。

我們都知道HashMap初始容量大小為16,一般來說,Hash表這個容器當有資料要插入時,都會檢查容量有沒有超過設定的thredhold,如果超過,需要增大Hash表的尺寸,但是這樣一來,整個Hash表裡的元素都需要被重算一遍。這叫rehash,這個成本相當的大。具體大家可以看看JDK原始碼

2、現在來討論死鎖產生的原因

HashMap是非執行緒安全,死鎖一般都是產生於併發情況下。我們假設有二個程序T1、T2,HashMap容量為2,T1執行緒放入key A、B、C、D、E。在T1執行緒中A、B、C Hash值相同,於是形成一個連結,假設為A->C->B,而D、E Hash值不同,於是容量不足,需要新建一個更大尺寸的hash表,然後把資料從老的Hash表中

遷移到新的Hash表中(refresh)。這時T2程序闖進來了,T1暫時掛起,T2程序也準備放入新的key,這時也

發現容量不足,也refresh一把。refresh之後原來的連結串列結構假設為C->A,之後T1程序繼續執行,連結結構

為A->C,這時就形成A。next=B,B。next=A的環形連結串列。一旦取值進入這個環形連結串列就會陷入死迴圈。

3、替代方案

使用ConcurrentHashMap進行替代,ConcurrentHashMap是一個執行緒安全的Hash Table。可能有人會使用HashTable。當然HashTable也是執行緒安全,但HashTable鎖定的是整個Hash表,效率相對比較低。而ConcurrentHashMap可以做到讀取資料不加鎖,並且其內部的結構可以讓其在進行寫操作的時候能夠將鎖的粒度保持地儘量地小

14。

concurrentHashMap:介紹一波1.7和1.8的結構

ConcurrentHashMap 與HashMap和Hashtable 最大的不同在於:put和 get 兩次Hash到達指定的HashEntry,第一次hash到達Segment,第二次到達Segment裡面的Entry,然後在遍歷entry連結串列

(1) 從1。7到1。8版本,由於HashEntry從連結串列 變成了紅黑樹所以 concurrentHashMap的時間複雜度從O(n)到O(log(n))

(2) HashEntry最小的容量為2

(3)Segment的初始化容量是16;

(4)HashEntry在1。8中稱為Node,連結串列轉紅黑樹的值是8 ,當Node連結串列的節點數大於8時Node會自動轉化為TreeNode,會轉換成紅黑樹的結構

總結與思考

其實可以看出JDK1。8版本的ConcurrentHashMap的資料結構已經接近HashMap,相對而言,ConcurrentHashMap只是增加了同步的操作來控制併發,從JDK1。7版本的ReentrantLock+Segment+HashEntry,到JDK1。8版本中synchronized+CAS+HashEntry+紅黑樹,相對而言,總結如下思考

JDK1。8的實現降低鎖的粒度,JDK1。7版本鎖的粒度是基於Segment的,包含多個HashEntry,而JDK1。8鎖的粒度就是HashEntry(首節點)

JDK1。8版本的資料結構變得更加簡單,使得操作也更加清晰流暢,因為已經使用synchronized來進行同步,所以不需要分段鎖的概念,也就不需要Segment這種資料結構了,由於粒度的降低,實現的複雜度也增加了

JDK1。8使用紅黑樹來最佳化連結串列,基於長度很長的連結串列的遍歷是一個很漫長的過程,而紅黑樹的遍歷效率是很快的,代替一定閾值的連結串列,這樣形成一個最佳拍檔

JDK1。8為什麼使用內建鎖synchronized來代替重入鎖ReentrantLock,我覺得有以下幾點

因為粒度降低了,在相對而言的低粒度加鎖方式,synchronized並不比ReentrantLock差,在粗粒度加鎖中ReentrantLock可能透過Condition來控制各個低粒度的邊界,更加的靈活,而在低粒度中,Condition的優勢就沒有了

JVM的開發團隊從來都沒有放棄synchronized,而且基於JVM的synchronized最佳化空間更大,使用內嵌的關鍵字比使用API更加自然

在大量的資料操作下,對於JVM的記憶體壓力,基於API的ReentrantLock會開銷更多的記憶體,雖然不是瓶頸,但是也是一個選擇依據

以前寫過介紹HashMap的文章,文中提到過HashMap在put的時候,插入的元素超過了容量(由負載因子決定)的範圍就會觸發擴容操作,就是rehash,這個會重新將原陣列的內容重新hash到新的擴容陣列中,在多執行緒的環境下,存在同時其他的元素也在進行put操作,如果hash值相同,可能出現同時在同一陣列下用連結串列表示,造成閉環,導致在get時會出現死迴圈,所以HashMap是執行緒不安全的。

我們來了解另一個鍵值儲存集合HashTable,它是執行緒安全的,它在所有涉及到多執行緒操作的都加上了synchronized關鍵字來鎖住整個table,這就意味著所有的執行緒都在競爭一把鎖,在多執行緒的環境下,它是安全的,但是無疑是效率低下的。

其實HashTable有很多的最佳化空間,鎖住整個table這麼粗暴的方法可以變相的柔和點,比如在多執行緒的環境下,對不同的資料集進行操作時其實根本就不需要去競爭一個鎖,因為他們不同hash值,不會因為rehash造成執行緒不安全,所以互不影響,這就是鎖分離技術,將鎖的粒度降低,利用多個鎖來控制多個小的table,這就是這篇文章的主角ConcurrentHashMap JDK1。7版本的核心思想

ConcurrentHashMap

在JDK1。7版本中,ConcurrentHashMap的資料結構是由一個Segment陣列和多個HashEntry組成,如下圖所示:

阿里一面面經詳解:kafka訊息重複消費場景,你怎麼看待996?

Segment陣列的意義就是將一個大的table分割成多個小的table來進行加鎖,也就是上面的提到的鎖分離技術,而每一個Segment元素儲存的是HashEntry陣列+連結串列,這個和HashMap的資料儲存結構一樣

初始化

ConcurrentHashMap的初始化是會透過位與運算來初始化Segment的大小,用ssize來表示,原始碼如下所示

private static final int DEFAULT_CONCURRENCY_LEVEL = 16;private void writeObject(java。io。ObjectOutputStream s)throws java。io。IOException { // For serialization compatibility // Emulate segment calculation from previous version of this class int sshift = 0; int ssize = 1; while (ssize < DEFAULT_CONCURRENCY_LEVEL) { ++sshift; ssize <<= 1; } int segmentShift = 32 - sshift; int segmentMask = ssize - 1;

由此可以看出:因為ssize用位於運算來計算(ssize <<=1),所以Segment的大小取值都是以2的N次方,無關concurrencyLevel的取值,當然concurrencyLevel最大隻能用16位的二進位制來表示,即65536,換句話說,Segment的大小最多65536個,沒有指定concurrencyLevel元素初始化,Segment的大小ssize預設為 DEFAULT_CONCURRENCY_LEVEL =16(阿里面試官曾問過)

每一個Segment元素下的HashEntry的初始化也是按照位於運算來計算,用cap來表示,如下:

int cap = 1;while (cap < c) cap <<= 1

如上所示,HashEntr

cap <<= 1

y大小的計算也是2的N次方(cap <<=1), cap的初始值為1,所以HashEntry最小的容量為2

put操作

對於ConcurrentHashMap的資料插入,這裡要進行兩次Hash去定位資料的儲存位置

static class Segment extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; final float loadFactor; Segment(float lf) { this。loadFactor = lf; } }

從上Segment的繼承體系可以看出,Segment實現了ReentrantLock,也就帶有鎖的功能,當執行put操作時,會進行第一次key的hash來定位Segment的位置,如果該Segment還沒有初始化,即透過CAS操作進行賦值,然後進行第二次hash操作,找到相應的HashEntry的位置,這裡會利用繼承過來的鎖的特性,在將資料插入指定的HashEntry位置時(連結串列的尾端),會透過繼承ReentrantLock的tryLock()方法嘗試去獲取鎖,如果獲取成功就直接插入相應的位置,如果已經有執行緒獲取該Segment的鎖,那當前執行緒會以自旋的方式去繼續的呼叫tryLock()方法去獲取鎖,超過指定次數就掛起,等待喚醒(美團面試官問道的,多個執行緒一起put時候,currentHashMap如何操作)

get操作

ConcurrentHashMap的get操作跟HashMap類似,只是ConcurrentHashMap第一次需要經過一次hash定位到Segment的位置,然後再hash定位到指定的HashEntry,遍歷該HashEntry下的連結串列進行對比,成功就返回,不成功就返回null

size操作

計算ConcurrentHashMap的元素大小是一個有趣的問題,因為他是併發操作的,就是在你計算size的時候,他還在併發的插入資料,可能會導致你計算出來的size和你實際的size有相差(在你return size的時候,插入了多個數據),要解決這個問題,JDK1。7版本用兩種方案

try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments。length; ++j) ensureSegment(j)。lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments。length; ++j) { Segment seg = segmentAt(segments, j); if (seg != null) { sum += seg。modCount; int c = seg。count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } }finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments。length; ++j) segmentAt(segments, j)。unlock(); }}

1、第一種方案他會使用不加鎖的模式去嘗試多次計算ConcurrentHashMap的size,最多三次,比較前後兩次計算的結果,結果一致就認為當前沒有元素加入,計算的結果是準確的

2、第二種方案是如果第一種方案不符合,他就會給每個Segment加上鎖,然後計算ConcurrentHashMap的size返回(美團面試官的問題,多個執行緒下如何確定size)

JDK1.8的實現

改進一:取消segments欄位,直接採用transient volatile HashEntry[] table儲存資料,採用table陣列元素作為鎖,從而實現了對每一行資料進行加鎖,進一步減少併發衝突的機率。

改進二:將原先table陣列+單向連結串列的資料結構,變更為table陣列+單向連結串列+紅黑樹的結構。對於hash表來說,最核心的能力在於將key hash之後能均勻的分佈在陣列中。如果hash之後雜湊的很均勻,那麼table陣列中的每個佇列長度主要為0或者1。但實際情況並非總是如此理想,雖然ConcurrentHashMap類預設的載入因子為0。75,但是在資料量過大或者運氣不佳的情況下,還是會存在一些佇列長度過長的情況,如果還是採用單向列表方式,那麼查詢某個節點的時間複雜度為O(n);因此,對於個數超過8(預設值)的列表,jdk1。8中採用了紅黑樹的結構,那麼查詢的時間複雜度可以降低到O(logN),可以改進效能。

JDK1。8的實現已經摒棄了Segment的概念,而是直接用Node陣列+連結串列+紅黑樹的資料結構來實現,併發控制使用Synchronized和CAS來操作,整個看起來就像是最佳化過且執行緒安全的HashMap,雖然在JDK1。8中還能看到Segment的資料結構,但是已經簡化了屬性,只是為了相容舊版本

阿里一面面經詳解:kafka訊息重複消費場景,你怎麼看待996?

在深入JDK1。8的put和get實現之前要知道一些常量設計和資料結構,這些是構成ConcurrentHashMap實現結構的基礎,下面看一下基本屬性:

// node陣列最大容量:2^30=1073741824private static final int MAXIMUM_CAPACITY = 1 << 30;// 預設初始值,必須是2的幕數private static final int DEFAULT_CAPACITY = 16//陣列可能最大值,需要與toArray()相關方法關聯static final int MAX_ARRAY_SIZE = Integer。MAX_VALUE - 8;//併發級別,遺留下來的,為相容以前的版本private static final int DEFAULT_CONCURRENCY_LEVEL = 16;// 負載因子private static final float LOAD_FACTOR = 0。75f;// 連結串列轉紅黑樹閥值,> 8 連結串列轉換為紅黑樹static final int TREEIFY_THRESHOLD = 8;//樹轉連結串列閥值,小於等於6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))static final int UNTREEIFY_THRESHOLD = 6;static final int MIN_TREEIFY_CAPACITY = 64;private static final int MIN_TRANSFER_STRIDE = 16;private static int RESIZE_STAMP_BITS = 16;// 2^15-1,help resize的最大執行緒數private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;// 32-16=16,sizeCtl中記錄size大小的偏移量private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;// forwarding nodes的hash值static final int MOVED = -1;// 樹根節點的hash值static final int TREEBIN = -2;// ReservationNode的hash值static final int RESERVED = -3;// 可用處理器數量static final int NCPU = Runtime。getRuntime()。availableProcessors();//存放node的陣列transient volatile Node[] table;/*控制識別符號,用來控制table的初始化和擴容的操作,不同的值有不同的含義 *當為負數時:-1代表正在初始化,-N代表有N-1個執行緒正在 進行擴容 *當為0時:代表當時的table還沒有被初始化 *當為正數時:表示初始化或者下一次進行擴容的大小private transient volatile int sizeCtl;

基本屬性定義了ConcurrentHashMap的一些邊界以及操作時的一些控制,下面看一些內部的一些結構組成,這些是整個ConcurrentHashMap整個資料結構的核心

Node

Node是ConcurrentHashMap儲存結構的基本單元,繼承於HashMap中的Entry,用於儲存資料,原始碼如下

static class Node implements Map。Entry { final int hash; final K key; volatile V val; volatile Node next; Node(int hash, K key, V val, Node next) { this。hash = hash; this。key = key; this。val = val; this。next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key。hashCode() ^ val。hashCode(); } public final String toString(){ return key + “=” + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } public final boolean equals(Object o) { Object k, v, u; Map。Entry<?,?> e; return ((o instanceof Map。Entry) && (k = (e = (Map。Entry<?,?>)o)。getKey()) != null && (v = e。getValue()) != null && (k == key || k。equals(key)) && (v == (u = val) || v。equals(u))); } /** * Virtualized support for map。get(); overridden in subclasses。 */ Node find(int h, Object k) { Node e = this; if (k != null) { do { K ek; if (e。hash == h && ((ek = e。key) == k || (ek != null && k。equals(ek)))) return e; } while ((e = e。next) != null); } return null; } }

Node資料結構很簡單,從上可知,就是一個連結串列,但是隻允許對資料進行查詢,不允許進行修改

TreeNode

TreeNode繼承與Node,但是資料結構換成了二叉樹結構,它是紅黑樹的資料的儲存結構,用於紅黑樹中儲存資料,當連結串列的節點數大於8時會轉換成紅黑樹的結構,他就是透過TreeNode作為儲存結構代替Node來轉換成黑紅樹原始碼如下

static final class TreeNode extends Node { TreeNode parent; // red-black tree links TreeNode left; TreeNode right; TreeNode prev; // needed to unlink next upon deletion boolean red; TreeNode(int hash, K key, V val, Node next, TreeNode parent) { super(hash, key, val, next); this。parent = parent; } Node find(int h, Object k) { return findTreeNode(h, k, null); } /** * Returns the TreeNode (or null if not found) for the given key * starting at given root。 */ final TreeNode findTreeNode(int h, Object k, Class<?> kc) { if (k != null) { TreeNode p = this; do { int ph, dir; K pk; TreeNode q; TreeNode pl = p。left, pr = p。right; if ((ph = p。hash) > h) p = pl; else if (ph < h) p = pr; else if ((pk = p。key) == k || (pk != null && k。equals(pk))) return p; else if (pl == null) p = pr; else if (pr == null) p = pl; else if ((kc != null || (kc = comparableClassFor(k)) != null) && (dir = compareComparables(kc, k, pk)) != 0) p = (dir < 0) ? pl : pr; else if ((q = pr。findTreeNode(h, k, kc)) != null) return q; else p = pl; } while (p != null); } return null; } }

TreeBin

TreeBin從字面含義中可以理解為儲存樹形結構的容器,而樹形結構就是指TreeNode,所以TreeBin就是封裝TreeNode的容器,它提供轉換黑紅樹的一些條件和鎖的控制,部分原始碼結構如下

static final class TreeBin extends Node { TreeNode root; volatile TreeNode first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock /** * Tie-breaking utility for ordering insertions when equal * hashCodes and non-comparable。 We don‘t require a total * order, just a consistent insertion rule to maintain * equivalence across rebalancings。 Tie-breaking further than * necessary simplifies testing a bit。 */ static int tieBreakOrder(Object a, Object b) { int d; if (a == null || b == null || (d = a。getClass()。getName()。 compareTo(b。getClass()。getName())) == 0) d = (System。identityHashCode(a) <= System。identityHashCode(b) ? -1 : 1); return d; } /** * Creates bin with initial set of nodes headed by b。 */ TreeBin(TreeNode b) { super(TREEBIN, null, null, null); this。first = b; TreeNode r = null; for (TreeNode x = b, next; x != null; x = next) { next = (TreeNode)x。next; x。left = x。right = null; if (r == null) { x。parent = null; x。red = false; r = x; } else { K k = x。key; int h = x。hash; Class<?> kc = null; for (TreeNode p = r;;) { int dir, ph; K pk = p。key; if ((ph = p。hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode xp = p; if ((p = (dir <= 0) ? p。left : p。right) == null) { x。parent = xp; if (dir <= 0) xp。left = x; else xp。right = x; r = balanceInsertion(r, x); break; } } } } this。root = r; assert checkInvariants(root); }

介紹了ConcurrentHashMap主要的屬性與內部的資料結構,現在透過一個簡單的例子以debug的視角看看ConcurrentHashMap的具體操作細節

static final class TreeBin extends Node { TreeNode root; volatile TreeNode first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock /** * Tie-breaking utility for ordering insertions when equal * hashCodes and non-comparable。 We don’t require a total * order, just a consistent insertion rule to maintain * equivalence across rebalancings。 Tie-breaking further than * necessary simplifies testing a bit。 */ static int tieBreakOrder(Object a, Object b) { int d; if (a == null || b == null || (d = a。getClass()。getName()。 compareTo(b。getClass()。getName())) == 0) d = (System。identityHashCode(a) <= System。identityHashCode(b) ? -1 : 1); return d; } /** * Creates bin with initial set of nodes headed by b。 */ TreeBin(TreeNode b) { super(TREEBIN, null, null, null); this。first = b; TreeNode r = null; for (TreeNode x = b, next; x != null; x = next) { next = (TreeNode)x。next; x。left = x。right = null; if (r == null) { x。parent = null; x。red = false; r = x; } else { K k = x。key; int h = x。hash; Class<?> kc = null; for (TreeNode p = r;;) { int dir, ph; K pk = p。key; if ((ph = p。hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode xp = p; if ((p = (dir <= 0) ? p。left : p。right) == null) { x。parent = xp; if (dir <= 0) xp。left = x; else xp。right = x; r = balanceInsertion(r, x); break; } } } } this。root = r; assert checkInvariants(root); }

我們先透過new ConcurrentHashMap()來進行初始化

public ConcurrentHashMap() {}

由上你會發現ConcurrentHashMap的初始化其實是一個空實現,並沒有做任何事,這裡後面會講到,這也是和其他的集合類有區別的地方,初始化操作並不是在建構函式實現的,而是在put操作中實現,當然ConcurrentHashMap還提供了其他的建構函式,有指定容量大小或者指定負載因子,跟HashMap一樣,這裡就不做介紹了

put操作

在上面的例子中我們新增個人資訊會呼叫put方法,我們來看下

public V put(K key, V value) { return putVal(key, value, false);}/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key。hashCode()); //兩次hash,減少hash衝突,可以均勻分佈 int binCount = 0; for (Node[] tab = table;;) { //對這個table進行迭代 Node f; int n, i, fh; //這裡就是上面構造方法沒有進行初始化,在這裡進行判斷,為null就呼叫initTable進行初始化,屬於懶漢模式初始化 if (tab == null || (n = tab。length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置沒有資料,就直接無鎖插入 if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f。hash) == MOVED)//如果在進行擴容,則先進行擴容操作 tab = helpTransfer(tab, f); else { V oldVal = null; //如果以上條件都不滿足,那就要進行加鎖操作,也就是存在hash衝突,鎖住連結串列或者紅黑樹的頭結點 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { //表示該節點是連結串列結構 binCount = 1; for (Node e = f;; ++binCount) { K ek; //這裡涉及到相同的key進行put就會覆蓋原先的value if (e。hash == hash && ((ek = e。key) == key || (ek != null && key。equals(ek)))) { oldVal = e。val; if (!onlyIfAbsent) e。val = value; break; } Node pred = e; if ((e = e。next) == null) { //插入連結串列尾部 pred。next = new Node(hash, key, value, null); break; } } } else if (f instanceof TreeBin) {//紅黑樹結構 Node p; binCount = 2; //紅黑樹結構旋轉插入 if ((p = ((TreeBin)f)。putTreeVal(hash, key, value)) != null) { oldVal = p。val; if (!onlyIfAbsent) p。val = value; } } } } if (binCount != 0) { //如果連結串列的長度大於8時就會進行紅黑樹的轉換 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount);//統計size,並且檢查是否需要擴容 return null;}

這個put的過程很清晰,對當前的table進行無條件自迴圈直到put成功,可以分成以下六步流程來概述

如果沒有初始化就先呼叫initTable()方法來進行初始化過程

如果沒有hash衝突就直接CAS插入

如果還在進行擴容操作就先進行擴容

如果存在hash衝突,就加鎖來保證執行緒安全,這裡有兩種情況,一種是連結串列形式就直接遍歷到尾端插入,一種是紅黑樹就按照紅黑樹結構插入,

最後一個如果該連結串列的數量大於閾值8,就要先轉換成黑紅樹的結構,break再一次進入迴圈(阿里面試官問題,預設的連結串列大小,超過了這個值就會轉換為紅黑樹);

如果新增成功就呼叫addCount()方法統計size,並且檢查是否需要擴容

現在我們來對每一步的細節進行原始碼分析,在第一步中,符合條件會進行初始化操作,我們來看看initTable()方法

/** * Initializes table, using the size recorded in sizeCtl。 */private final Node[] initTable() { Node[] tab; int sc; while ((tab = table) == null || tab。length == 0) {//空的table才能進入初始化操作 if ((sc = sizeCtl) < 0) //sizeCtl<0表示其他執行緒已經在初始化了或者擴容了,掛起當前執行緒 Thread。yield(); // lost initialization race; just spin else if (U。compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS操作SIZECTL為-1,表示初始化狀態 try { if ((tab = table) == null || tab。length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings(“unchecked”) Node[] nt = (Node[])new Node<?,?>[n];//初始化 table = tab = nt; sc = n - (n >>> 2);//記錄下次擴容的大小 } } finally { sizeCtl = sc; } break; } } return tab;}

在第二步中沒有hash衝突就直接呼叫Unsafe的方法CAS插入該元素,進入第三步如果容器正在擴容,則會呼叫helpTransfer()方法幫助擴容,現在我們跟進helpTransfer()方法看看

** *幫助從舊的table的元素複製到新的table中 */final Node[] helpTransfer(Node[] tab, Node f) { Node[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode)f)。nextTable) != null) { //新的table nextTba已經存在前提下才能幫助擴容 int rs = resizeStamp(tab。length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U。compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab);//呼叫擴容方法 break; } } return nextTab; } return table;}

其實helpTransfer()方法的目的就是呼叫多個工作執行緒一起幫助進行擴容,這樣的效率就會更高,而不是隻有檢查到要擴容的那個執行緒進行擴容操作,其他執行緒就要等待擴容操作完成才能工作

既然這裡涉及到擴容的操作,我們也一起來看看擴容方法transfer()

private final void transfer(Node[] tab, Node[] nextTab) { int n = tab。length, stride; // 每核處理的量小於16,則強制賦值16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings(“unchecked”) Node[] nt = (Node[])new Node<?,?>[n << 1]; //構建一個nextTable物件,其容量為原來容量的兩倍 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer。MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab。length; // 連線點指標,用於標誌位(fwd的hash值為-1,fwd。nextTable=nextTab) ForwardingNode fwd = new ForwardingNode(nextTab); // 當advance == true時,表明該節點已經處理過了 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node f; int fh; // 控制 ——i ,遍歷原hash表中的節點 while (advance) { int nextIndex, nextBound; if (——i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // 用CAS計算得到的transferIndex else if (U。compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; // 已經完成所有節點複製了 if (finishing) { nextTable = null; table = nextTab; // table 指向nextTable sizeCtl = (n << 1) - (n >>> 1); // sizeCtl閾值為原來的1。5倍 return; // 跳出死迴圈, } // CAS 更擴容閾值,在這裡面sizectl值減一,說明新加入一個執行緒參與到擴容操作 if (U。compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } // 遍歷的節點為null,則放入到ForwardingNode 指標節點 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // f。hash == -1 表示遍歷到了ForwardingNode節點,意味著該節點已經處理過了 // 這裡是控制併發擴容的核心 else if ((fh = f。hash) == MOVED) advance = true; // already processed else { // 節點加鎖 synchronized (f) { // 節點複製工作 if (tabAt(tab, i) == f) { Node ln, hn; // fh >= 0 ,表示為連結串列節點 if (fh >= 0) { // 構造兩個連結串列 一個是原連結串列 另一個是原連結串列的反序排列 int runBit = fh & n; Node lastRun = f; for (Node p = f。next; p != null; p = p。next) { int b = p。hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p。next) { int ph = p。hash; K pk = p。key; V pv = p。val; if ((ph & n) == 0) ln = new Node(ph, pk, pv, ln); else hn = new Node(ph, pk, pv, hn); } // 在nextTable i 位置處插上鍊表 setTabAt(nextTab, i, ln); // 在nextTable i + n 位置處插上鍊表 setTabAt(nextTab, i + n, hn); // 在table i 位置處插上ForwardingNode 表示該節點已經處理過了 setTabAt(tab, i, fwd); // advance = true 可以執行——i動作,遍歷節點 advance = true; } // 如果是TreeBin,則按照紅黑樹進行處理,處理邏輯與上面一致 else if (f instanceof TreeBin) { TreeBin t = (TreeBin)f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t。first; e != null; e = e。next) { int h = e。hash; TreeNode p = new TreeNode (h, e。key, e。val, null, null); if ((h & n) == 0) { if ((p。prev = loTail) == null) lo = p; else loTail。next = p; loTail = p; ++lc; } else { if ((p。prev = hiTail) == null) hi = p; else hiTail。next = p; hiTail = p; ++hc; } } // 擴容後樹節點個數若<=6,將樹轉連結串列 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }

擴容過程有點複雜,這裡主要涉及到多執行緒併發擴容,ForwardingNode的作用就是支援擴容操作,將已處理的節點和空節點置為ForwardingNode,併發處理時多個執行緒經過ForwardingNode就表示已經遍歷了,就往後遍歷,下圖是多執行緒合作擴容的過程:

介紹完擴容過程,我們再次回到put流程,在第四步中是向連結串列或者紅黑樹里加節點,到第五步,會呼叫treeifyBin()方法進行連結串列轉紅黑樹的過程

private final void treeifyBin(Node[] tab, int index) { Node b; int n, sc; if (tab != null) { //如果整個table的數量小於64,就擴容至原來的一倍,不轉紅黑樹了 //因為這個閾值擴容可以減少hash衝突,不必要去轉紅黑樹 if ((n = tab。length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b。hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode hd = null, tl = null; for (Node e = b; e != null; e = e。next) { //封裝成TreeNode TreeNode p = new TreeNode(e。hash, e。key, e。val, null, null); if ((p。prev = tl) == null) hd = p; else tl。next = p; tl = p; } //透過TreeBin物件對TreeNode轉換成紅黑樹 setTabAt(tab, index, new TreeBin(hd)); } } } }}

到第六步表示已經資料加入成功了,現在put呼叫addCount()方法計算ConcurrentHashMap的size,在原來的基礎上加一,現在來看看addCount()方法

private final void addCount(long x, int check) { CounterCell[] as; long b, s; //更新baseCount,table的數量,counterCells表示元素個數的變化 if ((as = counterCells) != null || !U。compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; //如果多個執行緒都在執行,則CAS失敗,執行fullAddCount,全部加入count if (as == null || (m = as。length - 1) < 0 || (a = as[ThreadLocalRandom。getProbe() & m]) == null || !(uncontended = U。compareAndSwapLong(a, CELLVALUE, v = a。value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //check>=0表示需要進行擴容操作 if (check >= 0) { Node[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab。length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U。compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //當前執行緒發起庫哦哦讓操作,nextTable=null else if (U。compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } }}

put的流程現在已經分析完了,你可以從中發現,他在併發處理中使用的是樂觀鎖,當有衝突的時候才進行併發處理,而且流程步驟很清晰,但是細節設計的很複雜,畢竟多執行緒的場景也複雜

get操作

我們現在要回到開始的例子中,我們對個人資訊進行了新增之後,我們要獲取所新增的資訊,使用String name = map。get(“name”)獲取新增的name資訊,現在我們依舊用debug的方式來分析下ConcurrentHashMap的獲取方法get()

public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; int h = spread(key。hashCode()); //計算兩次hash if ((tab = table) != null && (n = tab。length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {//讀取首節點的Node元素 if ((eh = e。hash) == h) { //如果該節點就是首節點就返回 if ((ek = e。key) == key || (ek != null && key。equals(ek))) return e。val; } //hash值為負值表示正在擴容,這個時候查的是ForwardingNode的find方法來定位到nextTable來 //查詢,查詢到就返回 else if (eh < 0) return (p = e。find(h, key)) != null ? p。val : null; while ((e = e。next) != null) {//既不是首節點也不是ForwardingNode,那就往下遍歷 if (e。hash == h && ((ek = e。key) == key || (ek != null && key。equals(ek)))) return e。val; } } return null;}

ConcurrentHashMap的get操作的流程很簡單,也很清晰,可以分為三個步驟來描述

計算hash值,定位到該table索引位置,如果是首節點符合就返回

如果遇到擴容的時候,會呼叫標誌正在擴容節點ForwardingNode的find方法,查詢該節點,匹配就返回

以上都不符合的話,就往下遍歷節點,匹配就返回,否則最後就返回null

size操作

最後我們來看下例子中最後獲取size的方式int size = map。size();,現在讓我們看下size()方法

public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer。MAX_VALUE) ? Integer。MAX_VALUE : (int)n);}final long sumCount() { CounterCell[] as = counterCells; CounterCell a; //變化的數量 long sum = baseCount; if (as != null) { for (int i = 0; i < as。length; ++i) { if ((a = as[i]) != null) sum += a。value; } } return sum;}

在JDK1。8版本中,對於size的計算,在擴容和addCount()方法就已經有處理了,可以注意一下Put函式,裡面就有addCount()函式,早就計算好的,然後你size的時候直接給你。JDK1。7是在呼叫size()方法才去計算,其實在併發集合中去計算size是沒有多大的意義的,因為size是實時在變的,只能計算某一刻的大小,但是某一刻太快了,人的感知是一個時間段,所以並不是很精確