Kafka作為一款開源的訊息引擎,很多人並不陌生,但深入其原始碼的同學估計不多,除非你是中介軟體團隊訊息系統維護者。但術業有專攻,市面上那麼多開源框架且每個框架又經常迭代升級,花精力深入瞭解每一個框架原始碼不太現實,本文會以業務視角羅列工作中大家需要熟知的一些知識
本篇文章的目錄:
首先,為什麼使用kafka?
削峰填谷。緩衝上下游瞬時突發流量,保護“脆弱”的下游系統不被壓垮,避免引發全鏈路服務“雪崩”。
系統解耦。傳送方和接收方的松耦合,一定程度簡化了開發成本,減少了系統間不必要的直接依賴。
Kafka 名詞術語,一網打盡
Broker:接收客戶端傳送過來的訊息,對訊息進行持久化
主題:Topic。主題是承載訊息的邏輯容器,在實際使用中多用來區分具體的業務。
分割槽:Partition。一個有序不變的訊息序列。每個主題下可以有多個分割槽。
訊息:這裡的訊息就是指 Kafka 處理的主要物件。
訊息位移:Offset。表示分割槽中每條訊息的位置資訊,是一個單調遞增且不變的值。
副本:Replica。Kafka 中同一條訊息能夠被複製到多個地方以提供資料冗餘,這些地方就是所謂的副本。副本還分為領導者副本和追隨者副本,各自有不同的角色劃分。每個分割槽可配置多個副本實現高可用。一個分割槽的N個副本一定在N個不同的Broker上。
生產者:Producer。向主題釋出新訊息的應用程式。
消費者:Consumer。從主題訂閱新訊息的應用程式。
消費者位移:Consumer Offset。表示消費者消費進度,每個消費者都有自己的消費者位移。offset儲存在broker端的內部topic中,不是在clients中儲存
消費者組:Consumer Group。多個消費者例項共同組成的一個組,同時消費多個分割槽以實現高吞吐。
重平衡:Rebalance。消費者組內某個消費者例項掛掉後,其他消費者例項自動重新分配訂閱主題分割槽
ZooKeeper 在裡面的職責是什麼?
它是一個分散式協調框架,負責協調管理並儲存 Kafka 叢集的所有元資料資訊,比如叢集都有哪些 Broker 在執行、建立了哪些 Topic,每個 Topic 都有多少分割槽以及這些分割槽的 Leader 副本都在哪些機器上等資訊。
訊息傳輸的格式
純二進位制的位元組序列。當然訊息還是結構化的,只是在使用之前都要將其轉換成二進位制的位元組序列。
訊息傳輸協議
點對點模型。系統 A 傳送的訊息只能被系統 B 接收,其他任何系統都不能讀取 A 傳送的訊息
釋出/訂閱模型。該模型也有傳送方和接收方,只不過提法不同。傳送方也稱為釋出者(Publisher),接收方稱為訂閱者(Subscriber)。和點對點模型不同的是,這個模型可能存在多個釋出者向相同的主題傳送訊息,而訂閱者也可能存在多個,它們都能接收到相同主題的訊息。
訊息壓縮
生產者程式中配置
compression。type
引數即表示啟用指定型別的壓縮演算法。
props。put(“compression。type”, “gzip”)
,它表明該 Producer 的壓縮演算法使用的是
GZIP
。這樣 Producer 啟動後生產的每個訊息集合都是經 GZIP 壓縮過的,故而能很好地節省網路傳輸頻寬以及 Kafka Broker 端的磁碟佔用。
但如果Broker又指定了不同的壓縮演算法,如:
Snappy
,會將生產端的訊息解壓然後按自己的演算法重新壓縮。
各壓縮演算法比較:吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
kafka預設不指定壓縮演算法。
訊息解壓縮
當 Consumer pull訊息時,Broker 會原樣傳送出去,當訊息到達 Consumer 端後,由 Consumer 自行解壓縮還原成之前的訊息。
分割槽策略
編寫一個類實現
org。apache。kafka。clients。Partitioner
介面。實現內部兩個方法:
partition()
和
close()
。然後顯式地配置生產者端的引數
partitioner。class
常見的策略:
輪詢策略(預設)。保證訊息最大限度地被平均分配到所有分割槽上。
隨機策略。隨機策略是老版本生產者使用的分割槽策略,在新版本中已經改為輪詢了。
按key分割槽策略。key可能是uid或者訂單id,將同一標誌位的所有訊息都發送到同一分割槽,這樣可以保證一個分割槽內的訊息有序
其他分割槽策略。如:基於地理位置的分割槽策略
生產者管理TCP連線
在new KafkaProducer 例項時,生產者應用會在後臺建立並啟動一個名為 Sender 的執行緒,該 Sender 執行緒開始執行時首先會建立與 Broker 的連線。此時還不知道給哪個topic發訊息,所以Producer 啟動時會發起與所有的 Broker 的連線。
Producer 透過
metadata。max。age。ms
引數定期地去更新元資料資訊,預設值是 300000,即 5 分鐘,不管叢集那邊是否有變化,Producer 每 5 分鐘都會強制重新整理一次元資料以保證它是最新的資料。
Producer 傳送訊息
:
Producer 使用帶回調通知的傳送 API, producer。send(msg, callback)。
設定 acks = all。Producer 的一個引數,表示所有副本都成功接收到訊息,該訊息才算是“已提交”,最高等級,acks的其它值說明。min。insync。replicas > 1,表示訊息至少要被寫入到多少個副本才算是“已提交”
retries 是 Producer 的引數。當出現網路的瞬時抖動時,訊息傳送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試訊息傳送,避免訊息丟失。
冪等性 Producer
設定引數
props。put(“enable。idempotence”, ture)
,Producer 自動升級成冪等性 Producer,其他所有的程式碼邏輯都不需要改變。Kafka 自動幫你做訊息的重複去重。
原理很簡單,就是經典的空間換時間,即在 Broker 端多儲存一些欄位。當 Producer 傳送了具有相同欄位值的訊息後,Broker 能夠自動知曉這些訊息已經重複了,可以在後臺默默地把它們“丟棄”掉。
只能保證單分割槽、單會話上的訊息冪等性。一個冪等性 Producer 能夠保證某個topic的一個分割槽上不出現重複訊息,但無法實現多個分割槽的冪等性。比如採用輪詢,下一次提交換了一個分割槽就無法解決
事務型 Producer
能夠保證將訊息原子性地寫入到多個分割槽中。這批訊息要麼全部寫入成功,要麼全部失敗。能夠保證跨分割槽、跨會話間的冪等性。
producer。initTransactions();try { producer。beginTransaction(); producer。send(record1); producer。send(record2); //提交事務 producer。commitTransaction();} catch (KafkaException e) { //事務終止 producer。abortTransaction();}
實際上即使寫入失敗,Kafka 也會把它們寫入到底層的日誌中,也就是說 Consumer 還是會看到這些訊息。要不要處理在 Consumer 端設定
isolation。level
,這個引數有兩個值:
read_uncommitted:這是預設值,表明 Consumer 能夠讀取到 Kafka 寫入的任何訊息
read_committed:表明 Consumer 只會讀取事務型 Producer 成功提交事務寫入的訊息
Kafka Broker 是如何儲存資料?
Kafka 使用訊息日誌(Log)來儲存資料,一個日誌就是磁碟上一個只能追加寫(Append-only)訊息的物理檔案。因為只能追加寫入,故避免了緩慢的隨機 I/O 操作,改為效能較好的順序 I/O 寫操作,這也是實現 Kafka 高吞吐量特性的一個重要手段。
不過如果你不停地向一個日誌寫入訊息,最終也會耗盡所有的磁碟空間,因此 Kafka 必然要定期地刪除訊息以回收磁碟。怎麼刪除呢?
簡單來說就是透過日誌段(Log Segment)機制。在 Kafka 底層,一個日誌又近一步細分成多個日誌段,訊息被追加寫到當前最新的日誌段中,當寫滿了一個日誌段後,Kafka 會自動切分出一個新的日誌段,並將老的日誌段封存起來。Kafka 在後臺還有定時任務會定期地檢查老的日誌段是否能夠被刪除,從而實現回收磁碟空間的目的。
Kafka 的備份機制
相同的資料複製到多臺機器上。副本的數量是可以配置的。Kafka 中
follow副本
不會對外提供服務。
副本的工作機制也很簡單:生產者總是向
leader副本
寫訊息;而消費者總是從
leader副本
讀訊息。至於follow副本,它只做一件事:向leader副本以非同步方式傳送pull請求,請求leader把最新的訊息同步給它,必然有一個時間視窗導致它和leader中的資料是不一致的,或者說它是落後於leader。
為什麼要引入消費者組?
主要是為了提升消費者端的吞吐量。多個消費者例項同時消費,加速整個消費端的吞吐量(TPS)。
在一個消費者組下,
一個分割槽只能被一個消費者消費
,
但一個消費者可能被分配多個分割槽
,因而在提交位移時也就能提交多個分割槽的位移。如果1個topic有2個分割槽,消費者組有3個消費者,有一個消費者將無法分配到任何分割槽,處於idle狀態。
理想情況下,Consumer 例項的數量應該等於該 Group 訂閱topic(可能多個)的分割槽總數。
消費端拉取(批次)、ACK
消費端先拉取並消費訊息,然後再ack更新offset。
1)消費者程式啟動多個執行緒,每個執行緒維護專屬的
KafkaConsumer
例項,負責完整的訊息拉取、訊息處理流程。一個
KafkaConsumer
負責一個分割槽,能保證分割槽內的訊息消費順序。
缺點:執行緒數受限於 Consumer 訂閱topic的總分割槽數。
2)任務切分成了訊息獲取和訊息處理兩個部分。消費者程式使用單或多執行緒拉取訊息,同時建立專門執行緒池執行業務邏輯。優點:可以靈活調節訊息獲取的執行緒數,以及訊息處理的執行緒數。
缺點:無法保證分割槽內的訊息消費順序。另外引入了多組執行緒,使得整個訊息消費鏈路被拉長,最終導致正確位移提交會變得異常困難,可能會出現訊息的重複消費或丟失。
消費端offset管理
1)老版本的 Consumer組把位移儲存在 ZooKeeper 中,但很快發現zk並不適合頻繁的寫更新。
2)在新版本的 Consumer Group 中,Kafka 社群重新設計了 Consumer組的位移管理方式,採用了將位移儲存在 Broker端的內部topic中,也稱為“位移主題”,由kafka自己來管理。原理很簡單, Consumer的位移資料作為一條條普通的 Kafka 訊息,提交到
__consumer_offsets
中。它的訊息格式由 Kafka 自己定義,使用者不能修改。位移主題的 Key 主要包括 3 部分內容:
Kafka Consumer 提交位移的方式有兩種:自動提交位移和手動提交位移。
Kafka 使用Compact策略來刪除位移主題中的過期訊息,避免該topic無限期膨脹。提供了專門的後臺執行緒定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除資料。
Rebalance 觸發條件
1)組成員數發生變更。比如有新的 Consumer 例項加入組或者離開組,又或是有 Consumer 例項崩潰被“踢出”組。(99%原因是由它導致)
2) 訂閱topic數發生變更。Consumer Group 可以使用正則表示式的方式訂閱topic,比如 consumer。subscribe(Pattern。compile(“t。*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結尾的topic。在 Consumer Group 的執行過程中,你新建立了一個滿足這樣條件的topic,那麼該 Group 就會發生 Rebalance。
3) 訂閱topic的分割槽數發生變化。Kafka 目前只允許增加topic的分割槽數。當分割槽數增加時,也會觸發訂閱該topic的所有 Group 開啟 Rebalance。
訊息的順序性
Kafka的設計中多個分割槽的話無法保證全域性的訊息順序。如果一定要實現全域性的訊息順序,只能單分割槽。
方法二:透過有key分組,同一個key的訊息放入同一個分割槽,保證區域性有序
歷史資料清理策略
基於儲存時間,
log。retention。hours
基於日誌大小的清理策略。透過
log。retention。bytes
控制
組合方式
總結了很多有關於java面試的資料,希望能夠幫助正在學習java的小夥伴。由於資料過多不便發表文章,創作不易,望小夥伴們能夠給我一些動力繼續建立更好的java類學習資料文章,
請多多支援和關注小作,別忘了點贊+評論+轉發。右上角私信我回復【03】即可領取免費學習資料謝謝啦!
原文出處:https://mp。weixin。qq。com/s/c-SkcgDTVctH4-1dvDaf-A