一篇圖文解讀如何保證Kafka訊息不丟失

kafka到底會不會丟失資料呢?通常不會丟失,但是在有些情況下是會丟失資料的。我們需要在Producer端、Broker端和Consumer端配置好引數,以保障在Kafka叢集出現故障時也不會丟失資料。

一 合理設定Producer端引數,以保障訊息不丟失

1。1 採用producer。send(msg, callback)

如果使用KafkaProducer。send(ProducerRecord)方法,這是一個非同步傳送訊息的機制,只是把訊息放到一個緩衝區中,後臺的IO執行緒會不斷地掃描該緩衝區,將訊息封裝到一個批次中傳送出去。顯然這種方式會有丟失資料的風險,如果IO執行緒在傳送訊息之前,突然掛掉了,累計在Accumulator中的訊息可能會丟失。因此,我們建議使用KafkaProducer。send(ProducerRecord, callback)帶回調通知的API。

1。2 設定acks=all

這個比較好理解,當ISR中所有follower副本都響應了才認為訊息提交成功。

Producer等待Broker的ack, Partition的Leader和ISR裡面的所有Follower全部落盤成功後才返回 ack。但是如果在Follower同步完成後, Broker傳送ack之前,Leader發生故障,那麼會造成資料重複。 這時候萬一Leader宕機了,就可以自動切換到那個Follower,此時資料就不會丟失。

一篇圖文解讀如何保證Kafka訊息不丟失

1。3 設定retries=Long。MAX_VALUE

將retries引數設定成一個較大的引數值,當網路出現抖動或其他問題時,可能會導致訊息傳送失敗;此時如果設定了retries=Long。MAX_VALUE就能保證自動重新發送訊息,以保證訊息不丟失。

1。4max。in。flight。requests。per。connection=1

這個引數指定了生產者在收到Kafka伺服器響應之前可以傳送多少訊息,它的值設定的越大,吞吐量就越高。但是,把它設定為1可以保證訊息是能夠按順序寫入到kafka伺服器的。設定為1就意味著,Kafka的Broker在響應請求之前,Client端是不能夠傳送訊息的。

注意:

這個引數主要是為了保證訊息順序性,以避免亂序的

1。5 顯示關閉Producer

在使用

KafkaProducer。send(ProducerRecord, callback)時,在callback的邏輯中最好是顯示關閉Producer。

注意:

這樣做主要是為了保證訊息順序性,以避免亂序的

二 合理設定Broker端引數,保障訊息不丟失

對於Kafka叢集而言,Kafka的

訊息複製

機制和

分割槽多副本

機制是保證Kafka訊息可靠性的基石,在把訊息寫入多個副本時,即使Kafka的Broker幾點傳送崩潰也不會導致訊息丟失。

2。1 replication。factor >= 3

Kafka中的分割槽因子包括Leader副本和Follower副本因子,每個分割槽在建立的時候就會選舉出一個副本作為Leader副本,其他的副本作為Follower副本。而在Kafka叢集中,Follower副本是不對外提供服務的,所有的請求都是有Leader副本完成。Follower副本的唯一任務就是從Leader副本中同步訊息,從而實現自己和Leader副本的訊息保持同步。這一條是必須要保證的,如果Leader沒有Follower,或者Follower都沒有跟上Leader資料的同步,那是無論如何都不能做到高可用的。

提醒一下:Kafka在預設的情況下會把分割槽的副本分佈在不同的Broker上,但是如果這些Broker都在同一個機架上,一旦機架上的交換機發生故障,分割槽就不可用了。建議把Broker分佈在不同的機架上,可以使用broker。rack引數配置機架的名稱。

我們的Kafka叢集架構必須要做到,每次寫資料,必須是Leader和Follower都寫成功才算成功。也就是說,每寫一條資料必須要有兩個副本寫入成功。這時候萬一Leader宕機了,就可以自動切換到那個Follower,此時資料就不會丟失。

2。2 min。insync。replica>1

訊息寫入多少個副本才算已提交。根據Kafka對訊息可靠性的定義,訊息只有在寫入Follower副本之後才算已提交,因此必須要確保被寫入的訊息不止寫入一個副本中,就需要把最小同步副本數量設定成一個比較大的值。

注意,我們通常設定replication。factor>min。insync。replica,以保證可用性,如果相等,則任何一個副本掛了,則整個分割槽都不能用了。

2。3 rerplica。lag。time。max。ms=10000

這個引數設定的是Follower副本與Leader副本訊息最大的延遲容忍時間,預設是10秒。如果Leader發現Follower超過10秒沒有向它發起fech請求,那麼Leader考慮這個Follower是不是程式出了點問題,或者資源緊張排程不過來,它太慢了,不希望它拖慢後面的進度,就把它從ISR中移除。

2。4unclean。leader。election。ename=false

資料缺失太多的Broker不能作為Leader,防止資料丟失。換言之,如果我們允許不同步的副本成為Leader副本,那麼就會承擔資料不一致的風險。因此,強烈建議將這個引數設定為false。

三 合理設定Consumer端引數,以保證訊息不丟失

消費者透過pull模式主動地去Kafka叢集拉取訊息,與Producer相同的是,消費者在拉取訊息的時候也是找Leader副本去拉取。

一篇圖文解讀如何保證Kafka訊息不丟失

3。1 enable。auto。commit=false

禁用自動提交機制,

禁止自動提交offset,避免多執行緒消費時出現訊息丟失。

先處理訊息再commit。如果在commit之前發生異常,下次還會消費到該訊息,重複資料問題可以透過冪等性和事務性來解決,以保證Kafka端到端的資料一致性。

3。2 配置auto。offset。reset

這個引數有兩種配置方法,一種是earlest:從最早的訊息開始消費,這種做法會避免訊息丟失,但是會導致重複消費;另外一種是latest(預設)從訊息的尾部開始消費,這樣做可能會丟失一部分資料。

四 總結

這篇文章主要是介紹了,Kafka是完全可以保證訊息不丟失的,必須要保證訊息已經提交到Kafka叢集,同時還要滿足上述配置要求。歡迎大家評論、補充。