「MQ」KAFKA 零複製

明天的你會感謝今天努力的你

舉手之勞,加個關注

一、前言

我們都知道 Kafka 是基於磁碟進行儲存的,但 Kafka 官方又稱其具有高效能、高吞吐、低延時的特點,其吞吐量動輒幾十上百萬。小夥伴們是不是有點困惑了,一般認為在磁碟上讀寫資料是會降低效能的,因為定址會比較消耗時間。那 Kafka 又是怎麼做到其吞吐量動輒幾十上百萬的呢?

Kafka 高效能,是多方面協同的結果,包括宏觀架構、分散式 partition 儲存、ISR 資料同步、以及“無所不用其極”的高效利用磁碟、作業系統特性。

別急,下面老周從資料的寫入與讀取兩個維度來帶大家一探究竟。

二、順序寫入

磁碟讀寫有兩種方式:順序讀寫或者隨機讀寫。在順序讀寫的情況下,磁碟的順序讀寫速度和記憶體持平。

因為磁碟是機械結構,每次讀寫都會定址->寫入,其中定址是一個“機械動作”。為了提高讀寫磁碟的速度,Kafka 就是使用順序 I/O。

「MQ」KAFKA 零複製

Kafka 利用了一種分段式的、只追加 (Append-Only) 的日誌,基本上把自身的讀寫操作限制為順序 I/O,也就使得它在各種儲存介質上能有很快的速度。一直以來,有一種廣泛的誤解認為磁碟很慢。實際上,儲存介質 (特別是旋轉式的機械硬碟) 的效能很大程度依賴於訪問模式。在一個 7200 轉/分鐘的 SATA 機械硬碟上,隨機 I/O 的效能比順序 I/O 低了大概 3 到 4 個數量級。此外,一般來說現代的作業系統都會提供預讀和延遲寫技術:以大資料塊的倍數預先載入資料,以及合併多個小的邏輯寫操作成一個大的物理寫操作。正因為如此,順序 I/O 和隨機 I/O 之間的效能差距在 flash 和其他固態非易失性儲存介質中仍然很明顯,儘管它遠沒有旋轉式的儲存介質那麼明顯。

這裡給出著名學術期刊 ACM Queue 上的效能對比圖:

「MQ」KAFKA 零複製

下圖就展示了 Kafka 是如何寫入資料的, 每一個 Partition 其實都是一個檔案 ,收到訊息後 Kafka 會把資料插入到檔案末尾(虛框部分):

「MQ」KAFKA 零複製

這種方法採用了只讀設計 ,所以 Kafka 是不會修改、刪除資料的,它會把所有的資料都保留下來,每個消費者(Consumer)對每個 Topic 都有一個 offset 用來表示讀取到了第幾條資料 。

「MQ」KAFKA 零複製

磁碟的順序讀寫是磁碟使用模式中最有規律的,並且作業系統也對這種模式做了大量最佳化,Kafka 就是使用了磁碟順序讀寫來提升的效能。Kafka 的 message 是不斷追加到本地磁碟檔案末尾的,而不是隨機的寫入,這使得 Kafka 寫入吞吐量得到了顯著提升。

三、頁快取

即便是順序寫入硬碟,硬碟的訪問速度還是不可能追上記憶體。所以 Kafka 的資料並不是實時的寫入硬碟 ,它充分利用了現代作業系統分頁儲存來利用記憶體提高 I/O 效率。具體來說,就是把磁碟中的資料快取到記憶體中,把對磁碟的訪問變為對記憶體的訪問。

Kafka 接收來自 socket buffer 的網路資料,應用程序不需要中間處理、直接進行持久化時。可以使用mmap 記憶體檔案對映。

3.1 Memory Mapped Files

簡稱

mmap

,簡單描述其作用就是:

將磁碟檔案對映到記憶體,使用者透過修改記憶體就能修改磁碟檔案

它的工作原理是直接利用作業系統的 Page 來實現磁碟檔案到物理記憶體的直接對映。完成對映之後你對物理記憶體的操作會被同步到硬碟上(作業系統在適當的時候)。

「MQ」KAFKA 零複製

透過 mmap,程序像讀寫硬碟一樣讀寫記憶體(當然是虛擬機器記憶體)。使用這種方式可以獲取很大的 I/O 提升,省去了使用者空間到核心空間複製的開銷。

mmap 也有一個很明顯的缺陷:不可靠,寫到 mmap 中的資料並沒有被真正的寫到硬碟,作業系統會在程式主動呼叫 flush 的時候才把資料真正的寫到硬碟。

Kafka 提供了一個引數 producer。type 來控制是不是主動 flush:

如果 Kafka 寫入到 mmap 之後就立即 flush,然後再返回 Producer 叫同步(sync);

寫入 mmap 之後立即返回 Producer 不呼叫 flush 叫非同步(async)。

3.2 Java NIO 對檔案對映的支援

Java NIO,提供了一個 MappedByteBuffer 類可以用來實現記憶體對映。

MappedByteBuffer 只能透過呼叫 FileChannel 的 map() 取得,再沒有其他方式。

FileChannel。map() 是抽象方法,具體實現是在 FileChannelImpl。map() 可自行檢視 JDK 原始碼,其 map0() 方法就是呼叫了 Linux 核心的 mmap 的 API。

「MQ」KAFKA 零複製

「MQ」KAFKA 零複製

「MQ」KAFKA 零複製

3.3 使用 MappedByteBuffer 類注意事項

mmap 的檔案對映,在 full gc 時才會進行釋放。當 close 時,需要手動清除記憶體對映檔案,可以反射呼叫 sun。misc。Cleaner 方法。

當一個程序準備讀取磁碟上的檔案內容時:

作業系統會先檢視待讀取的資料所在的頁(page)是否在頁快取(pagecache)中,如果存在(命中) 則直接返回資料,從而避免了對物理磁碟的 I/O 操作;

如果沒有命中,則作業系統會向磁碟發起讀取請求並將讀取的資料頁存入頁快取,之後再將資料返回給程序。

如果一個程序需要將資料寫入磁碟:

作業系統也會檢測資料對應的頁是否在頁快取中,如果不存在,則會先在頁快取中新增相應的頁,最後將資料寫入對應的頁。

被修改過後的頁也就變成了髒頁,作業系統會在合適的時間把髒頁中的資料寫入磁碟,以保持資料的一致性。

對一個程序而言,它會在程序內部快取處理所需的資料,然而這些資料有可能還快取在作業系統的頁快取中,因此同一份資料有可能被快取了兩次。並且,除非使用 Direct I/O 的方式, 否則頁快取很難被禁止。

當使用頁快取的時候,即使 Kafka 服務重啟, 頁快取還是會保持有效,然而程序內的快取卻需要重建。這樣也極大地簡化了程式碼邏輯,因為維護頁快取和檔案之間的一致性交由作業系統來負責,這樣會比程序內維護更加安全有效。

Kafka 中大量使用了頁快取,這是 Kafka 實現高吞吐的重要因素之一。

訊息先被寫入頁快取,由作業系統負責刷盤任務。

四、零複製

導致應用程式效率低下的一個典型根源是緩衝區之間的位元組資料複製。Kafka 使用由 Producer、Broker 和 Consumer 多方共享的二進位制訊息格式,因此資料塊即便是處於壓縮狀態也可以在不被修改的情況下在端到端之間流動。雖然消除通訊各方之間的結構化差異是非常重要的一步,但它本身並不能避免資料的複製。

Kafka 透過利用 Java 的 NIO 框架,尤其是

java。nio。channels。FileChannel

裡的

transferTo

這個方法,解決了前面提到的在 Linux 等類 UNIX 系統上的資料複製問題。此方法能夠在不借助作為傳輸中介的應用程式的情況下,將位元組資料從源通道直接傳輸到接收通道。要了解 NIO 的帶來的改進,請考慮傳統方式下作為兩個單獨的操作:源通道中的資料被讀入位元組緩衝區,接著寫入接收通道:

File。read(fileDesc, buf, len);Socket。send(socket, buf, len);

透過圖表來說明,這個過程可以被描述如下:

「MQ」KAFKA 零複製

儘管上面的過程看起來已經足夠簡單,但是在內部仍需要 4 次使用者態和核心態的上下文切換來完成複製操作,而且需要複製 4 次資料才能完成這個操作。下面的示意圖概述了每一個步驟中的上下文切換。

「MQ」KAFKA 零複製

讓我們來更詳細地看一下細節:

初始的 read() 呼叫導致了一次使用者態到核心態的上下文切換。DMA (Direct Memory Access 直接記憶體訪問) 引擎讀取檔案,並將其內容複製到核心地址空間中的緩衝區中。這個緩衝區和上面的程式碼片段裡使用的並非同一個。

在從 read() 返回之前,核心緩衝區的資料會被複製到使用者態的緩衝區。此時,我們的程式可以讀取檔案的內容。

接下來的 send() 方法會切換回核心態,複製使用者態的緩衝區資料到核心地址空間 —— 這一次是複製到一個關聯著目標套接字的不同緩衝區。在後臺,DMA 引擎會接手這一操作,非同步地把資料從核心緩衝區複製到協議堆疊,由網絡卡進行網路傳輸。 send() 方法在返回之前不等待此操作。

send() 呼叫返回,切換回使用者態。

儘管模式切換的效率很低,而且需要進行額外的複製,但在許多情況下,中間核心緩衝區的效能實際上可以進一步提高。比如它可以作為一個預讀快取,非同步預載入資料塊,從而可以在應用程式前端執行請求。但是,當請求的資料量極大地超過核心緩衝區大小時,核心緩衝區就會成為效能瓶頸。它不會直接複製資料,而是迫使系統在使用者態和核心態之間搖擺,直到所有資料都被傳輸完成。

相比之下,零複製方式能在單個操作中處理完成。前面示例中的程式碼片段現在能重寫為一行程式:

fileDesc。transferTo(offset, len, socket);

零複製方式可以用下圖來說明:

「MQ」KAFKA 零複製

在這種模式下,上下文的切換次數被縮減至一次。具體來說, transferTo() 方法指示資料塊裝置透過 DMA 引擎將資料讀入讀緩衝區,然後這個緩衝區的資料複製到另一個核心緩衝區中,分階段寫入套接字。最後,DMA 將套接字緩衝區的資料複製到 NIC 緩衝區中。

「MQ」KAFKA 零複製

最終結果,我們已經把複製的次數從 4 降到了 3,而且其中只有一次複製佔用了 CPU 資源。我們也已經把上下文切換的次數從 4 降到了 2。

把磁碟檔案讀取 OS 核心緩衝區後的 fileChannel,直接轉給 socketChannel 傳送;底層就是 sendfile。消費者從 broker 讀取資料,就是由此實現。

具體來看,Kafka 的資料傳輸透過 TransportLayer 來完成,其子類 PlaintextTransportLayer 透過 Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法實現零複製。

「MQ」KAFKA 零複製

注:transferTo 和 transferFrom 並不保證一定能使用零複製,需要作業系統支援。

這是一個巨大的提升,不過還沒有實現完全 “零複製”。我們可以透過利用 Linux 核心 2。4 或更高版本以及支援 gather 操作的網絡卡來做進一步的最佳化從而實現真正的 “零複製”。下面的示意圖可以說明:

「MQ」KAFKA 零複製

呼叫 transferTo() 方法會致使裝置透過 DMA 引擎將資料讀入核心讀緩衝區,就像前面的例子那樣。然而,透過 gather 操作,讀緩衝區和套接字緩衝區之間的資料複製將不復存在。相反地,NIC 被賦予一個指向讀緩衝區的指標,連同偏移量和長度,所有資料都將透過 DMA 抽取乾淨並複製到 NIC 緩衝區。在這個過程中,在緩衝區間複製資料將無需佔用任何 CPU 資源。

傳統的方式和零複製方式在 MB 位元組到 GB 位元組的檔案大小範圍內的效能對比顯示,零複製方式相較於傳統方式的效能提升幅度在 2 到 3 倍。但更令人驚歎的是,Kafka 僅僅是在一個純 JVM 虛擬機器下、沒有使用本地庫或 JNI 程式碼,就實現了這一點。

五、Broker 效能

5.1 日誌記錄批處理

順序 I/O 在大多數的儲存介質上都非常快,幾乎可以和網路 I/O 的峰值效能相媲美。在實踐中,這意味著一個設計良好的日誌結構的持久層將可以緊隨網路流量的速度。事實上,Kafka 的瓶頸通常是網路而非磁碟。因此,除了由作業系統提供的底層批處理能力之外,Kafka 的 Clients 和 Brokers 會把多條讀寫的日誌記錄合併成一個批次,然後才透過網路傳送出去。日誌記錄的批處理透過使用更大的包以及提高頻寬效率來攤薄網路往返的開銷。

5.2 批次壓縮

當啟用壓縮功能時,批處理的影響尤為明顯,因為壓縮效率通常會隨著資料量大小的增加而變得更高。特別是當使用 JSON 等基於文字的資料格式時,壓縮效果會非常顯著,壓縮比通常能達到 5 到 7 倍。此外,日誌記錄批處理在很大程度上是作為 Client 側的操作完成的,此舉把負載轉移到 Client 上,不僅對網路頻寬效率、而且對 Brokers 的磁碟 I/O 利用率也有很大的提升。

5.3 非強制重新整理緩衝寫操作

另一個助力 Kafka 高效能、同時也是一個值得更進一步去探究的底層原因:Kafka 在確認寫成功 ACK 之前的磁碟寫操作不會真正呼叫 fsync 命令;通常只需要確保日誌記錄被寫入到 I/O Buffer 裡就可以給 Client 回覆 ACK 訊號。這是一個鮮為人知卻至關重要的事實:事實上,這正是讓 Kafka 能表現得如同一個記憶體型訊息佇列的原因 —— 因為 Kafka 是一個基於磁碟的記憶體型訊息佇列 (受緩衝區/頁面快取大小的限制)。

另一方面,這種形式的寫入是不安全的,因為副本的寫失敗可能會導致資料丟失,即使日誌記錄似乎已經被確認成功。換句話說,與關係型資料庫不同,確認一個寫操作成功並不等同於持久化成功。真正使得 Kafka 具備持久化能力的是執行多個同步的副本的設計;即便有一個副本寫失敗了,其他的副本(假設有多個)仍然可以保持可用狀態,前提是寫失敗是不相關的(例如,多個副本由於一個共同的上游故障而同時寫失敗)。因此,不使用 fsync 的 I/O 非阻塞方法和冗餘同步副本的結合,使得 Kafka 同時具備了高吞吐量、永續性和可用性。

六、流資料並行

日誌結構 I/O 的效率是影響效能的一個關鍵因素,主要影響寫操作;Kafka 在對 Topic 結構和 Consumer 群組的並行處理是其讀效能的基礎。這種組合產生了非常高的端到端訊息傳遞總體吞吐量。併發性根深蒂固地存在於 Kafka 的分割槽方案和 Consumer Groups 的操作中,這是 Kafka 中一種有效的負載均衡機制 —— 把資料分割槽 (Partition) 近似均勻地分配給組內的各個 Consumer 例項。將此與更傳統的 MQ 進行比較:在 RabbitMQ 的等效設定中,多個併發的 Consumers 可能以輪詢的方式從佇列讀取資料,然而這樣做,就會失去訊息消費的順序性。

分割槽機制也使得 Kafka Brokers 可以水平擴充套件。每個分割槽都有一個專門的 Leader;因此,任何重要的主題 Topic (具有多個分割槽) 都可以利用整個 Broker 叢集進行寫操作,這是 Kafka 和訊息佇列之間的另一個區別;後者利用叢集來獲得可用性,而 Kafka 將真正地在 Brokers 之間負載均衡,以獲得可用性、永續性和吞吐量。

生產者在釋出日誌記錄之時指定分割槽,假設你正在釋出訊息到一個有多個分割槽的 Topic 上。(也可能有單一分割槽的 Topic, 這種情況下將不成問題。) 這可以透過直接指定分割槽索引來完成,或者間接透過日誌記錄的鍵值來完成,該鍵值能被確定性地雜湊到一個一致的 (即每次都相同) 分割槽索引。擁有相同雜湊值的日誌記錄將會被儲存到同一個分割槽中。假設一個 Topic 有多個分割槽,那些不同雜湊值的日誌記錄將很可能最後被儲存到不同的分割槽裡。但是,由於雜湊碰撞的緣故,不同雜湊值的日誌記錄也可能最後被儲存到相同的分割槽裡。這是雜湊的本質,如果你理解雜湊表的原理,那應該是顯而易見的。

日誌記錄的實際處理是由一個在 (可選的) Consumer Group 中的 Consumer 操作完成。Kafka 確保一個分割槽最多隻能分配給它的 Consumer Group 中的一個 Consumer 。(我們說 “最多” 是因為考慮到一種全部 Consumer 都離線的情況。) 當第一個 Consumer Group 裡的 Consumer 訂閱了 Topic,它將消費這個 Topic 下的所有分割槽的資料。當第二個 Consumer 緊隨其後加入訂閱時,它將大致獲得這個 Topic 的一半分割槽,減輕第一個 Consumer 先前負荷的一半。這使得你能夠並行處理事件流,並根據需要增加 Consumer (理想情況下,使用自動伸縮機制),前提是你已經對事件流進行了合理的分割槽。

日誌記錄吞吐量的控制一般透過以下兩種方式來達成:

Topic 的分割槽方案。應該對 Topics 進行分割槽,以最大限度地增加獨立子事件流的數量。換句話說,日誌記錄的順序應該只保留在絕對必要的地方。如果任意兩個日誌記錄在某種意義上沒有合理的關聯,那它們就不應該被繫結到同一個分割槽。這暗示你要使用不同的鍵值,因為 Kafka 將使用日誌記錄的鍵值作為一個雜湊源來派生其一致的分割槽對映。

一個組裡的 Consumers 數量。你可以增加 Consumer Group 裡的 Consumer 數量來均衡入站的日誌記錄的負載,這個數量的上限是 Topic 的分割槽數量。(如果你願意的話,你當然可以增加更多的 Consumers ,不過分割槽計數將會設定一個上限來確保每一個活躍的 Consumer 至少被指派到一個分割槽,多出來的 Consumers 將會一直保持在一個空閒的狀態。) 請注意, Consumer 可以是程序或執行緒。依據 Consumer 執行的工作負載型別,你可以線上程池中使用多個獨立的 Consumer 執行緒或程序記錄。

如果你之前一直想知道 Kafka 是否很快、它是如何擁有其現如今公認的高效能標籤,或者它是否可以滿足你的使用場景,那麼相信你現在應該有了所需的答案。

為了讓事情足夠清楚,必須說明 Kafka 並不是最快的 (也就是說,具有最大吞吐量能力的) 訊息傳遞中介軟體,還有其他具有更大吞吐量的平臺 —— 有些是基於軟體的 —— 有些是在硬體中實現的。Apache Pulsar 是一項極具前景的技術,它具備可擴充套件性,在提供相同的訊息順序性和永續性保證的同時,還能實現更好的吞吐量-延遲效果。使用 Kafka 的根本原因是,它作為一個完整的生態系統仍然是無與倫比的。它展示了卓越的效能,同時提供了一個豐富和成熟而且還在不斷進化的環境,儘管 Kafka 的規模已經相當龐大了,但仍以一種令人羨慕的速度在成長。

Kafka 的設計者和維護者們在創造一個以效能導向為核心的解決方案這方面做得非常出色。它的大多數設計/理念元素都是早期就構思完成、幾乎沒有什麼是事後才想到的,也沒有什麼是附加的。從把工作負載分攤到 Client 到 Broker 上的日誌結構永續性,批處理、壓縮、零複製 I/O 和流資料級並行 —— Kafka 向幾乎所有其他面向訊息的中介軟體 (商業的或開源的) 發起了挑戰。而且最令人歎為觀止的是,它做到這些事情的同時竟然沒有犧牲掉永續性、日誌記錄順序性和至少交付一次的語義等特性。

七、總結

7.1 mmap 和 sendfile

Linux 核心提供、實現零複製的 API。

mmap 將磁碟檔案對映到記憶體,支援讀和寫,對記憶體的操作會反映在磁碟檔案上。

sendfile 是將讀到核心空間的資料,轉到 socket buffer,進行網路傳送。

RocketMQ 在消費訊息時,使用了 mmap;Kafka 使用了 sendfile。

7.2 Kafka 為啥這麼快?

Partition 順序讀寫,充分利用磁碟特性,這是基礎。

Producer 生產的資料持久化到 Broker,採用 mmap 檔案對映,實現順序的快速寫入。

Customer 從 Broker 讀取資料,採用 sendfile,將磁碟檔案讀到 OS 核心緩衝區後,直接轉到 socket buffer 進行網路傳送。

Broker 效能最佳化:日誌記錄批處理、批次壓縮、非強制重新整理緩衝寫操作等。

流資料並行

「MQ」KAFKA 零複製