Spark分散式機器學習原始碼分析:Kmeans族聚類

​ Spark是一個極為優秀的大資料框架,在大資料批處理上基本無人能敵,流處理上也有一席之地,機器學習則是當前正火熱AI人工智慧的驅動引擎,在大資料場景下如何發揮AI技術成為優秀的大資料探勘工程師必備技能。

本文采用的元件版本為:

Ubuntu 19.10、Jdk 1.8.0_241、Scala 2.11.12、Hadoop 3.2.1、Spark 2.4.5

,老規矩先開啟一系列Hadoop、Spark服務與Spark-shell視窗:

Spark分散式機器學習原始碼分析:Kmeans族聚類

1.Kmeans聚類

kmeans演算法又名k均值演算法。其演算法思想大致為:先從樣本集中隨機選取 kk 個樣本作為

簇中心

,並計算所有樣本與這k個“簇中心”的距離,對於每一個樣本,將其劃分到與其

距離最近

的“簇中心”所在的簇中,對於新的簇計算各個簇的新的“簇中心”。

根據以上描述,我們大致可以猜測到實現kmeans演算法的主要三點:

(1)簇個數k的選擇

(2)各個樣本點到“簇中心”的

距離

(3)根據新劃分的簇,更新“簇中心”

K-Means的演算法流程如下:

從資料集中隨機選取 K 個點作為初始聚類的中心,

針對資料集中每個樣本 ,計算它們到各個聚類中心點的距離,到哪個聚類中心點的距離最小,就將其劃分到對應聚類中心的類中

針對每個類別 ,重新計算該類別的聚類中心 (其中 ||表示的是該類別資料的總個數)

重複第二步和第三步,直到聚類中心的位置不再發生變化(我們也可以設定迭代次數)

k-means演算法雖然簡單快速,但是存在下面的缺點:

聚類中心的個數K需要事先給定,但在實際中K值的選定是非常困難的,很多時候我們並不知道給定的資料集應該分成多少個類別才最合適。

k-means演算法需要隨機地確定初始聚類中心,不同的初始聚類中心可能導致完全不同的聚類結果。

第一個缺陷我們很難在k-means演算法以及其改進演算法中解決,但是我們可以透過k-means++演算法來解決第二個缺陷。

Spark分散式機器學習原始碼分析:Kmeans族聚類

2.Kmeans++

由於 K-means 演算法的分類結果會受到初始點的選取而有所區別,因此有提出這種演算法的改進: K-means++ 。其實這個演算法也只是對初始點的選擇有改進而已,其他步驟都一樣。初始質心選取的基本思路就是,初始的聚類中心之間的相互距離要儘可能的遠。

演算法描述如下:

步驟一:隨機選取一個樣本作為第一個聚類中心 c1;

步驟二:

計算每個樣本與當前已有類聚中心最短距離(即與最近一個聚類中心的距離),用 D(x)表示;

這個值越大,表示被選取作為聚類中心的機率較大;

最後,用輪盤法選出下一個聚類中心;

步驟三:重複步驟二,知道選出 k 個聚類中心。

選出初始點後,就繼續使用標準的 k-means 演算法了。K-means++ 能顯著的改善分類結果的最終誤差。

儘管計算初始點時花費了額外的時間,但是在迭代過程中,k-means本身能快速收斂,因此演算法實際上降低了計算時間。

雖然k-means++演算法可以確定地初始化聚類中心,但是從可擴充套件性來看,它存在一個缺點,那就是它內在的有序性特性:下一個中心點的選擇依賴於已經選擇的中心點。針對這種缺陷,k-means||演算法提供瞭解決方法。

3.Kmeans||

k-means++ 最主要的缺點在於其內在的順序執行特性,得到 k 個聚類中心必須遍歷資料集 k 次,並且當前聚類中心的計算依賴於前面得到的所有聚類中心,這使得演算法無法並行擴充套件,極大地限制了演算法在大規模資料集上的應用。

k-means|| 主要思路在於改變每次遍歷時的取樣策略,並非按照 k-means++ 那樣每次遍歷只取樣一個樣本,而是每次遍歷取樣 O(k) 個樣本,重複該取樣過程大約 O(logn) 次,重複取樣過後共得到 O(klogn) 個樣本點組成的集合,該集合以常數因子近似於最優解,然後再聚類這O(klogn) 個點成 k 個點,最後將這 k 個點作為初始聚類中心送入Lloyd迭代中,實際實驗證明 O(logn) 次重複取樣是不需要的,一般5次重複取樣就可以得到一個較好的聚類初始中心。

Spark分散式機器學習原始碼分析:Kmeans族聚類

4.Spark實踐

在下面的示例中,在載入和解析資料之後,我們使用 KMeans物件將資料聚類為兩個聚類。所需聚類的數量傳遞給演算法。然後,我們計算平方誤差的集合和內(WSSSE)。您可以透過增加k來減少此錯誤度量。實際上,最佳k通常是WSSSE圖中存在“肘”的那個。

import org。apache。spark。mllib。clustering。{KMeans, KMeansModel}import org。apache。spark。mllib。linalg。Vectors// 載入和解析資料val data = sc。textFile(“data/mllib/kmeans_data。txt”)val parsedData = data。map(s => Vectors。dense(s。split(‘ ’)。map(_。toDouble)))。cache()// Cluster the data into two classes using KMeans 使用Kmeans將資料聚為兩類val numClusters = 2val numIterations = 20val clusters = KMeans。train(parsedData, numClusters, numIterations)// 計算WSSSE評估聚類結果val WSSSE = clusters。computeCost(parsedData)println(s“Within Set Sum of Squared Errors = $WSSSE”)// 儲存和載入模型clusters。save(sc, “target/org/apache/spark/KMeansExample/KMeansModel”)val sameModel = KMeansModel。load(sc, “target/org/apache/spark/KMeansExample/KMeansModel”)

Spark分散式機器學習原始碼分析:Kmeans族聚類

5.原始碼分析

在spark中,org。apache。spark。mllib。clustering。KMeans檔案實現了k-means演算法以及k-means||演算法,LocalKMeans檔案實現了k-means++演算法。在分步驟分析spark中的原始碼之前我們先來了解KMeans類中引數的含義。

class KMeans private ( private var k: Int,//聚類個數 private var maxIterations: Int,//迭代次數 private var runs: Int,//執行kmeans演算法的次數 private var initializationMode: String,//初始化模式 private var initializationSteps: Int,//初始化步數 private var epsilon: Double,//判斷kmeans演算法是否收斂的閾值 private var seed: Long)

在上面的定義中,k表示聚類的個數,maxIterations表示最大的迭代次數,runs表示執行KMeans演算法的次數,在spark 2。0開始,該引數已經不起作用了。為了更清楚的理解演算法我們可以認為它為1。initializationMode表示初始化模式,有兩種選擇:隨機初始化和透過k-means||初始化,預設是透過k-means||初始化。initializationSteps表示透過k-means||初始化時的迭代步驟,預設是5,這是spark實現與第三章的演算法步驟不一樣的地方,這裡迭代次數人為指定, 而第三章的演算法是根據距離得到的迭代次數,為log(phi)。epsilon是判斷演算法是否已經收斂的閾值。

第一步,隨機初始化k箇中心點很簡單,具體程式碼如下:

private def initRandom(data: RDD[VectorWithNorm]) : Array[Array[VectorWithNorm]] = { //取樣固定大小為k的子集 //這裡run表示我們執行的KMeans演算法的次數,預設為1,以後將停止提供該引數 val sample = data。takeSample(true, runs * k, new XORShiftRandom(this。seed)。nextInt())。toSeq //選取k個初始化中心點 Array。tabulate(runs)(r => sample。slice(r * k, (r + 1) * k)。map { v => new VectorWithNorm(Vectors。dense(v。vector。toArray), v。norm) }。toArray) }

第二步,透過已知的中心點,迴圈迭代求得其它的中心點。

var step = 0while (step < initializationSteps) { val bcNewCenters = data。context。broadcast(newCenters) val preCosts = costs //每個點距離最近中心的代價 costs = data。zip(preCosts)。map { case (point, cost) => Array。tabulate(runs) { r => //pointCost獲得與最近中心點的距離 //並與前一次迭代的距離對比取更小的那個 math。min(KMeans。pointCost(bcNewCenters。value(r), point), cost(r)) } }。persist(StorageLevel。MEMORY_AND_DISK) //所有點的代價和 val sumCosts = costs。aggregate(new Array[Double](runs))( //分割槽內迭代 seqOp = (s, v) => { // s += v var r = 0 while (r < runs) { s(r) += v(r) r += 1 } s }, //分割槽間合併 combOp = (s0, s1) => { // s0 += s1 var r = 0 while (r < runs) { s0(r) += s1(r) r += 1 } s0 } ) //選擇滿足機率條件的點 val chosen = data。zip(costs)。mapPartitionsWithIndex { (index, pointsWithCosts) => val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) pointsWithCosts。flatMap { case (p, c) => val rs = (0 until runs)。filter { r => //此處設定l=2k rand。nextDouble() < 2。0 * c(r) * k / sumCosts(r) } if (rs。length > 0) Some(p, rs) else None } }。collect() mergeNewCenters() chosen。foreach { case (p, rs) => rs。foreach(newCenters(_) += p。toDense) } step += 1}

第三步,求最終的k個點。

val bcCenters = data。context。broadcast(centers) //計算權重值,即各中心點所在類別的個數 val weightMap = data。flatMap { p => Iterator。tabulate(runs) { r => ((r, KMeans。findClosest(bcCenters。value(r), p)。_1), 1。0) } }。reduceByKey(_ + _)。collectAsMap() //最終的初始化中心 val finalCenters = (0 until runs)。par。map { r => val myCenters = centers(r)。toArray val myWeights = (0 until myCenters。length)。map(i => weightMap。getOrElse((r, i), 0。0))。toArray LocalKMeans。kMeansPlusPlus(r, myCenters, myWeights, k, 30) }

Spark kmeans族的聚類演算法的內容至此結束,有關Spark的基礎文章可參考前文:

想要入門大資料?這篇文章不得不看!Spark原始碼分析系列

阿里是怎麼做大資料的?淘寶怎麼能承載雙11?大資料之眸告訴你

Spark分散式機器學習原始碼分析:如何用分散式叢集構建線性模型?

高頻面經總結:最全大資料+AI方向面試100題(附答案詳解)

Spark分散式機器學習系列:一文帶你理解並實戰樸素貝葉斯!

Spark分散式機器學習系列:一文帶你理解並實戰決策樹模型!

Spark分散式機器學習系列:一文帶你理解並實戰整合樹模型!

一文帶你理解並實戰協同過濾!Spark分散式機器學習系列

參考連結:

http://spark。apache。org/docs/latest/mllib-clustering。html

https://github。com/endymecy/spark-ml-source-analysis