大資料框架exactly-once底層原理,看這篇文章就夠了

一、大資料框架三種語義‍‍‍‍

在分散式系統中,如kafka、spark、flink等構成系統的任何節點都是被定義為可以彼此獨立失敗的。比如在 Kafka 中,broker 可能會 crash,在 producer 推送資料至 topic 的過程中也可能會遇到網路問題。根據 producer 處理此類故障所採取的提交策略型別,有如下三種(以kafka為例):

at-least-once:

如果 producer 收到來自 Kafka broker 的確認(ack)或者 acks = all,則表示該訊息已經寫入到 Kafka。但如果 producer ack 超時或收到錯誤,則可能會重試傳送訊息,客戶端會認為該訊息未寫入 Kafka。如果 broker 在傳送 Ack 之前失敗,但在訊息成功寫入 Kafka 之後,此重試將導致該訊息被寫入兩次,因此訊息會被不止一次地傳遞給最終 consumer,這種策略可能導致重複的工作和不正確的結果。

at-most-once:

如果在 ack 超時或返回錯誤時 producer 不重試,則該訊息可能最終不會寫入 Kafka,因此不會傳遞給 consumer。在大多數情況下,這樣做是為了避免重複的可能性,業務上必須接收資料傳遞可能的丟失。

exactly-once:

即使 producer 重試傳送訊息,訊息也會保證最多一次地傳遞給最終consumer。該語義是最理想的,但也難以實現,因為它需要訊息系統本身與生產和消費訊息的應用程式進行協作。

二、大資料框架故障階段(kafka為例)

理想狀況,網路良好,程式碼沒有錯誤,則 Kafka 可以保證 exactly-once,但生產環境錯綜複雜,故障幾乎無法避免,主要有:

1 框架自身故障(Broker):

Kafka 作為一個高可用、持久化系統,保證每條訊息被持久化並且冗餘多份(假設是 n 份),所以 Kafka 可以容忍 n-1 個 broker 故障,意味著一個分割槽只要至少有一個 broker 可用,分割槽就可用。Kafka 的副本協議保證了只要訊息被成功寫入了主副本,它就會被複制到其他所有的可用副本(ISR)。

2 客戶端傳送框架失敗(Producer 到 Broker 的 RPC):

Kafka 的永續性依賴於生產者接收broker 的 ack 。沒有接收成功 ack 不代表生產請求本身失敗了。broker 可能在寫入訊息後,傳送 ack 給生產者的時候掛了,甚至 broker 也可能在寫入訊息前就掛了。由於生產者沒有辦法知道錯誤是什麼造成的,所以它就只能認為訊息沒寫入成功,並且會重試傳送。在一些情況下,這會造成同樣的訊息在 Kafka 分割槽日誌中重複,進而造成消費端多次收到這條訊息。

3 客戶端也失敗(Producer):

Exactly-once delivery 也必須考慮客戶端失敗的情況。但是如何去區分客戶端是真的掛了(永久性宕機)還是說只是暫時丟失心跳?追求正確性的話,broker 應該丟棄由 zombie producer 傳送的訊息。 consumer 也是如此,一旦新的客戶端例項已經啟動,它必須能夠從失敗例項的任何狀態中恢復,並從安全點( safe checkpoint )開始處理,這意味著消費的偏移量必須始終與生成的輸出保持同步。

4 框架傳送消費端失敗(Broker到 Consumer 的 RPC)

三、Exactly-once底層實現原理

3.1、依賴業務控制

對生產者:

每個分割槽只有一個生產者寫入訊息,當出現異常或超時,生產者查詢此分割槽最後一個訊息,用於決定後續操作時重傳還是繼續傳送。

為每個訊息增加唯一主鍵,生產者不做處理,由消費者根據主鍵去重。

對消費者:

關閉自動提交 offset 的功能,不使用 Offsets Topic 這個內部 Topic 記錄其 offset,而是由消費者自動儲存 offset。將 offset 和訊息處理放在一個事務裡面,事務執行成功認為訊息被消費,否則事務回滾需要重新處理。當出現消費者重啟或者 Rebalance 操作,可以從資料庫找到對應的 offset,然後呼叫 KafkaConsumer。seek() 設定消費者位置,從此 offset 開始消費。

3.2、依賴 Kafka

3.2.1、冪等性:

每個分割槽中精確一次且有序(Idempotence: Exactly-once in order semantics per partition)

Kafka 在0。11。0。0之前的版本中只支援 At Least Once 和 At Most Once 語義,尚不支援 Exactly Once 語義。

Kafka 0。11。0。0版本引入了冪等語義。 一個冪等性的操作就是一種被執行多次造成的影響和只執行一次造成的影響一樣的操作。

如果出現導致生產者重試的錯誤,同樣的訊息,仍由同樣的生產者傳送多次,將只被寫到 Kafka broker 的日誌中一次。

對於單個分割槽,冪等生產者不會因為生產者或 broker 故障而產生多條重複訊息。

想要開啟這個特性,獲得每個分割槽內的精確一次語義,也就是說沒有重複,沒有丟失,並且有序的語義,只需要 producer 配置 enable。idempotence=true。

這個特性是怎麼實現的呢?每個新的 Producer 在初始化的時候會被分配一個唯一的 PID,該PID對使用者完全透明而不會暴露給使用者。在底層,它和 TCP 的工作原理有點像,每一批發送到 Kafka 的訊息都將包含 PID 和一個從 0 開始單調遞增序列號。

Broker 將使用這個序列號來刪除重複的傳送。和只能在瞬態記憶體中的連線中保證不重複的 TCP 不同,這個序列號被持久化到副本日誌,所以,即使分割槽的 leader 掛了,其他的 broker 接管了leader,新 leader 仍可以判斷重新發送的是否重複了。這種機制的開銷非常低:每批訊息只有幾個額外的欄位。這種特性比非冪等的生產者只增加了可忽略的效能開銷。

如果訊息序號比 Broker 維護的序號大 1 以上,說明中間有資料尚未寫入,也即亂序,此時 Broker 拒絕該訊息。

如果訊息序號小於等於 Broker 維護的序號,說明該訊息已被儲存,即為重複訊息,Broker直接丟棄該訊息。

總結來說,producer 端傳送訊息時,生成全域性唯一自增pid,和broker中資料的pid進行對比,多則刪除,少則通知producer端重新發送。

3.2.2、事務:跨分割槽原子寫入

上述冪等設計只能保證單個 Producer 對於同一個 的 Exactly Once 語義。

Kafka 現在透過新的事務 API 支援跨分割槽原子寫入。這將允許一個生產者傳送一批到不同分割槽的訊息,這些訊息要麼全部對任何一個消費者可見,要麼對任何一個消費者都不可見。這個特性也允許在一個事務中處理消費資料和提交消費偏移量,從而實現端到端的精確一次語義。

為了實現這種效果,應用程式必須提供一個穩定的(重啟後不變)唯一的 ID,也即Transaction ID 。 Transactin ID 與 PID 可能一一對應。區別在於 Transaction ID 由使用者提供,將生產者的 transactional。id 配置項設定為某個唯一ID。而 PID 是內部的實現對使用者透明。

另外,為了保證新的 Producer 啟動後,舊的具有相同 Transaction ID 的 Producer 失效,每次 Producer 透過 Transaction ID 拿到 PID 的同時,還會獲取一個單調遞增的 epoch。由於舊的 Producer 的 epoch 比新 Producer 的 epoch 小,Kafka 可以很容易識別出該 Producer 是老的 Producer 並拒絕其請求。

有了Transaction ID後,Kafka可保證:

跨Session的資料冪等傳送。當具有相同Transaction ID的新的Producer例項被建立且工作時,舊的且擁有相同Transaction ID的Producer將不再工作。

跨Session的事務恢復。如果某個應用例項宕機,新的例項可以保證任何未完成的舊的事務要麼Commit要麼Abort,使得新例項從一個正常狀態開始工作。

需要注意的是,上述的事務保證是從Producer的角度去考慮的。從Consumer的角度來看,該保證會相對弱一些。尤其是不能保證所有被某事務Commit過的所有訊息都被一起消費,因為:

對於壓縮的Topic而言,同一事務的某些訊息可能被其它版本覆蓋

事務包含的訊息可能分佈在多個Segment中(即使在同一個Partition內),當老的Segment被刪除時,該事務的部分資料可能會丟失

Consumer在一個事務內可能透過seek方法訪問任意Offset的訊息,從而可能丟失部分訊息

Consumer可能並不需要消費某一事務內的所有Partition,因此它將永遠不會讀取組成該事務的所有訊息

四、事務中Offset的提交

許多基於Kafka的應用,尤其是Kafka Stream應用中同時包含Consumer和Producer,前者負責從Kafka中獲取訊息,後者負責將處理完的資料寫回Kafka的其它Topic中。

為了實現該場景下的事務的原子性,Kafka需要保證對Consumer Offset的Commit與Producer對傳送訊息的Commit包含在同一個事務中。否則,如果在二者Commit中間發生異常,根據二者Commit的順序可能會造成資料丟失和資料重複:

如果先Commit Producer傳送資料的事務再Commit Consumer的Offset,即At Least Once語義,可能造成資料重複。

如果先Commit Consumer的Offset,再Commit Producer資料傳送事務,即At Most Once語義,可能造成資料丟失。

五、分散式事務經見實現機制

5.1 兩階段提交

Kafka的事務機制與《分散式事務(一)兩階段提交及JTA》一文中所介紹的兩階段提交機制看似相似,都分PREPARE階段和最終COMMIT階段,但又有很大不同。

Kafka事務機制中,PREPARE時即要指明是PREPARE_COMMIT還是PREPARE_ABORT並且只須在Transaction Log中標記即可,無須其它元件參與。而兩階段提交的PREPARE需要傳送給所有的分散式事務參與方,並且事務參與方需要儘可能準備好,並根據準備情況返回Prepared或Non-Prepared狀態給事務管理器。

Kafka事務中,一但發起PREPARE_COMMIT或PREPARE_ABORT則確定該事務最終的結果應該是被COMMIT或ABORT。而分散式事務中,PREPARE後由各事務參與方返回狀態,只有所有參與方均返回Prepared狀態才會真正執行COMMIT,否則執行ROLLBACK

Kafka事務機制中,某幾個Partition在COMMIT或ABORT過程中變為不可用,隻影響該Partition不影響其它Partition。兩階段提交中,若唯一收到COMMIT命令參與者Crash,其它事務參與方無法判斷事務狀態從而使得整個事務阻塞

Kafka事務機制引入事務超時機制,有效避免了掛起的事務影響其它事務的問題

Kafka事務機制中存在多個Transaction Coordinator例項,而分散式事務中只有一個事務管理器

兩階段提交原理

二階段提交的演算法思路可以概括為:協調者詢問參與者是否準備好了提交,並根據所有參與者的反饋情況決定向所有參與者傳送commit或者rollback指令(協調者向所有參與者傳送相同的指令)。

所謂的兩個階段是指

準備階段 又稱投票階段。在這一階段,協調者詢問所有參與者是否準備好提交,參與者如果已經準備好提交則回覆Prepared,否則回覆Non-Prepared。

提交階段又稱執行階段。協調者如果在上一階段收到所有參與者回覆的Prepared,則在此階段向所有參與者傳送commit指令,所有參與者立即執行commit操作;否則協調者向所有參與者傳送rollback指令,參與者立即執行rollback操作。

5.2 Zookeeper

Zookeeper的原子廣播協議與兩階段提交以及Kafka事務機制有相似之處,但又有各自的特點

Kafka事務可COMMIT也可ABORT。而Zookeeper原子廣播協議只有COMMIT沒有ABORT。當然,Zookeeper不COMMIT某訊息也即等效於ABORT該訊息的更新。

Kafka存在多個Transaction Coordinator例項,擴充套件性較好。而Zookeeper寫操作只能在Leader節點進行,所以其寫效能遠低於讀效能。

Kafka事務是COMMIT還是ABORT完全取決於Producer即客戶端。而Zookeeper原子廣播協議中某條訊息是否被COMMIT取決於是否有一大半FOLLOWER ACK該訊息。

大資料框架exactly-once底層原理,看這篇文章就夠了