訊息佇列kafka - 如何儲存消費端的消費位置

上一篇文章我們介紹了Kafka的基本原理,這一篇文章我們繼續介紹kafka中如何儲存消費端的消費位置。

如何儲存消費端的消費位置

什麼是offset

前面在講解partition的時候,提到過offset, 每個topic可以劃分多個分割槽(每個Topic至少有一個分 區),同一topic下的不同分割槽包含的訊息是不同的。每個訊息在被新增到分割槽時,都會被分配一個 offset(稱之為偏移量),它是訊息在此分割槽中的唯一編號,kafka透過offset保證訊息在分割槽內的順 序,offset的順序不跨分割槽,即kafka只保證在同一個分割槽內的訊息是有序的;對於應用層的消費來說,每次消費一個訊息並且提交以後,會儲存當前消費到的最近的一個offset。那麼offset儲存在哪裡?

訊息佇列kafka - 如何儲存消費端的消費位置

offset在哪裡維護?

在kafka中,提供了一個consumer_offsets_* 的一個topic,把offset資訊寫入到這個topic中。

consumer_offsets——儲存了每個consumer group某一時刻提交的offset資訊。

__consumer_offsets 預設有50個分割槽。

根據前面我們演示的案例,我們設定了一個KafkaConsumerDemo的groupid。首先我們需要找到這個 consumer_group儲存在哪個分割槽中:

properties。put(ConsumerConfig。GROUP_ID_CONFIG,“KafkaConsumerDemo”);

計算公式

Math。abs(“groupid”。hashCode())%groupMetadataTopicPartitionCount ; 由於預設情況下 groupMetadataTopicPartitionCount有50個分割槽,計算得到的結果為:35, 意味著當前的 consumer_group的位移資訊儲存在__consumer_offsets的第35個分割槽

執行如下命令,可以檢視當前consumer_goup中的offset位移提交的資訊

kafka-console-consumer。sh ——topic __consumer_offsets ——partition 15 -bootstrap-server 192。168。1。12:909 ——formatter ‘kafka。coordinator。group。GroupMetadataManager$OffsetsMessageFormatter’

從輸出結果中,我們就可以看到test這個topic的offset的位移日誌

副本的分割槽機制

我們已經知道Kafka的每個topic都可以分為多個Partition,並且多個partition會均勻分佈在叢集的各個節點下。雖然這種方式能夠有效地對資料進行分片,但是對於每個partition來說,都是單點的,當其中一個partition不可用的時候,那麼這部分訊息就沒辦法消費。所以kafka為了提高partition的可靠性而提供了副本的概念(Replica),透過副本機制來實現冗餘備份。

每個分割槽可以有多個副本,並且在副本集合中會存在一個leader的副本,所有的讀寫請求都是由leader 副本來進行處理。剩餘的其他副本都作為follower副本,follower副本會從leader副本同步訊息日誌。這個有點類似zookeeper中leader和follower的概念,但是具體的實現方式還是有比較大的差異。所以我們可以認為,副本集會存在一主多從的關係。

一般情況下,同一個分割槽的多個副本會被均勻分配到叢集中的不同broker上,當leader副本所在的broker出現故障後,可以重新選舉新的leader副本繼續對外提供服務。透過這樣的副本機制來提高kafka叢集的可用性。

建立一個帶副本機制的訊息

透過下面的命令去建立帶2個副本的topic:

sh kafka-topics。sh ——create ——zookeeper 192。168。1。1:2181 ——replication-factor 3 ——partitions 3 ——topic secondTopic

然後我們可以在/tmp/kafka-log路徑下看到對應topic的副本資訊了。我們透過一個圖形的方式來表達。

針對secondTopic這個topic的3個分割槽對應的3個副本

訊息佇列kafka - 如何儲存消費端的消費位置

如何知道哪個分割槽中對應的leader是誰呢?

在zookeeper伺服器上,透過如下命令去獲取對應分割槽的資訊, 比如下面這個是獲取secondTopic第1個 分割槽的狀態資訊。

get /brokers/topics/secondTopic/partitions/1/state

{“controller_epoch”:12,“leader”:0,“version”:1,“leader_epoch”:0,“isr”:[0,1]}

透過這個命令 sh kafka-topics。sh ——zookeeper 192。168。13。106:2181 ——describe ——topic test_partition

leader表示當前分割槽的leader是那個broker-id。下圖中。綠色線條的表示該分割槽中的leader節點。其他 節點就為follower

訊息佇列kafka - 如何儲存消費端的消費位置

需要注意的是,kafka叢集中的一個broker中最多隻能有一個副本,leader副本所在的broker節點的 分割槽叫leader節點,follower副本所在的broker節點的分割槽叫follower節點。

副本的Leader選舉

Kafka提供了資料複製演算法保證,如果leader副本所在的broker節點宕機或者出現故障,或者分割槽的 leader節點發生故障,這個時候怎麼處理呢?

那麼,kafka必須要保證從follower副本中選擇一個新的leader副本。那麼kafka是如何實現選舉的呢?要了解leader選舉,我們需要了解幾個概念

Kafka分割槽下有可能有很多個副本(replica)用於實現冗餘,從而進一步實現高可用。副本根據角色的不同 可分為3類:

leader副本

:響應clients端讀寫請求的副本

follower副本

:被動地備份leader副本中的資料,不能響應clients端讀寫請求。

ISR副本

:包含了leader副本和所有與leader副本保持同步的follower副本——如何判定是否與leader同 步後面會提到每個Kafka副本物件都有兩個重要的屬性:LEO和HW。注意是所有的副本,而不只是 leader副本。

LEO

:即日誌末端位移(log end offset),記錄了該副本底層日誌(log)中下一條訊息的位移值。注意是下 一條訊息!也就是說,如果LEO=10,那麼表示該副本儲存了10條訊息,位移值範圍是[0, 9]。另外, leader LEO和follower LEO的更新是有區別的。我們後面會詳細說

HW

:即上面提到的水位值。對於同一個副本物件而言,其HW值不會大於LEO值。小於等於HW值的所有訊息都被認為是“已備份”的(replicated)。同理,leader副本和follower副本的HW更新是有區別的。

從生產者發出的一條訊息首先會被寫入分割槽的leader 副本,不過還需要等待ISR集合中的所有follower副本都同步完之後才能被認為已經提交,之後才會更新分割槽的HW, 進而消費者可以消費到這條訊息。

副本協同機制

剛剛提到了,訊息的讀寫操作都只會由leader節點來接收和處理。follower副本只負責同步資料以及當leader副本所在的broker掛了以後,會從follower副本中選取新的leader。

寫請求首先由Leader副本處理,之後follower副本會從leader上拉取寫入的訊息,這個過程會有一定的延遲,導致follower副本中儲存的訊息略少於leader副本,但是隻要沒有超出閾值都可以容忍。但是如果一個follower副本出現異常,比如宕機、網路斷開等原因長時間沒有同步到訊息,那這個時候, leader就會把它踢出去。kafka透過ISR集合來維護一個分割槽副本資訊

訊息佇列kafka - 如何儲存消費端的消費位置

一個新leader被選舉並被接受客戶端的訊息成功寫入。Kafka確保從同步副本列表中選舉一個副本為leader;leader負責維護和跟蹤ISR(in-Sync replicas , 副本同步佇列)中所有follower滯後的狀態。當producer傳送一條訊息到broker後,leader寫入訊息並複製到所有follower。訊息提交之後才被成功複製到所有的同步副本。

ISR

ISR表示目前“可用且訊息量與leader相差不多的副本集合,這是整個副本集合的一個子集”。怎麼去理解 可用和相差不多這兩個詞呢?具體來說,ISR集合中的副本必須滿足兩個條件:

副本所在節點必須維持著與zookeeper的連線

副本最後一條訊息的offset與leader副本的最後一條訊息的offset之間的差值不能超過指定的閾值 (replica。lag。time。max。ms) replica。lag。time。max。ms:如果該follower在此時間間隔內一直沒有追 上過leader的所有訊息,則該follower就會被剔除isr列表

ISR資料儲存在Zookeeper的 /brokers/topics//partitions//state 節點中

follower副本把leader副本LEO之前的日誌全部同步完成時,則認為follower副本已經追趕上了leader副本,這個時候會更新這個副本的lastCaughtUpTimeMs標識,kafk副本管理器會啟動一個副本過期檢 查的定時任務,這個任務會定期檢查當前時間與副本的lastCaughtUpTimeMs的差值是否大於引數 replica。lag。time。max。ms 的值,如果大於,則會把這個副本踢出ISR集合

訊息佇列kafka - 如何儲存消費端的消費位置