導讀:
大家好,我是來自阿里雲資料庫的李少鋒,現在主要專注於 ADB Hudi & Spark 的研發以及產品化,今天非常高興能夠借這個機會和大家分享下阿里雲 ADB 基於 Apache Hudi 構建 Lakehouse 的應用與實踐。
接下來我將分為 3 個部分給大家介紹今天的議題,首先我會介紹經過將近一年打磨推出的ADB 湖倉版的架構以及關鍵優勢,接著會介紹在支援客戶構建 Lakehouse 時,我們是如何克服基於 Hudi 構建千億資料入湖的挑戰;最後將介紹基於 ADB 構建 Lakehouse 的實踐。
分享嘉賓|李少鋒 阿里雲 資料庫技術專家
出品社群|DataFun
01
ADB 湖倉版機構與關鍵優勢
首先先來介紹下 ADB 湖倉版架構及其關鍵優勢
一體版本,我們稱為 ADB 湖倉版。湖倉版在資料全鏈路的「採存算管用」5 大方面都進行了全面升級。
在「採集」方面
,我們推出了資料管道 APS 功能,可以一鍵低成本接入資料庫、日誌、大資料中的資料。
在「儲存」方面
,我們除了新增 Hudi 格式的外表能力,也對內部儲存進行了升級改造。透過只存一份資料,同時滿足離線、線上 2 類場景。
在「計算」方面
,我們對自研的 XIHE BSP SQL 引擎進行容錯性、運維能力等方面的提升,同時引入開源 Spark 引擎滿足更復雜的離線處理場景和機器學習場景。
在「管理」方面
,我們推出了統一的元資料管理服務,統一湖倉元資料及許可權,讓湖倉資料的流通更順暢
在「應用」方面
,除了透過SQL方式的BI分析應用外,還支援基於 Spark 的 AI 應用。
我們希望透過資源一體化和體驗一體化,2 個一體化能力,幫助客戶實現降本增效。資源一體化是指一份資料、極致彈性和融合引擎。體驗一體化是指統一的計費單位、資料管道、資料管理和資料訪問。
在 ADB 湖倉版中,首先將一份全量資料存在低成本高吞吐儲存介質上,低成本離線處理場景直接讀寫低成本儲存介質,降低資料儲存和資料 IO 成本,保證高吞吐。其次將實時資料存在單獨的儲存 IO 節點(EIU)上,保證「行級」的資料實時性,同時對全量資料構建索引,並透過 Cache 能力對資料進行加速,滿足百 ms 級高效能線上分析場景。因此,資料湖中的資料可以在數倉中加速訪問;而數倉中的資料,也可以利用離線資源進行計算。
極致彈性透過彈得好、彈得起、彈得快三個特點,貼合業務負載,保證查詢效能,降低資源成本。
彈得好
是指提供了適合線上分析業務的分時彈性和適合離線處理、機器學習的按需彈性,完美貼合業務負載,滿足業務需求。
彈得起
是指基於神龍 + ECS + ECI構建了三級資源庫存供給能力,保障彈性資源交付率超過 95%。
彈得快
是指資源池化後透過快取加速等技術,彈性啟動效率可以達到 10s 內。
支撐離線、線上兩類場景的背後,除了剛才提到的一份資料。還有自研的XIHE融合計算引擎,同時提供 MPP 模式和 BSP 模式,並提供自動切換能力。
自動切換能力是指當查詢使用 MPP 模式無法在一定耗時內完成時,系統會自動切換為 BSP模式進行執行。未來,我們還將在一個查詢中,根據運算元特點部分運算元採用 MPP 模式,部分運算元採用 BSP 模式,兼顧高效能和高吞吐,提供更智慧的查詢體驗。
同時融合引擎為提供資源利用率提供了可能,通常離線處理發生在晚上,線上分析發生在白天,一份資源可以同時支援 2 類場景,透過錯峰提升資源利用率。
最後再介紹一下統一資料管道APS。資料快速接入,是資料分析的第一步,也是最容易流失客戶的一步。但我們發現數據接入面臨體驗差、成本高、門檻高等痛點。
所以,我們決定跟其它接入工具做好深度最佳化的同時,面向中小客戶,構建一個統一資料管道 APS,底層基於 Flink 實時引擎,提供易用性好、低延時、低成本的資料接入體驗。
對於湖倉中的表元資料,ADB 做了統一元資料服務,湖倉中的元資料/許可權可互通,可透過不同引擎自由訪問湖倉資料;而對於湖倉資料,為了遮蔽底層資料儲存格式的差異,便於第三方引擎整合,ADB 提供了面向記憶體列存格式 Arrow,滿足對吞吐的要求,對於內部儲存,已經透過 Arrow 格式完成 Spark 對接,提供高吞吐能力。
自研是打造技術深度的基礎,但同時我們積極擁抱開源,滿足已經生長在開源生態上的客戶可以更平滑地使用湖倉版。外表型別,在 Parquet/ORC/JSON/CSV 等 Append 型別資料格式的基礎上,為支援在物件儲存上低成本 Lakehouse,支援了Apache Hudi,提供近實時的更新、刪除能力,滿足客戶對低成本的訴求。
——
02
基於 Hudi 構建千億資料入湖的挑戰
以上就是 ADB 湖倉版的架構與關鍵優勢,接著介紹基於 Hudi 構建千億資料入湖的挑戰以及如何我們是如何克服這些挑戰的。
首先我們看下典型的業務場景,業務源端資料透過資料採集進入阿里雲 SLS,然後透過 APS資料管道服務進入ADB 湖倉版,基於 ADB 湖倉版之上構建日誌查詢、日誌匯出、日誌分析等功能。
該業務場景有如下典型的特點:
1。高吞吐,單鏈路最高 4GB/s 吞吐,日增資料量 350TB,總儲存量超 20PB;
2。資料傾斜/熱點嚴重:源端資料傾斜非常嚴重,從百萬到幾十條資料不等;
3。掃描量大:日誌查詢的掃描量 50GB ~ 500GB 不等,查詢併發在 100+;
如果使用數倉的話會面臨成本高的問題,另外是數倉缺少熱點打散能力,只能不斷加資源,瓶頸明顯;最後是數倉系統資源是固化的,沒有動態彈效能力,或者彈效能力較弱,承載不同客戶查詢需求時,容易互相干擾,尤其是大客戶的資料掃描。
先來看下
日誌資料入湖的技術架構
,資料從 SLS 讀取,然後透過 Flink 寫入 Hudi,整個架構非常簡單,其中對於 SLS 和 Flink 的狀態儲存說明如下:
1。SLS 多 Shard 儲存,由 Flink 的多個 Source 運算元並行消費
2。消費後透過 Sink 運算元呼叫 Hudi Worker/Writer 寫出到 Hudi(實際鏈路還存在 Repartition,熱點打散等邏輯)
3。Flink Checkpoint 後端儲存以及 Hudi 資料儲存在OSS
接著來看下
Flink 寫入 Hudi 的主流程
,首先明確 Flink 寫 Hudi 時會存在兩種角色,Coordinator 負責處理元資料,如對 Hudi Instant 的相關操作,Worker/Writer 負責處理資料,如寫入資料為 parquet 格式,寫入步驟如下
1。Coordinator 開啟一個 Hudi Instant
2。Filnk Sink 傳送資料給 Hudi Worker 寫出
3。觸發 Flink Checkpoint 時,則透過 Sink 運算元通知 Worker Flush 資料,同時持久化operator-state
4。當 Flink 完成 Checkpoint 持久化時,通知 Coordinator 提交該 Instant,Coordinator 完成最終提交,寫 commit 檔案,資料對外可見
5。提交後會立即開啟一個新的 Instant,繼續上述迴圈
如果把 Flink 寫 Hudi 保證端到端一致性分成兩部分,一部分是 Flink 框架內部的,另外一個部分是與 Hudi 互動的部分。
1。其中步驟 1 到 3 之間是 Flink Checkpoint 邏輯,如果異常在這些步驟上發生,則認為 Checkpoint失敗,觸發 Job 重啟,從上一次 Checkpoint 恢復,相當於兩階段提交的 Precommit 階段失敗,事務回滾,如果有 Hudi 的 inflight commit,等待 Hudi Rollback 即可,無資料不一致問題
2。當 3 到 4 之間發生異常,則出現 Flink 和 Hudi 狀態不一致。此時 Flink 認為 Checkpoint 已結束,而 Hudi 實際尚未提交。如果對此情況不做處理,則發生了資料丟失,因為Flink Checkpoint 完畢後,SLS 位點已經前移,而這部分資料在 Hudi 上並未完成提交,因此容錯的重點是如何處理此階段引起的資料一致性問題
3。我們拿一個例子舉例分析在步驟 3 和 4 之前發生異常時,如果保證資料一致性
4。否則認為上一次 Instant 執行失敗,等待 Rollback 即可,髒資料對使用者不可見
我們舉例分析下在步驟 3 和 4 之間發生異常時,是如何保證資料一致性的,可以看到對於1615 的 commit 在 Flink 完成 Checkpoint 時會將其 instant 資訊持久化至 Flink 後端儲存。
從 checkpoint 恢復時有如下步驟:
1。Checkpoint 時 Sink 運算元 Flush 資料及持久化 Instant 的 state;
2。Worker 請求處於 pending 的 Instant,與從 state 恢復的 Instant 做對比並彙報給 Coordinator;
3。Coordinator 從 Instant Timeline 中獲取最新的 Instant 資訊,並接收所有 Worker 的彙報;
4。如果 Worker 彙報 Instant 相同,並且不在 Timeline 中已完成的 Instant 中,則表示該 Instant 尚未提交,觸發 Recommit。
經過上述步驟可以保證在 Flink 完成 Checkpoint 時,但對於 Hudi Commit 失敗時的場景會進行 recommit,從而保證資料的一致性。
接著介紹
我們在處理 4GB/s 的高吞吐時面臨的挑戰
,一個非常大的挑戰就是熱點資料處理,我們統計了 5 分鐘內各 Task 處理資料的大小,發現各 Task 處理資料從 200W 條到幾十條不等,熱點問題明顯。
而在寫入鏈路中會根據分割槽欄位做 shuffle,同一個分割槽由一個 Task 寫入,對於上述存在的熱點問題會導致部分TM上的分割槽寫入非常慢,導致資料延遲/作業掛掉。
面對寫入熱點問題,我們開發了熱點打散功能,透過配置指定規則打散熱點資料,同時可以根據資料流量自動更新熱點打散規則,確保系統的健壯性,可以看到經過熱點打散後個 TM 處理的資料量/CPU佔用/記憶體佔用基本相同並且比較平穩,作業穩定性也得到了提升。
另外一個挑戰是 OOM,其實和熱點打散也有很大關係,我們發現作業執行時會出現OOM,導致作業掛掉,資料延遲上漲,因此
我們對堆外/堆內記憶體的使用做了比較細緻的梳理,
使用記憶體的部分主要集中在:
1。寫 Parquet 檔案佔用堆外記憶體
2。OSS Flush 佔用堆外記憶體
3。單 TM 的 Slot 數、寫併發都影響記憶體佔用,如每個寫併發處理 30-50 Handle,TM 16 併發,8M row group size 最多佔用 6400 M 記憶體
4。堆內記憶體負載過高導致頻繁Full GC
我們針對上述
分記憶體使用做了最佳化
,如:
1。row group size 配置為 4M,減少堆外記憶體佔用,同時將堆外記憶體調大
2。close 時及時釋放 compressor 佔用的記憶體,這部分對 parquet 原始碼做了改造
3。透出堆外記憶體指標,增加堆外記憶體監控,這部分也是對 parquet 原始碼做了改造
4。源端 source 運算元與 Shard 分配更均衡,以此保證各 TM 消費的 shard 數基本均等
最後一個比較大的挑戰就是 OSS 限流,雲物件儲存(如OSS)對 List 操作不友好,list objects 對 OSS 伺服器壓力較大,如在我們場景下,1500 寫併發,會產生 1W QPS list object,OSS 側目前限流 1K QPS,限流明顯,限流會導致作業處理變慢,延遲變高,為解決該問題,
我們也梳理了寫入鏈路對 OSS 的請求,在元資料鏈路對 OSS 的請求如下:
1。Timeline 構建需要 list 。hoodie 目錄
2。Flink CkpMetaData 基於 OSS 傳遞給 Worker
3。Hadoop-OSS SDK create/exists/mkdir 函式依賴 getStatus 介面,getStatus 介面現有實現導致大量 list 請求,其中 getStatus 介面對於不存在的檔案,會額外進行一次 list objects 請求確認 Path 是不是目錄,對 Marker File、partitionMetadata、資料檔案都會產生大量的 list objects 請求
在資料鏈路對 OSS 請求如下:先臨時寫到本地磁碟,再上傳至 OSS,導致本地磁碟寫滿。
針對上述對 OSS 的請求,我們做了如下最佳化,在元資料側:
1。Timeline Based CkpMetaData,將TM請求打到 JM,避免大量 TM 掃描 OSS 檔案
2。Hadoop-OSS SDK,透出直接建立檔案的介面,不進行目錄檢查
3。PartitionMetaData 快取處理,在記憶體中對每個分割槽的元資料檔案做了快取處理,儘量減少與 OSS 的互動
4。Create Marker File 非同步處理,非同步化處理不阻塞對 Handle 的建立,減少建立 Handle 的成本
5。開啟 Timeline Based Marker File,這個是社群已經有的能力,直接開啟即可
這裡額外補充下可能有小夥伴比較好奇為什麼開啟 hudi metadata table 來解決雲物件儲存的限流問題,我們內部做過測試,發現開啟 metadata table 時,寫入會越來越慢,無法滿足高吞吐的場景。
以上就是我們在處理日誌資料入湖時面臨的典型挑戰以及我們如何克服這些挑戰,接著講講我們在處理資料入湖時為滿足業務要求做的關鍵特性開發。
首先是支援併發寫,業務側要求鏈路有補資料能力,補資料場景涉及多 Flink Client 寫不同分割槽,實時寫鏈路,補資料鏈路,Table Service 鏈路併發操作表資料/元資料,這要求:
1。表資料不錯亂
2。補資料/TableService 鏈路不影響實時寫鏈路
因此
我們對 Hudi 核心側做了部分修改來支援併發寫場景:
1。CkpMetadata 唯一標識,保證不同作業使用不同 ckp meta
2。ViewStorage 唯一標識,保證不同作業 Timeline Server 隔離
3。Lazy Table Service,保證並行作業不互相 rollback commit,避免資料錯亂
4。Instant 生成重試策略,保證 Timeline Instant 的唯一性,避免資料錯亂
5。獨立 Table Service 處理,使用單獨的作業執行 Table Service,與實時寫鏈路完全隔離
另外一個關鍵特性是出於成本考慮,業務側要求 Hudi 中資料不能無限地儲存,需要按照使用者設定的策略保留指定時間的資料,這要求:
1。Hudi 提供分割槽級別按照資料量,分割槽數和過期時間等不同維度進行生命週期管理的能力
2。Hudi 支援併發設定生命週期管理策略,因為面向多租戶會涉及併發更新管理策略
針對業務對生命週期管理的需求,我們開發 Hudi 的生命週期管理功能,具體實現如下:
1。對於生命週期管理使用,首先透過 call command 新增生命週期管理策略,並進行持久化,為支援併發更新,我們參考 Hudi MOR 表中 Base 檔案和 Log 檔案的設計,併發更新會寫入不同的 Log 檔案;
2。對於生命週期管理的執行,在每一次 commit 結束後進行統計資訊增量採集並更新至統計資訊檔案,然後按照分割槽策略進行過期分割槽的處理,對於過期分割槽會生成一個 replace commit,等待後續被 clean 即可,同時會合並前面的策略 Base 檔案和 Log 檔案,生成新的 Base 檔案以及更新統計資訊;
我們也提供了按照分割槽數、資料量、過期時間三種不同策略來管理 Hudi 表中的分割槽的生命週期,很好的滿足業務側的需求。
最後一個比較大的關鍵特性是獨立 TableService,業務側要求保證實時寫鏈路穩定,同時希望提高入湖資料的查詢效能,這要求:
1。在不影響主鏈路同步效能情況下,清理 Commits/檔案版本,保證表狀態大小可控
2。為提高查詢效能,提供非同步 Clustering 能力,合併小檔案,減少掃描量,提高查詢效能
基於上述訴求我們開發了基於 ADB 湖倉版的獨立 Table Service 服務,在入湖鏈路寫入完成後會進行一次排程,然後將請求寫入排程元件,供排程元件後續拉起彈性的 Flink/Spark TableService 作業,可以做到對實時寫入鏈路無影響。
對於表狀態管理以及資料佈局最佳化均是採用的獨立 TableService 作業執行,保證表的狀態大小可控,同時透過非同步 Clustering 操作,端到端查詢效能提升 40% 以上。
在對日誌入湖鏈路進行了深入打磨後,我們可以保證最高 4GB/s 的資料寫入,延遲在 5min內,滿足業務需求。
同時也建設了指標監控大盤與異常鏈路告警功能,便於快速定位問題以及出現問題後快速響應。
以上介紹便是我們是如何基於Hudi構建千億資料入湖以及如何克服入湖挑戰的。
——
03
基於 ADB 構建 Lakehouse 的實踐
最後介紹下基於 ADB 構建 Lakehouse 的實踐。
前面也提到 ADB 湖倉版擁抱開源技術,ADB 集成了流式處理引擎 Flink,並在此基礎上推出了
APS 資料管道服務,APS 具備如下優勢:
1。低成本,低延遲:作業級別彈性資源,按量付費;按流量自由設定作業資源;充分享受 Flink 流式處理效能紅利
2。多資料來源快速整合:得益於 Flink 成熟的 Connectors 機制,可以方便對接如 SLS、Kafka 等資料來源,同時可以保證資料入湖的精確一致性
3。低使用門檻:支援白屏化操作快速構建 Lakehouse,基於統一元資料服務,Lakehouse 資料可透過 Spark/ADB 引擎無縫訪問
而為了滿足客戶對於批處理以及機器學習能力的訴求,ADB 集成了 Spark 引擎,並在此技術上推出了 Servlersss Spark,其具備如下優勢:
1。一份資料儲存,在離線共享:無縫對接 ADB 已有元資料和資料;支援大吞吐讀寫 ADB 資料;Spark 批次寫入的資料,線上分析查詢可直接訪問
2。資料庫體系&體驗:使用 ADB 統一的賬號、許可權和鑑權體系;支援透過 ADB Workflow、DMS 以及 DataWorks 排程編排 SparkSQL 作業
3。完全相容 Spark 生態:基於最新的 Apache Spark 3。X 版本,充分享受開源社群紅利;支援 SparkSQL、DataFrame API 主流程式設計介面以及 Thrift Server;支援 Spark UDF,支援 Hive UDF/UDTF/UDAF
4。按量計費,秒級彈性:開箱即用,按量計費無任何持有成本;基於神龍、ECS/ECI 的管控底座以及資源池化,快取加速等技術,支援 Spark Pod 秒級拉起
對於實時性有訴求的場景,可以基於 ADB APS 服務可以非常方便的構建準實時 Lakehouse,白屏化操作快速配置入湖通道,多種資料來源支援,滿足不同資料來源接入訴求,更多資料來源也在持續整合中。
而對於實時性沒有訴求的場景,可以基於 Spark + Hudi + ADB 工作編排構建離線 Lakehouse,如想對 RDS 資料構建離線Lakehouse進行分析,可使用ADB工作編排,利用 Spark 將 RDS 資料離線匯入 Lakehouse,並做資料的清洗和加工,有需要最後可透過一條簡單的 Spark SQL將資料從 Hudi 匯入 ADB 做查詢分析加速。
另外 ADB Spark 與 Hudi 和 ADB 表都做了深度整合,便於客戶使用,如對於 Hudi 表的使用,免去了很多 Hudi 額外的配置,開箱即用;對於 ADB 表,可透過 Spark 建立、刪除 ADB 表元資料,也支援讀寫 ADB 表資料。
另外最後介紹下 Spark 與 ADB 整合提供的 Zero-ETL 解決方案,這也與 2022 AWS reinvent 推出的資料整合服務 Zero-ETL 類似,我們透過一個場景瞭解 Zero-ETL 的應用及其優勢。
客戶如果對於 ADB 表有分析挖掘的需求,受限於 JDBC 方式吞吐的限制,需要先將 ADB內表資料以 parquet 格式匯出到 OSS,利用 OSS 的頻寬,再透過 Spark 進行分析挖掘,最後輸出挖掘結果,可以看到這種方案中存在ETL操作,從而引入資料一致性差、時效性低的問題。
在 ADB 湖倉版中 Spark 與 ADB 做了深度整合,透過 Lakehouse API直接訪問 ADB 內表資料,吞吐高,所以面對同樣的場景,可以使用下面的鏈路,即直接透過 Spark 分析 ADB 資料,無需 ETL,資料一致性好,時效性也很高;另外對於 Lakehouse API 層的訪問也支援列投影、謂詞下推、分割槽裁剪等能力,更多下推能力也在持續建設中。
前面介紹了很多關於 ADB 湖倉版的功能以及優勢,包括 Serverless Spark、APS 服務、融合引擎、工作流編排等等。而對於 ADB 湖倉版的定位總結成一句話就是從湖到倉,打造雲原生一站式資料分析平臺。
讓客戶透過 ADB 湖倉版平臺就可以輕鬆玩轉資料分析。
另外 AnalyticDB MySQL 湖倉版從 7 月份開始,歷時 4 個月的邀測後,11 月 1 日正式開始公測,可以透過點選「AnalyticDB MySQL3。0婀栦粨鐗堝叕嫻嬬敵璇�」進行公測申請。
今天的分享就到這裡,謝謝大家。
|分享嘉賓|
李少鋒
阿里雲 資料庫技術專家
Apache Hudi Committer & PMC成員,資料庫OLAP ADB Hudi&Spark團隊技術負責人。
|DataFun新媒體矩陣|
|關於DataFun|
專注於大資料、人工智慧技術應用的分享與交流。發起於2017年,在北京、上海、深圳、杭州等城市舉辦超過100+線下和100+線上沙龍、論壇及峰會,已邀請超過2000位專家和學者參與分享。其公眾號 DataFunTalk 累計生產原創文章800+,百萬+閱讀,15萬+精準粉絲。