基於Spark的大規模推薦系統特徵工程

導讀:

特徵工程在推薦系統中有著舉足輕重的作用,大規模特徵工程處理的效率極大的影響了推薦系統線上的效能。第四正規化作為國際領先的機器學習和人工智慧技術與平臺服務提供商,面向大規模特徵工程問題開發了下一代離線線上一致性特徵抽取引擎FESQL,針對AI場景支援SQL介面,相容Spark 3。0同時提供高效能的Native執行引擎。本次分享題目為基於Spark的大規模推薦系統特徵工程及最佳化,主要內容包括:

大規模推薦系統

Spark SQL應用與FESQL

基於LLVM的Spark最佳化

總結

01

大規模推薦系統

1. 業界推薦系統的應用

基於Spark的大規模推薦系統特徵工程

眾所周知,推薦系統在業界有著許多成功的應用,據統計,亞馬遜40%的銷售在推薦系統的作用下產生;Netflix 75%的使用者使用推薦系統尋找他們喜愛的影片;30%的使用者進行線上購物前會使用關鍵詞搜尋他們需要的商品。目前,幾乎所有的新聞、搜尋、廣告、短影片應用都是基於推薦系統建立的。

2. 推薦系統的架構

基於Spark的大規模推薦系統特徵工程

業界成熟的推薦系統架構一般分為三層:離線層 ( offline layer ),近實時的流式層 ( stream layer ) 和線上層 ( online layer ) 三部分。

離線層:

一般用於大規模的資料預處理、特徵抽取與模型訓練,通常用Hadoop HDFS進行資料儲存,使用Spark,MapReduce等分散式計算引擎進行特徵抽取與計算以及資料管理,再使用離線模型訓練框架TensorFlow、Pytorch、MXNet等進行離線的模型訓練,模型結果可用於線上預測。

近實時的流式層:

主要是為了提升推薦系統的時效性,對於一些時序特徵,可以使用訊息佇列收集近實時的資料,結合流式計算服務如Flink對資料進行補全,把結果存入NoSQL、MySQL等儲存服務中,儲存結果供線上服務使用。

線上層:

使用者產生的資料可以透過Flink生成流式特徵,也可以使用HDFS進行資料歸檔。線上預估時從NoSQL或MySQL中提取流式特徵,透過離線訓練的模型即可進行線上預估。

3. 大規模推薦系統的特徵抽取

基於Spark的大規模推薦系統特徵工程

大規模推薦系統的資料處理通常分為兩類:

ETL ( Extract, Transform, Load ):進行資料資料補全、格式轉換等;

特徵抽取:對原始資料特徵進行處理,得到模型易於學習的樣本特徵,如離散化,embedding化等方法。

常用工具包括:

SQL/Python:針對一般規模的資料,通常可以透過使用SQL/Python進行處理;

Hadoop/Spark/Flink:針對大規模資料,通常要藉助Hadoop/Spark/Flink等計算框架。

02

Spark SQL應用與第四正規化自研FESQL技術

1. Spark簡介

基於Spark的大規模推薦系統特徵工程

Spark 是專為大規模資料處理而設計的快速通用的計算引擎,依託強大的分散式計算能力,在Spark上可以開發機器學習、流式學習等應用。Spark提供了SparkSQL,使其能與SQL、Hive相容,提供PySpark介面可以讓開發者使用Python進行分散式應用開發,提供了MLlib包,可以用於機器學習應用的開發。同時Spark也提供諸如Catalyst/Tungsten等方式的最佳化。

基於Spark的大規模推薦系統特徵工程

Spark的優勢就在於:計算速度快,能夠處理PB級別的資料,分散式計算和自動容錯機制,提供便於使用的SQL/Python/R API,同時,Spark提供的機器學習庫也可以應用於推薦系統,所以在業界,幾乎所有公司都會使用Spark作為離線層資料處理框架。

2. 大規模推薦系統中的Spark應用

基於Spark的大規模推薦系統特徵工程

以IBM的一個推薦系統開源專案來說明Spark在推薦系統中的應用。首先是資料載入,使用read。csv即可載入本地或HDFS資料。使用select即可進行特徵列選擇。

基於Spark的大規模推薦系統特徵工程

然後是對資料進行預處理以及簡單的特徵抽取,該專案中使用了Spark UDF對字串進行處理,抽取出其中的年份資訊,將年份資訊作為特徵進行使用。

基於Spark的大規模推薦系統特徵工程

得到全部特徵預處理的結果後即可進行模型訓練,可以使用Spark內建機器學習API進行模型訓練。訓練完成後,模型即可上線進行線上預估。

基於Spark的大規模推薦系統特徵工程

線上的預估服務需要提供實時計算的預估介面,但是在實踐中,Spark並不適合直接用於線上預估。原因有三:

Driver-exexutor結構只適合進行批次處理,不適合線上處理

Spark的批處理模式不適合提供長時間執行的線上服務,也不能保證低延時的計算效率(Spark 3。0的Hydrogen可以部分支援)

RDD介面只適合迭代計算,不適合做實時計算

因此,業界的通常做法是使用Java、C++等後端語言實現線上的預估服務,這就帶來了另一個線上特徵抽取的一致性問題,由於必須要保證線上線下特徵的一致性,所以必須同時開發線上使用的特徵處理模組,並人工保證計算結果沒有差異。

3. Spark的優缺點

基於Spark的大規模推薦系統特徵工程

Spark支援大規模資料的批處理,提供標準的SQL介面的優點使其成為離線層資料處理的不二之選,但是,Spark不支援線上服務,不能保證線上線下特徵一致性,同時在AI場景下的效能沒有經過最佳化,所以在AI場景下,Spark仍有許多不足。針對這些不足,第四正規化開發了FESQL執行引擎。

4. FESQL線上線下一致性執行引擎

基於Spark的大規模推薦系統特徵工程

FESQL——保證離線線上特徵一致性的SQL執行引擎。上圖表示傳統的上線過程,生成離線模型檔案後,由應用開發者開發線上預估服務,將Spark、SQL中的特徵處理邏輯翻譯成後端語言程式碼,實現線上服務,每新增一個特徵,都要開發對應的特徵抽取模組,同時需要使用者和業務開發者保證特徵資料的一致性。下圖是使用FESQL的上線過程,由於線上線下使用統一的SQL服務進行特徵抽取,因而保證了特徵在線上和線下的一致性。

基於Spark的大規模推薦系統特徵工程

圖中所示為FESQL基本框架,左邊離線部分和SparkSQL的用法基本一致,由資料科學家設計SQL語句,基於Spark進行離線批處理。橙色框表示第四正規化開發的基於LLVM最佳化的SQL引擎,效能大大優於原生Spark,同時能夠更好的支援線上服務,尤其對於SQL語句進行了拓展,使之能夠更好的支援機器學習場景下的線上特徵處理。其中FEDB是有第四正規化開發的全記憶體資料庫,相比於Spark讀取HDFS這種高延時的資料載入方式,FEDB可以提前載入模型預估所需資料,效果接近開發的線上特徵抽取模組,同時支援時序特徵。線上線下的資料一致性由同一套的SQL執行引擎保證。

5. 效能對比

基於Spark的大規模推薦系統特徵工程

與相容SQL的全記憶體資料庫memsql的方式進行效能對比可以發現,LLVM最佳化後的SQL之心引擎在讀和寫的效能上都要更高。

基於Spark的大規模推薦系統特徵工程

對於機器學習場景下的列聚合 ( 生成時序特徵 ) 場景,LLVM最佳化後的SQL引擎也比memsql快很多,耗時基本小於memsql的50%。

03

基於LLVM的Spark最佳化

1. Spark Catalyst和Tungsten最佳化

基於Spark的大規模推薦系統特徵工程

Spark2。0之後提供了Catalyst和Tungsten最佳化。圖為Catalyst從SQL解析到生成物理計劃的流程圖,由SQL語句或DataFrame介面透過編譯器技術 ( 語法解析等 ) 生成Unresolved Logical Plan,Catalyst透過解析Catalog對Unresolved Logical Plan處理得到Logical Plan,在經過SQL常用最佳化方案,得到Optimized Logical Plan,最佳化之Catalyst後可以生成多個基於Spark執行的Physical Plan,最終選擇其中最高效的進行執行。該方式適合於計算節點最佳化,對於SQL的最佳化也同樣效果顯著。

Tungsten是另外一種最佳化方案。主要的最佳化點在於:

記憶體管理與堆外儲存避免了多餘的記憶體使用,同時減少了GC;

引入code generation技術,透過JIT編譯執行,Spark動態生成Java位元組碼來計算這些表示式,而不是為逐行解析執行,減少了原始資料型別的裝箱操作,更重要的是避免了Overhead較大的虛擬函式呼叫。

基於Spark的大規模推薦系統特徵工程

以一個經典例項來介紹Tungsten的原理。左側的SQL命令可以翻譯成在Spark上執行的Logical Plan,由下往上分為4個計算節點,傳統的SQL執行引擎中,四個節點分別由四個迭代器實現 ( 可以理解為四個迴圈 ),迴圈沒有合併最佳化以及節點的虛擬函式呼叫對於CPU Cache非常不最佳化,導致傳統的SQL引擎計算效能比較差。右側為Tungsten最佳化後的結果,使用了whole staged code generation,對多節點的迴圈進行了合併,效能有著明顯的提升。

2. Catalyst/Tungsten的不足

基於Spark的大規模推薦系統特徵工程

Catalyst/Tungsten給Spark帶來了明顯的效能能提升,但Catalyst/Tungsten的最佳化仍然是基於Java進行的,如果能使用更底層的指令集,如彙編、二進位制碼效果會更好;JVM難以支援迴圈展開等最佳化方式;而且並非所有的節點都支援code generation,例如圖中的WindowExec節點就不支援code generation。

3. FESQL

基於Spark的大規模推薦系統特徵工程

鑑於以原因,Catalyst/Tungsten的最佳化仍有不足,第四正規化基於LLVM技術進一步最佳化得到FESQL。SparkSQL架構如黃色部分所示,FESQL架構如藍色框所示,根據SparkSQL語句生成FESQL Logical Plan,再由LLVM JIT生成平臺二進位制碼直接執行,相比於Spark少了JVM一層,效能也會有明顯提升。

4. LLVM簡介

基於Spark的大規模推薦系統特徵工程

LLVM專案是一個模組化的、可重用的編譯器和工具鏈集合,可以方便的實現編譯器和程式碼生成的工作。提供了許多有用的工具,如Clang、LLDB、MLIR、TVM等,能夠實現多種程式語言的編譯器。

基於Spark的大規模推薦系統特徵工程

JIT ( Just-In-Time Compiler ) 編譯,可以一邊執行程式一邊編譯二進位制程式碼,右圖為使用JIT編譯的Add函式,這部分程式碼可以在執行時被翻譯成底層程式碼,與直接使用C++來實現效率接近,同時JIT能夠適應不同的CPU生成最佳化的二進位制碼。

5. FESQL的最佳化點

基於Spark的大規模推薦系統特徵工程

目前已經能使用迴圈展開、常數摺疊、向量化和一些基於CPU本身的最佳化;未來,基於PTX後端還可以嘗試生成CUDA程式碼,利用GPU進行計算的加速。

6. 效能比較

基於Spark的大規模推薦系統特徵工程

FESQL與Databrick內部的Photon非常相似 ( Photon內部由C++實現 ),因而進行對兩者進行比較。Photon是Databrick的企業產品,僅能在Databrick的平臺上使用,且不支援PTX/CUDA。對比由C++和由JVM實現的處理引擎的效能,發現C++實現的處理引擎效能非常優越。

7. FESQL的節點最佳化

基於Spark的大規模推薦系統特徵工程

FESQL使用了節點最佳化,使用SimpleProject對Project節點進行合併最佳化,對視窗節點使用code generate進行最佳化。下圖說明了對於節點的最佳化可以明顯減少執行的流程。

8. FESQL的表示式最佳化

基於Spark的大規模推薦系統特徵工程

FESQL也實現了非常多表達式最佳化,保證在不同SQL場景都比傳統資料庫有著更好的效能表現。

9. 效能

基於Spark的大規模推薦系統特徵工程

對比Spark 3。0和FESQL on Spark可以發現,FESQL的執行效率明顯高於Spark 3。0,多視窗的情況下效果更明顯,有著接近6倍的效能提升。

基於Spark的大規模推薦系統特徵工程

透過對比兩者生成的邏輯計劃圖,可以發現FESQL的計劃圖明顯更簡單,透過對比兩者的火焰圖,底層RDD計算基本一致,FESQL取樣的樣本數更少,執行時間更短,因此FESQL的執行效率更高。

10. 展望

基於Spark的大規模推薦系統特徵工程

未來第四正規化計劃推出LLVM-enabled Spark Distribution,使開發者可以透過設定SPARK_HOME便利的實現效能加速;為開發者提供Docker、Notebook、Jar、Whl包,便於開發;提供類似Python的保證一致性的DSL語言用於UDF和UDFA實現;還有提供對CUDA和GPU的支援。

04

總結

基於Spark的大規模推薦系統特徵工程

大規模推薦系統中可以使用Spark、Flink、ES、FESQL實現大規模的資料處理,其中Spark更適合離線的批處理,而不適合線上處理,FESQL能同時進行線上線下服務因為能夠保證特徵一致性,同時LLVM JIT實現的FESQL擁有比Spark 3。0更好的效能。

更多SQL原生計算引擎以及Spark效能最佳化的技術,歡迎關注我們後續的分享。

今天的分享就到這裡,謝謝大家。

在文末分享、點贊、在看,給個三連擊唄~~

嘉賓介紹:

基於Spark的大規模推薦系統特徵工程

陳迪豪

第四正規化 | 架構師

第四正規化先知平臺架構師,負責深度學習框架產品化以及下一代特徵引擎開發工作。積極參與了開源社群TensorFlow、Kubernetes、TVM等專案開發,對分散式系統和深度學習平臺有一定了解,目前專注於離線線上一致性的特徵引擎開發。

分享嘉賓:陳迪豪 第四正規化 架構師

編輯整理:劉璐

出品平臺:第四正規化天樞、DataFunTalk