聊聊 Kafka 那點破事

Kafka作為一款開源的訊息引擎,很多人並不陌生,但深入其原始碼的同學估計不多,除非你是中介軟體團隊訊息系統維護者。但術業有專攻,市面上那麼多開源框架且每個框架又經常迭代升級,花精力深入瞭解每一個框架原始碼不太現實,本文會以業務視角羅列工作中大家需要熟知的一些知識

本篇文章的目錄:

聊聊 Kafka 那點破事

首先,為什麼使用kafka?

削峰填谷。緩衝上下游瞬時突發流量,保護“脆弱”的下游系統不被壓垮,避免引發全鏈路服務“雪崩”。

系統解耦。傳送方和接收方的松耦合,一定程度簡化了開發成本,減少了系統間不必要的直接依賴。

Kafka 名詞術語,一網打盡

聊聊 Kafka 那點破事

Broker:接收客戶端傳送過來的訊息,對訊息進行持久化

主題:Topic。主題是承載訊息的邏輯容器,在實際使用中多用來區分具體的業務。

分割槽:Partition。一個有序不變的訊息序列。每個主題下可以有多個分割槽。

訊息:這裡的訊息就是指 Kafka 處理的主要物件。

訊息位移:Offset。表示分割槽中每條訊息的位置資訊,是一個單調遞增且不變的值。

副本:Replica。Kafka 中同一條訊息能夠被複製到多個地方以提供資料冗餘,這些地方就是所謂的副本。副本還分為領導者副本和追隨者副本,各自有不同的角色劃分。每個分割槽可配置多個副本實現高可用。一個分割槽的N個副本一定在N個不同的Broker上。

生產者:Producer。向主題釋出新訊息的應用程式。

消費者:Consumer。從主題訂閱新訊息的應用程式。

消費者位移:Consumer Offset。表示消費者消費進度,每個消費者都有自己的消費者位移。offset儲存在broker端的內部topic中,不是在clients中儲存

消費者組:Consumer Group。多個消費者例項共同組成的一個組,同時消費多個分割槽以實現高吞吐。

重平衡:Rebalance。消費者組內某個消費者例項掛掉後,其他消費者例項自動重新分配訂閱主題分割槽

ZooKeeper 在裡面的職責是什麼?

它是一個分散式協調框架,負責協調管理並儲存 Kafka 叢集的所有元資料資訊,比如叢集都有哪些 Broker 在執行、建立了哪些 Topic,每個 Topic 都有多少分割槽以及這些分割槽的 Leader 副本都在哪些機器上等資訊。

訊息傳輸的格式

純二進位制的位元組序列。當然訊息還是結構化的,只是在使用之前都要將其轉換成二進位制的位元組序列。

訊息傳輸協議

點對點模型。系統 A 傳送的訊息只能被系統 B 接收,其他任何系統都不能讀取 A 傳送的訊息

釋出/訂閱模型。該模型也有傳送方和接收方,只不過提法不同。傳送方也稱為釋出者(Publisher),接收方稱為訂閱者(Subscriber)。和點對點模型不同的是,這個模型可能存在多個釋出者向相同的主題傳送訊息,而訂閱者也可能存在多個,它們都能接收到相同主題的訊息。

訊息壓縮

生產者程式中配置

compression。type

引數即表示啟用指定型別的壓縮演算法。

props。put(“compression。type”, “gzip”)

,它表明該 Producer 的壓縮演算法使用的是

GZIP

。這樣 Producer 啟動後生產的每個訊息集合都是經 GZIP 壓縮過的,故而能很好地節省網路傳輸頻寬以及 Kafka Broker 端的磁碟佔用。

但如果Broker又指定了不同的壓縮演算法,如:

Snappy

,會將生產端的訊息解壓然後按自己的演算法重新壓縮。

各壓縮演算法比較:吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。

kafka預設不指定壓縮演算法。

訊息解壓縮

當 Consumer pull訊息時,Broker 會原樣傳送出去,當訊息到達 Consumer 端後,由 Consumer 自行解壓縮還原成之前的訊息。

分割槽策略

編寫一個類實現

org。apache。kafka。clients。Partitioner

介面。實現內部兩個方法:

partition()

close()

。然後顯式地配置生產者端的引數

partitioner。class

常見的策略:

輪詢策略(預設)。保證訊息最大限度地被平均分配到所有分割槽上。

隨機策略。隨機策略是老版本生產者使用的分割槽策略,在新版本中已經改為輪詢了。

按key分割槽策略。key可能是uid或者訂單id,將同一標誌位的所有訊息都發送到同一分割槽,這樣可以保證一個分割槽內的訊息有序

其他分割槽策略。如:基於地理位置的分割槽策略

生產者管理TCP連線

在new KafkaProducer 例項時,生產者應用會在後臺建立並啟動一個名為 Sender 的執行緒,該 Sender 執行緒開始執行時首先會建立與 Broker 的連線。此時還不知道給哪個topic發訊息,所以Producer 啟動時會發起與所有的 Broker 的連線。

Producer 透過

metadata。max。age。ms

引數定期地去更新元資料資訊,預設值是 300000,即 5 分鐘,不管叢集那邊是否有變化,Producer 每 5 分鐘都會強制重新整理一次元資料以保證它是最新的資料。

Producer 傳送訊息

Producer 使用帶回調通知的傳送 API, producer。send(msg, callback)。

設定 acks = all。Producer 的一個引數,表示所有副本都成功接收到訊息,該訊息才算是“已提交”,最高等級,acks的其它值說明。min。insync。replicas > 1,表示訊息至少要被寫入到多少個副本才算是“已提交”

retries 是 Producer 的引數。當出現網路的瞬時抖動時,訊息傳送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試訊息傳送,避免訊息丟失。

冪等性 Producer

設定引數

props。put(“enable。idempotence”, ture)

,Producer 自動升級成冪等性 Producer,其他所有的程式碼邏輯都不需要改變。Kafka 自動幫你做訊息的重複去重。

原理很簡單,就是經典的空間換時間,即在 Broker 端多儲存一些欄位。當 Producer 傳送了具有相同欄位值的訊息後,Broker 能夠自動知曉這些訊息已經重複了,可以在後臺默默地把它們“丟棄”掉。

只能保證單分割槽、單會話上的訊息冪等性。一個冪等性 Producer 能夠保證某個topic的一個分割槽上不出現重複訊息,但無法實現多個分割槽的冪等性。比如採用輪詢,下一次提交換了一個分割槽就無法解決

事務型 Producer

能夠保證將訊息原子性地寫入到多個分割槽中。這批訊息要麼全部寫入成功,要麼全部失敗。能夠保證跨分割槽、跨會話間的冪等性。

producer。initTransactions();try { producer。beginTransaction(); producer。send(record1); producer。send(record2); //提交事務 producer。commitTransaction();} catch (KafkaException e) { //事務終止 producer。abortTransaction();}

實際上即使寫入失敗,Kafka 也會把它們寫入到底層的日誌中,也就是說 Consumer 還是會看到這些訊息。要不要處理在 Consumer 端設定

isolation。level

,這個引數有兩個值:

read_uncommitted:這是預設值,表明 Consumer 能夠讀取到 Kafka 寫入的任何訊息

read_committed:表明 Consumer 只會讀取事務型 Producer 成功提交事務寫入的訊息

Kafka Broker 是如何儲存資料?

Kafka 使用訊息日誌(Log)來儲存資料,一個日誌就是磁碟上一個只能追加寫(Append-only)訊息的物理檔案。因為只能追加寫入,故避免了緩慢的隨機 I/O 操作,改為效能較好的順序 I/O 寫操作,這也是實現 Kafka 高吞吐量特性的一個重要手段。

不過如果你不停地向一個日誌寫入訊息,最終也會耗盡所有的磁碟空間,因此 Kafka 必然要定期地刪除訊息以回收磁碟。怎麼刪除呢?

簡單來說就是透過日誌段(Log Segment)機制。在 Kafka 底層,一個日誌又近一步細分成多個日誌段,訊息被追加寫到當前最新的日誌段中,當寫滿了一個日誌段後,Kafka 會自動切分出一個新的日誌段,並將老的日誌段封存起來。Kafka 在後臺還有定時任務會定期地檢查老的日誌段是否能夠被刪除,從而實現回收磁碟空間的目的。

Kafka 的備份機制

相同的資料複製到多臺機器上。副本的數量是可以配置的。Kafka 中

follow副本

不會對外提供服務。

副本的工作機制也很簡單:生產者總是向

leader副本

寫訊息;而消費者總是從

leader副本

讀訊息。至於follow副本,它只做一件事:向leader副本以非同步方式傳送pull請求,請求leader把最新的訊息同步給它,必然有一個時間視窗導致它和leader中的資料是不一致的,或者說它是落後於leader。

為什麼要引入消費者組?

主要是為了提升消費者端的吞吐量。多個消費者例項同時消費,加速整個消費端的吞吐量(TPS)。

在一個消費者組下,

一個分割槽只能被一個消費者消費

但一個消費者可能被分配多個分割槽

,因而在提交位移時也就能提交多個分割槽的位移。如果1個topic有2個分割槽,消費者組有3個消費者,有一個消費者將無法分配到任何分割槽,處於idle狀態。

理想情況下,Consumer 例項的數量應該等於該 Group 訂閱topic(可能多個)的分割槽總數。

消費端拉取(批次)、ACK

消費端先拉取並消費訊息,然後再ack更新offset。

1)消費者程式啟動多個執行緒,每個執行緒維護專屬的

KafkaConsumer

例項,負責完整的訊息拉取、訊息處理流程。一個

KafkaConsumer

負責一個分割槽,能保證分割槽內的訊息消費順序。

缺點:執行緒數受限於 Consumer 訂閱topic的總分割槽數。

2)任務切分成了訊息獲取和訊息處理兩個部分。消費者程式使用單或多執行緒拉取訊息,同時建立專門執行緒池執行業務邏輯。優點:可以靈活調節訊息獲取的執行緒數,以及訊息處理的執行緒數。

缺點:無法保證分割槽內的訊息消費順序。另外引入了多組執行緒,使得整個訊息消費鏈路被拉長,最終導致正確位移提交會變得異常困難,可能會出現訊息的重複消費或丟失。

消費端offset管理

1)老版本的 Consumer組把位移儲存在 ZooKeeper 中,但很快發現zk並不適合頻繁的寫更新。

2)在新版本的 Consumer Group 中,Kafka 社群重新設計了 Consumer組的位移管理方式,採用了將位移儲存在 Broker端的內部topic中,也稱為“位移主題”,由kafka自己來管理。原理很簡單, Consumer的位移資料作為一條條普通的 Kafka 訊息,提交到

__consumer_offsets

中。它的訊息格式由 Kafka 自己定義,使用者不能修改。位移主題的 Key 主要包括 3 部分內容:

Kafka Consumer 提交位移的方式有兩種:自動提交位移和手動提交位移。

Kafka 使用Compact策略來刪除位移主題中的過期訊息,避免該topic無限期膨脹。提供了專門的後臺執行緒定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除資料。

Rebalance 觸發條件

1)組成員數發生變更。比如有新的 Consumer 例項加入組或者離開組,又或是有 Consumer 例項崩潰被“踢出”組。(99%原因是由它導致)

2) 訂閱topic數發生變更。Consumer Group 可以使用正則表示式的方式訂閱topic,比如 consumer。subscribe(Pattern。compile(“t。*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結尾的topic。在 Consumer Group 的執行過程中,你新建立了一個滿足這樣條件的topic,那麼該 Group 就會發生 Rebalance。

3) 訂閱topic的分割槽數發生變化。Kafka 目前只允許增加topic的分割槽數。當分割槽數增加時,也會觸發訂閱該topic的所有 Group 開啟 Rebalance。

訊息的順序性

Kafka的設計中多個分割槽的話無法保證全域性的訊息順序。如果一定要實現全域性的訊息順序,只能單分割槽。

方法二:透過有key分組,同一個key的訊息放入同一個分割槽,保證區域性有序

歷史資料清理策略

基於儲存時間,

log。retention。hours

基於日誌大小的清理策略。透過

log。retention。bytes

控制

組合方式

總結了很多有關於java面試的資料,希望能夠幫助正在學習java的小夥伴。由於資料過多不便發表文章,創作不易,望小夥伴們能夠給我一些動力繼續建立更好的java類學習資料文章,

請多多支援和關注小作,別忘了點贊+評論+轉發。右上角私信我回復【03】即可領取免費學習資料謝謝啦!

聊聊 Kafka 那點破事

原文出處:https://mp。weixin。qq。com/s/c-SkcgDTVctH4-1dvDaf-A