「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

目錄

3。1 消費者與消費組

3。2 客戶端開發

3。2。1 必要的引數配置

3。2。2 訂閱主題與分割槽

3。2。3 反序列化

3。2。4 訊息消費

3。2。5 位移提交

3。2。6 控制或關閉消費

3。2。7 指定位移消費

3。2。8 再均衡

3。2。9 消費者攔截器

3。2。10 多執行緒實現

3。2。11 重要的消費者引數

3。3 總結

第3章 Kafka消費者

與生產者對應的是消費者,應用程式可以透過KafkaConsumer來訂閱主題,並從訂閱的主題中拉取訊息。不過在使用KafkaConsumer消費訊息之前需要先了解消費者和消費組的概念,否則無法理解如何使用 KafkaConsumer。本章首先講解消費者與消費組之間的關係,進而再細緻地講解如何使用KafkaConsumer。

3。1 消費者與消費組

在閱讀文章之前,我們先來思考幾個問題,加深我們對知識應用的理解

預設的分割槽訊息分配策略是什麼?

為什麼Kafka要引入訊息組概念?

消費者個數大於分割槽個數後如何分配訊息?

動態增加消費組,分割槽的訊息如何分配?

消費者(Consumer)負責訂閱Kafka中的主題(Topic),並且從訂閱的主題上拉取訊息

。與其他一些訊息中介軟體不同的是:在Kafka的消費理念中還有一層

消費組

(Consumer Group)的概念,

每個消費者都有一個對應的消費組

。當訊息釋出到主題後,只會被投遞給訂閱它的

每個消費組中的一個消費者

如圖3-1所示,某個主題中共有4個分割槽(Partition):P0、P1、P2、P3。有兩個消費組A和B都訂閱了這個主題,消費組A中有4個消費者(C0、C1、C2和C3),消費組B中有2個消費者(C4和C5)。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

圖3-1 消費者與消費組

按照Kafka預設的規則,最後的分配結果是消費組A中的每一個消費者分配到1個分割槽,消費組B中的每一個消費者分配到2個分割槽,

兩個消費組之間互不影響

。每個消費者只能消費所分配到的分割槽中的訊息。換言之,

每一個分割槽只能被一個消費組中的一個消費者所消費

我們再來看一下消費組內的消費者個數變化時所對應的分割槽分配的演變。假設目前某消費組內只有一個消費者 C0,訂閱了一個主題,這個主題包含 7 個分割槽:P0、P1、P2、P3、P4、P5、P6。也就是說,這個消費者C0訂閱了7個分割槽,具體分配情形參考圖3-2

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

圖3-2 消費組內只有1個消費者

此時消費組內又加入了一個新的消費者C1,按照既定的邏輯,需要將原來消費者C0的部分分割槽分配給消費者C1消費,如圖3-3所示。消費者C0和C1各自負責消費所分配到的分割槽,彼此之間並無邏輯上的干擾。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

圖3-3 消費組內有2個消費者

緊接著消費組內又加入了一個新的消費者C2,消費者C0、C1和C2按照圖3-4中的方式各自負責消費所分配到的分割槽。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

圖3-4 消費組內有3個消費者

消費者與消費組這種模型可以讓整體的消費能力具備橫向伸縮性,我們可以增加(或減少)消費者的個數來提高(或降低)整體的消費能力。對於分割槽數固定的情況,一味地增加消費者並不會讓消費能力一直得到提升,

如果消費者過多,出現了消費者的個數大於分割槽個數的情況,就會有消費者分配不到任何分割槽

。參考圖3-5,一共有8個消費者,7個分割槽,那麼最後的消費者C7由於分配不到任何分割槽而無法消費任何訊息。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

圖3-5 消費組內有過多的消費者

以上分配邏輯都是基於

預設的分割槽分配策略

進行分析的,可以透過消費者客戶端引數partition。assignment。strategy 來設定消費者與訂閱主題之間的分割槽分配策略,有關分割槽分配的更多細節可以參考7。1節。

對於訊息中介軟體而言,一般有兩種訊息投遞模式:

點對點

(P2P,Point-to-Point)模式和

釋出/訂閱

(Pub/Sub)模式。

點對點模式是基於佇列的,訊息生產者傳送訊息到佇列,訊息消費者從佇列中接收訊息。

釋出訂閱模式定義瞭如何向一個內容節點發布和訂閱訊息,這個內容節點稱為主題(Topic),主題可以認為是訊息傳遞的中介,訊息釋出者將訊息釋出到某個主題,而訊息訂閱者從主題中訂閱訊息。主題使得訊息的訂閱者和釋出者互相保持獨立,不需要進行接觸即可保證訊息的傳遞,釋出/訂閱模式在訊息的一對多廣播時採用。

Kafka 同時支援兩種訊息投遞模式

,而這正是得益於消費者與消費組模型的契合:

如果所有的消費者都隸

屬於同一個消費組

,那麼所有的訊息都會被均衡地投遞給每一個消費者,即每條訊息只會被一個消費者處理,這就相當於

點對點模式

的應用。

如果所有的消費者都隸

屬於不同的消費組

,那麼所有的訊息都會被廣播給所有的消費者,即每條訊息會被所有的消費者處理,這就相當於

釋出/訂閱模式

的應用。

消費組是一個邏輯上的概念,它將旗下的消費者歸為一類,每一個消費者只隸屬於一個消費組。每一個消費組都會有一個固定的名稱,消費者在進行消費前需要指定其所屬消費組的名稱,這個可以透過消費者客戶端引數group。id來配置,預設值為空字串。

消費者並非邏輯上的概念,它是實際的應用例項,它可以是一個執行緒,也可以是一個程序。同一個消費組內的消費者既可以部署在同一臺機器上,也可以部署在不同的機器上。

3。2 客戶端開發

在瞭解了消費者與消費組之間的概念之後,我們就可以著手進行消費者客戶端的開發了。在Kafka的歷史中,消費者客戶端同生產者客戶端一樣也經歷了兩個大版本:第一個是於Kafka開源之初使用Scala 語言編寫的客戶端,我們可以稱之為舊消費者客戶端(Old Consumer)或Scala消費者客戶端;第二個是從Kafka 0。9。x版本開始推出的使用Java編寫的客戶端,我們可以稱之為新消費者客戶端(New Consumer)或Java消費者客戶端,它彌補了舊客戶端中存在的諸多設計缺陷。

一個正常的消費邏輯需要具備以下幾個步驟:

(1)配置消費者客戶端引數及建立相應的消費者例項。

(2)訂閱主題。

(3)拉取訊息並消費。

(4)提交消費位移。

(5)關閉消費者例項。

3。2。1 必要的引數配置

· bootstrap。servers:該引數的釋義和生產者客戶端 KafkaProducer 中的相同,用來 指 定 連 接Kafka 集 群 所 需 的 broker 地 址 清 單,具 體 內 容 形 式 為host1:port1,host2:post,可以設定一個或多個地址,中間用逗號隔開,此引數的預設值為“”。注意這裡並非需要設定叢集中全部的broker地址,消費者會從現有的配置中查詢到全部的Kafka叢集成員。這裡設定兩個以上的broker地址資訊,當其中任意一個宕機時,消費者仍然可以連線到Kafka叢集上。有關此引數的更多釋義可以參考6。5。2節。

· group。id:消費者隸屬的消費組的名稱,預設值為“”。如果設定為空,則會報出異常:Exceptionin thread “main” org。apache。kafka。common。errors。InvalidGroupIdException:The configuredgroupId is invalid。一般而言,這個引數需要設定成具有一定的業務意義的名稱。

· key。deserializer 和 value。deserializer:與生產者客戶端 KafkaProducer中的key。serializer和value。serializer引數對應。消費者從broker端獲取的訊息格式都是位元組陣列(byte[])型別,所以需要執行相應的反序列化操作才能還原成原有的物件格式。這兩個引數分別用來指定訊息中key和value所需反序列化操作的反序列化器,這兩個引數無預設值。注意這裡必須填寫反序列化器類的全限定名,比如示例中的org。apache。kafka。common。serialization。StringDeserializer,單單指定StringDeserializer是錯誤的。有關更多的反序列化內容可以參考3。2。3節。

spring: kafka: bootstrap-servers: 172。101。203。33:9092 producer: # 發生錯誤後,訊息重發的次數。 retries: 0 #當有多個訊息需要被髮送到同一個分割槽時,生產者會把它們放在同一個批次裡。該引數指定了一個批次可以使用的記憶體大小,按照位元組數計算。 batch-size: 16384 # 設定生產者記憶體緩衝區的大小。 buffer-memory: 33554432 # 鍵的序列化方式 key-serializer: org。apache。kafka。common。serialization。StringSerializer # 值的序列化方式 value-serializer: org。apache。kafka。common。serialization。StringSerializer # acks=0 : 生產者在成功寫入訊息之前不會等待任何來自伺服器的響應。 # acks=1 : 只要叢集的首領節點收到訊息,生產者就會收到一個來自伺服器成功響應。 # acks=all :只有當所有參與複製的節點全部收到訊息時,生產者才會收到一個來自伺服器的成功響應。 acks: 1 consumer: # 自動提交的時間間隔 在spring boot 2。X 版本中這裡採用的是值的型別為Duration 需要符合特定的格式,如1S,1M,2H,5D auto-commit-interval: 1S # 該屬性指定了消費者在讀取一個沒有偏移量的分割槽或者偏移量無效的情況下該作何處理: # latest(預設值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取資料(在消費者啟動之後生成的記錄) # earliest :在偏移量無效的情況下,消費者將從起始位置讀取分割槽的記錄 auto-offset-reset: earliest # 是否自動提交偏移量,預設值是true,為了避免出現重複資料和資料丟失,可以把它設定為false,然後手動提交偏移量 enable-auto-commit: false # 鍵的反序列化方式 key-deserializer: org。apache。kafka。common。serialization。StringDeserializer # 值的反序列化方式 value-deserializer: org。apache。kafka。common。serialization。StringDeserializer listener: # 在偵聽器容器中執行的執行緒數。 concurrency: 5 #listner負責ack,每呼叫一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: false

3。2。2 訂閱主題與分割槽

在建立好消費者之後,我們就需要為該消費者訂閱相關的主題了。一個消費者可以訂閱一個或多個主題

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

消費者不僅可以透過KafkaConsumer。subscribe()方法訂閱主題,還可以直接訂閱某些主題的特定分割槽,在KafkaConsumer中還提供了一個assign()方法來實現這些功能,此方法的具體定義如下:

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

這個方法只接受一個引數partitions,用來指定需要訂閱的分割槽集合。這裡補充說明一下TopicPartition類,在Kafka的客戶端中,它用來表示分割槽,這個類的部分內容如下所示。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

TopicPartition類只有2個屬性:topic和partition,分別代表分割槽所屬的主題和自身的分割槽編號,這個類可以和我們通常所說的主題—分割槽的概念對映起來。

有讀者會有疑問:如果我們事先並不知道主題中有多少個分割槽怎麼辦?KafkaConsumer 中的partitionsFor()方法可以用來查詢指定主題的元資料資訊,partitionsFor()方法的具體定義如

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

其中PartitionInfo型別即為主題的分割槽元資料資訊,此類的主要結構如下:

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

PartitionInfo類中的屬性

topic:表示主題名稱,

partition:代表分割槽編號,

leader:代表分割槽的leader副本所在的位置,

replicas:代表分割槽的AR集合,

inSyncReplicas:代表分割槽的ISR集合,

offineReplicas:代表分割槽的OSR集合。

透過 subscribe()方法訂閱主題具有消費者自動再均衡的功能

,在多個消費者的情況下可以根據分割槽分配策略來自動分配各個消費者與分割槽的關係。當消費組內的消費者增加或減少時,分割槽分配關係會自動調整,以實現消費負載均衡及故障自動轉移。

而透過assign()方法訂閱分割槽時,是不具備消費者自動均衡的功能的

,其實這一點從assign()方法的引數中就可以看出端倪,兩種型別的subscribe()都有ConsumerRebalanceListener型別引數的方法,而assign()方法卻沒有。

3。2。3 反序列化

參考資料

3。2。4 訊息消費

Kafka中的消費是基於拉模式的。訊息的消費一般有兩種模式:推模式和拉模式。推模式是服務端主動將訊息推送給消費者,而拉模式是消費者主動向服務端發起請求來拉取訊息。

從程式碼清單3-1中可以看出,Kafka中的訊息消費是一個不斷輪詢的過程,消費者所要做的就是重複地呼叫poll()方法,而poll()方法返回的是所訂閱的主題(分割槽)上的一組訊息。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

對於poll()方法而言,如果某些分割槽中沒有可供消費的訊息,那麼此分割槽對應的訊息拉取的結果就為空;如果訂閱的所有分割槽中都沒有可供消費的訊息,那麼poll()方法返回為空的訊息集合。

timeout的設定取決於應用程式對響應速度的要求,比如需要在多長時間內將控制權移交給執行輪詢的應用執行緒。可以直接將timeout設定為0,這樣poll()方法會立刻返回,而不管是否已經拉取到了訊息。如果應用執行緒唯一的工作就是從Kafka中拉取並消費訊息,則可以將這個引數設定為最大值Long。MAX_VALUE。

消費者消費到的每條訊息的型別為ConsumerRecord(注意與ConsumerRecords的區別),這個和生產者傳送的訊息型別ProducerRecord相對應,不過ConsumerRecord中的內容更加豐富,具體的結構參考如下程式碼:

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

topic 和 partition 這兩個欄位分別代表訊息所屬主題的名稱和所在分割槽的編號。

offset 表示訊息在所屬分割槽的偏移量。

timestamp 表示時間戳,與此對應的timestampType 表示時間戳的型別。

timestampType 有兩種型別:CreateTime 和LogAppendTime,分別代表訊息建立的時間戳和訊息追加到日誌的時間戳。

headers表示訊息的頭部內容。

key 和 value 分別表示訊息的鍵和訊息的值,一般業務應用要讀取的就是value,比如使用2。1。3節中的CompanySerializer序列化了一個Company物件,然後將其存入Kafka,那麼消費到的訊息中的 value 就是經過 CompanyDeserializer 反序列化後的 Company物件。serializedKeySize和serializedValueSize分別表示key和value經過序列化之後的大小,如果key為空,則serializedKeySize值為-1。同樣,如果value為空,則serializedValueSize的值也會為-1。

3。2。5 位移提交

對於Kafka中的分割槽而言,它的每條訊息都有唯一的offset,用來表示訊息在分割槽中對應的位置。對於消費者而言,它也有一個offset的概念,消費者使用offset來表示消費到分割槽中某個訊息所在的位置。單詞“offset”可以翻譯為“偏移量”,也可以翻譯為“位移”,讀者可能並沒有過多地在意這一點:在很多中文資料中都會交叉使用“偏移量”和“位移”這兩個詞,並沒有很嚴謹地進行區分。筆者對offset做了一些區分:對於訊息在分割槽中的位置,我們將offset稱為“偏移量”;對於消費者消費到的位置,將 offset 稱為“位移”,有時候也會更明確地稱之為“消費位移”。做這一區分的目的是讓讀者在遇到 offset 的時候可以很容易甄別出是在講分割槽儲存層面的內容,還是在講消費層面的內容,如此也可以使“偏移量”和“位移”這兩個中文詞彙具備更加豐富的意義。當然,對於一條訊息而言,它的偏移量和消費者消費它時的消費位移是相等的,在某些不需要具體劃分的場景下也可以用“訊息位置”或直接用“offset”這個單詞來進行表述。

在每次呼叫poll()方法時,它返回的是還沒有被消費過的訊息集(當然這個前提是訊息已經儲存在Kafka 中了,並且暫不考慮異常情況的發生),要做到這一點,就需要記錄上一次消費時的消費位移。並且這個消費位移必須做持久化儲存,而不是單單儲存在記憶體中,否則消費者重啟之後就無法知曉之前的消費位移。再考慮一種情況,當有新的消費者加入時,那麼必然會有再均衡的動作,對於同一分割槽而言,它可能在再均衡動作之後分配給新的消費者,如果不持久化儲存消費位移,那麼這個新的消費者也無法知曉之前的消費位移。

在舊消費者客戶端中,消費位移是儲存在ZooKeeper中的。而在新消費者客戶端中,消費位移儲存在Kafka內部的主題__consumer_offsets中。這裡把將消費位移儲存起來(持久化)的動作稱為“提交”,消費者在消費完訊息之後需要執行消費位移的提交。

參考圖3-6的消費位移,x表示某一次拉取操作中此分割槽訊息的最大偏移量,假設當前消費者已經消費了 x 位置的訊息,那麼我們就可以說消費者的消費位移為 x,圖中也用了lastConsumedOffset這個單詞來標識它。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

圖3-6 消費位移

不過需要非常明確的是,當前消費者需要提交的消費位移並不是 x,而是 x+1,對應於圖3-6中的position,它表示下一條需要拉取的訊息的位置。讀者可能看過一些相關資料,裡面所講述的內容可能是提交的消費位移就是當前所消費到的消費位移,即提交的是 x,這明顯是錯誤的。類似的錯誤還體現在對LEO(Log End Offset)的解讀上,與此相關的細節可以參閱第5章的內容。在消費者中還有一個committed offset的概念,它表示已經提交過的消費位移。

為了論證lastConsumedOffset、committed offset和position之間的關係,我們使用上面的這兩個方法來做相關演示。我們向某個主題中分割槽編號為0的分割槽傳送若干訊息,之後再建立一個消費者去消費其中的訊息,等待消費完這些訊息之後就同步提交消費位移(呼叫commitSync()方法,這個方法的細節在下面詳細介紹),最後我們觀察一下lastConsumedOffset、committed offset和position的值。示例程式碼如程式碼清單3-2所示。

3。2。6 控制或關閉消費

KafkaConsumer 提供了對消費速度進行控制的方法,在有些應用場景下我們可能需要暫停某些分割槽的消費而先消費其他分割槽,當達到一定條件時再恢復這些分割槽的消費。KafkaConsumer中使用pause()和resume()方法來分別實現暫停某些分割槽在拉取操作時返回資料給客戶端和恢復某些分割槽向客戶端返回資料的操作

參考資料《深入理解Kafka:核心設計與實踐原理》

3。2。7 指定位移消費

在3。2。5節中我們講述瞭如何進行消費位移的提交,正是有了消費位移的持久化,才使消費者在關閉、崩潰或者在遇到再均衡的時候,可以讓接替的消費者能夠根據儲存的消費位移繼續進行消費。

3。2。8 再均衡

再均衡是指分割槽的所屬權從一個消費者轉移到另一消費者的行為,它為消費組具備高可用性和伸縮性提供保障,使我們可以既方便又安全地刪除消費組內的消費者或往消費組內新增消費者

。不過在再均衡發生期間,

消費組內的消費者是無法讀取訊息

的。也就是說,在再均衡發生期間的這一小段時間內,消費組會變得不可用。另外,當一個分割槽被重新分配給另一個消費者時,消費者當前的狀態也會丟失。比如消費者消費完某個分割槽中的一部分訊息時還沒有來得及提交消費位移就發生了再均衡操作,之後這個分割槽又被分配給了消費組內的另一個消費者,原來被消費完的那部分訊息又被重新消費一遍,也就是發生了

重複消費

。一般情況下,應儘量

避免不必要的再均衡的發生

參考資料《深入理解Kafka:核心設計與實踐原理》

3。2。9 消費者攔截器

2。1。5節中講述了生產者攔截器的使用,對應的消費者也有相應的攔截器的概念。消費者攔截器主要在消費到訊息或在提交消費位移時進行一些定製化的操作。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

KafkaConsumer會在poll()方法返回之前呼叫攔截器的onConsume()方法來對訊息進行相應的定製化操作,比如

修改返回的訊息內容、按照某種規則過濾訊息

(可能會減少poll()方法返回的訊息的個數)。

如果 onConsume()方法中丟擲異常,那麼會被捕獲並記錄到日誌中,但是異常不會再向上傳遞。KafkaConsumer會在提交完消費位移之後呼叫攔截器的onCommit()方法,

可以使用這個方法來記錄跟蹤所提交的位移資訊

,比如當消費者使用commitSync的無參方法時,我們不知道提交的消費位移的具體細節,而使用攔截器的onCommit()方法卻可以做到這一點。

close()方法和ConsumerInterceptor的父介面中的configure()方法與生產者的ProducerInterceptor介面中的用途一樣,這裡就不贅述了。

在某些業務場景中會對訊息設定一個

有效期的屬性

,如果某條訊息在既定的時間視窗內無法到達,那麼就會被視為無效,它也就不需要再被繼續處理了。

下面使用消費者攔截器來實現一個簡單的訊息TTL(Time to Live,即過期時間)的功能。

在程式碼清單3-10中,自定義的消費者攔截器ConsumerInterceptorTTL使用訊息的 timestamp 欄位來判定是否過期,如果訊息的時間戳與當前的時間戳相差超過10秒則判定為過期,那麼這條訊息也就被過濾而不投遞給具體的消費者。

參考資料《深入理解Kafka:核心設計與實踐原理》

3。2。10 多執行緒實現

KafkaProducer是執行緒安全的,然而KafkaConsumer卻是非執行緒安全的。KafkaConsumer中定義了一個 acquire()方法,用來檢測當前是否只有一個執行緒在操作,若有其他執行緒正在操作則會丟擲ConcurrentModifcationException異常

KafkaConsumer 非執行緒安全並不意味著我們在消費訊息的時候只能以單執行緒的方式執行。

如果生產者傳送訊息的速度大於消費者處理訊息的速度,那麼就會有越來越多的訊息得不到及時的消費,造成了一定的延遲

。除此之外,由於Kafka 中訊息保留機制的作用,有些訊息有可能在被消費之前就被清理了,從而造成訊息的丟失。

我們可以透過多執行緒的方式來實現訊息消費,多執行緒的目的就是為了提高整體的消費能力

。多執行緒的實現方式有多種,第一種也是最常見的方式:

執行緒封閉,即為每個執行緒例項化一個KafkaConsumer物件

,如圖3-10所示。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

圖3-10 一個執行緒對應一個KafkaConsumer例項

一個執行緒對應一個KafkaConsumer例項

,我們可以稱之為消費執行緒。一個消費執行緒可以消費一個或多個分割槽中的訊息,所有的消費執行緒都隸屬於同一個消費組。這種實現方式的併發度受限於分割槽的實際個數,

根據 3.1 節中介紹的消費者與分割槽數的關係,當消費執行緒的個數大於分割槽數時,就有部分消費執行緒一直處於空閒的狀態

與此對應的第二種方式是多個消費執行緒同時消費同一個分割槽,這個透過 assign()、seek()等方法實現

,這樣可以打破原有的消費執行緒的個數不能超過分割槽數的限制,進一步提高了消費的能力。不過這種實現方式對於位移提交和順序控制的處理就會變得非常複雜,實際應用中使用得極少,筆者也並不推薦。

一般而言,分割槽是消費執行緒的最小劃分單位

參考程式碼清單 3-11 中的第①行,如果這裡對訊息的處理非常迅速,那麼 poll()拉取的頻次也會更高,進而整體消費的效能也會提升;相反,如果在這裡對訊息的處理緩慢,比如進行一個事務性操作,或者等待一個RPC的同步響應,那麼poll()拉取的頻次也會隨之下降,進而造成整體消費效能的下降。一般而言,poll()拉取訊息的速度是相當快的,而整體消費的瓶頸也正是在處理訊息這一塊,如果我們透過一定的方式來改進這一部分,那麼我們就能帶動整體消費效能的提升。參考圖3-11,考慮第三種實現方式,將處理訊息模組改成多執行緒的實現方式,具體實現如程式碼清單3-12所示。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

圖3-11 第三種多執行緒消費實現方式

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

圖3-12 帶有具體位移提交的第三種實現方式

讀者可以細想一下這樣實現是否萬無一失?其實這種位移提交的方式會有資料丟失的風險。對於同一個分割槽中的訊息,假設一個處理執行緒RecordHandler1正在處理offset為0~99的訊息,而另一個處理執行緒RecordHandler2已經處理完了offset為100~199的訊息並進行了位移提交,此時如果RecordHandler1發生異常,則之後的消費只能從200開始而無法再次消費0~99的訊息,從而造成了訊息丟失的現象。這裡雖然針對位移覆蓋做了一定的處理,但還沒有解決異常情況下的位移覆蓋問題。對此就要引入更加複雜的處理機制,這裡再提供一種解決思路,參考圖3-13,

總體結構上是基於滑動視窗實現的

。對於第三種實現方式而言,它所呈現的結構是透過消費者拉取分批次的訊息,然後提交給多執行緒進行處理,

而這裡的滑動視窗式的實現方式是將拉取到的訊息暫存起來,多個消費執行緒可以拉取暫存的訊息

,這個用於暫存訊息的快取大小即為滑動視窗的大小,總體上而言沒有太多的變化,不同的是對於消費位移的把控。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

圖3-13 滑動視窗式多執行緒消費實現方式

如圖3-13所示,每一個方格代表一個批次的訊息,一個滑動視窗包含若干方格,

startOffset標註的是當前滑動視窗的起始位置,endOffset標註的是末尾位置

。每當startOffset指向的方格中的訊息被消費完成,就可以提交這部分的位移,與此同時,視窗向前滑動一格,刪除原來startOffset所指方格中對應的訊息,並且拉取新的訊息進入視窗。

滑動視窗的大小固定

,所對應的用來暫存訊息的快取大小也就固定了,這部分記憶體開銷可控。

方格大小和滑動視窗的大小同時決定了消費執行緒的併發數

:一個方格對應一個消費執行緒,對於視窗大小固定的情況,方格越小並行度越高;對於方格大小固定的情況,視窗越大並行度越高。不過,若視窗設定得過大,不僅會增大記憶體的開銷,而且在發生異常(比如Crash)的情況下也會引起大量的重複消費,同時還考慮執行緒切換的開銷,建議根據實際情況設定一個合理的值,不管是對於方格還是視窗而言,過大或過小都不合適。

如果一個方格內的訊息無法被標記為消費完成,那麼就會造成 startOffset 的懸停。為了使視窗能夠繼續向前滑動,那麼就需要設定一個閾值,當 startOffset 懸停一定的時間後就對這部分訊息進行本地重試消費,如果重試失敗就轉入重試佇列,如果還不奏效就轉入死信佇列,有關Kafka中重試佇列和死信佇列的實現可以參考11。3節。真實應用中無法消費的情況極少,一般是由業務程式碼的處理邏輯引起的,比如訊息中的內容格式與業務處理的內容格式不符,無法對這條訊息進行決斷,這種情況可以透過最佳化程式碼邏輯或採取丟棄策略來避免。如果需要訊息高度可靠,也可以將無法進行業務邏輯的訊息(這類訊息可以稱為死信)存入磁碟、資料庫或Kafka,然後繼續消費下一條訊息以保證整體消費進度合理推進,之後可以透過一個額外的處理任務來分析死信進而找出異常的原因。

3。2。11 重要的消費者引數

在KafkaConsumer中,除了3。2。1節提及的4個預設的客戶端引數,大部分的引數都有合理的預設值,一般我們也不需要去修改它們。不過了解這些引數可以讓我們更好地使用消費者客戶端,其中還有一些重要的引數涉及程式的可用性和效能,如果能夠熟練掌握它們,也可以讓我們在編寫相關的程式時能夠更好地進行效能調優與故障排查。下面挑選一些重要的引數來做細緻的講解。

1.fetch.min.bytes

該引數用來配置Consumer在一次拉取請求(呼叫poll()方法)中能從Kafka中拉取的最小資料量,預設值為1(B)。Kafka在收到Consumer的拉取請求時,如果返回給Consumer的資料量小於這個引數所配置的值,那麼它就需要進行等待,直到資料量滿足這個引數的配置大小。可以適當調大這個引數的值以提高一定的吞吐量,不過也會造成額外的延遲(latency),對於延遲敏感的應用可能就不可取了。

2.fetch.max.bytes

該引數與fetch。max。bytes引數對應,它用來配置Consumer在一次拉取請求中從Kafka中拉取的最大資料量,預設值為 52428800(B),也就是 50MB。如果這個引數設定的值比任何一條寫入Kafka中的訊息要小,那麼會不會造成無法消費呢?很多資料對此引數的解讀認為是無法消費的,比如一條訊息的大小為10B,而這個引數的值是1(B),既然此引數設定的值是一次拉取請求中所能拉取的最大資料量,那麼顯然 1B<10B,所以無法拉取。這個觀點是錯誤的,該引數設定的不是絕對的最大值,如果在第一個非空分割槽中拉取的第一條訊息大於該值,那麼該訊息將仍然返回,以確保消費者繼續工作。也就是說,上面問題的答案是可以正常消費。與此相關的,Kafka中所能接收的最大訊息的大小透過服務端引數message。max。bytes(對應於主題端引數max。message。bytes)來設定。

3.fetch.max.wait.ms

這個引數也和fetch。min。bytes引數有關,如果Kafka僅僅參考fetch。min。bytes引數的要求,那麼有可能會一直阻塞等待而無法傳送響應給 Consumer,顯然這是不合理的。fetch。max。wait。ms引數用於指定Kafka的等待時間,預設值為500(ms)。如果Kafka中沒有足夠多的訊息而滿足不了fetch。min。bytes引數的要求,那麼最終會等待500ms。這個引數的設定和Consumer與Kafka之間的延遲也有關係,如果業務應用對延遲敏感,那麼可以適當調小這個引數。

4.max.partition.fetch.bytes

這個引數用來配置從每個分割槽裡返回給Consumer的最大資料量,預設值為1048576(B),即1MB。這個引數與 fetch。max。bytes 引數相似,只不過前者用來限制一次拉取中每個分割槽的訊息大小,而後者用來限制一次拉取中整體訊息的大小。同樣,如果這個引數設定的值比訊息的大小要小,那麼也不會造成無法消費,Kafka 為了保持消費邏輯的正常運轉不會對此做強硬的限制。

5.max.poll.records

這個引數用來配置Consumer在一次拉取請求中拉取的最大訊息數,預設值為500(條)。如果訊息的大小都比較小,則可以適當調大這個引數值來提升一定的消費速度。

6.connections.max.idle.ms

這個引數用來指定在多久之後關閉限制的連線,預設值是540000(ms),即9分鐘。

7.exclude.internal.topics

Kafka中有兩個內部的主題:__consumer_offsets和__transaction_state。exclude。internal。topics用來指定Kafka中的內部主題是否可以向消費者公開,預設值為true。如果設定為true,那麼只能使用subscribe(Collection)的方式而不能使用subscribe(Pattern)的方式來訂閱內部主題,設定為false則沒有這個限制。

8.receive.buffer.bytes

這個引數用來設定Socket接收訊息緩衝區(SO_RECBUF)的大小,預設值為65536(B),即64KB。如果設定為-1,則使用作業系統的預設值。如果Consumer與Kafka處於不同的機房,則可以適當調大這個引數值。

9.send.buffer.bytes

這個引數用來設定Socket傳送訊息緩衝區(SO_SNDBUF)的大小,預設值為131072(B),即128KB。與receive。buffer。bytes引數一樣,如果設定為-1,則使用作業系統的預設值。

10.request.timeout.ms

這個引數用來配置Consumer等待請求響應的最長時間,預設值為30000(ms)。

11.metadata.max.age.ms

這個引數用來配置元資料的過期時間,預設值為300000(ms),即5分鐘。如果元資料在此引數所限定的時間範圍內沒有進行更新,則會被強制更新,即使沒有任何分割槽變化或有新的broker加入。

12.reconnect.backoff.ms

這個引數用來配置嘗試重新連線指定主機之前的等待時間(也稱為退避時間),避免頻繁地連線主機,預設值為50(ms)。這種機制適用於消費者向broker傳送的所有請求。

13.retry.backoff.ms

這個引數用來配置嘗試重新發送失敗的請求到指定的主題分割槽之前的等待(退避)時間,避免在某些故障情況下頻繁地重複傳送,預設值為100(ms)。

14.isolation.level

這個引數用來配置消費者的事務隔離級別。字串型別,有效值為“read_uncommitted”和“read_committed”,表示消費者所消費到的位置,如果設定為“read_committed”,那麼消費者就會忽略事務未提交的訊息,即只能消費到 LSO(LastStableOffset)的位置,預設情況下為“read_uncommitted”,即可以消費到HW(High Watermark)處的位置。有關事務和LSO的內容可以分別參考7。4節和10。2節。

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

「深入理解Kafka」核心設計與實踐(二)消費者概念與實戰

3。3 總結

本章主要講述了消費者和消費組的概念,以及如何正確使用 KafkaConsumer。具體的內容還包括引數配置的講解、訂閱、反序列化、位移提交、再均衡、消費者攔截器、多執行緒的使用。不過本章並沒有同前一章的生產者一樣講述具體的原理,因為考慮到KafkaConsumer內部實現相對複雜,具體的原理講述篇幅較長,故將相關的內容編排到第7章中,如果讀者迫切地想要了解這部分的知識,可以直接跳轉到第7章進行閱讀。

參考資料

[springboot之整合kafka](https://www。jianshu。com/p/6ce5d9a96113)

[深入理解Kafka:核心設計與實踐原理](

https://weread。qq。com/web/reader/e9a32a0071848698e9a39b8kc81322c012c81e728d9d180