MapReduce——歷久而彌新

引子

MapReduce 是谷歌 2004 年(Google 內部是從03年寫出第一個版本)發表的論文裡提出的一個概念。雖然已經過去15 年了,但現在回顧這個大資料時代始祖級別概念的背景、原理和實現,仍能得到對分散式系統的很多直覺性的啟發,所謂溫故而知新。

在Google 的語境裡,MapReduce 既是一種程式設計模型,也是支援該模型的一種分散式系統實現。它的提出,讓沒有分散式系統背景的開發者,也能較輕鬆地利用大規模叢集以高吞吐量的方式來處理海量資料。

其解決問題思路很值得借鑑:找到需求的痛點(如海量索引如何維護,更新和排名),對處理關鍵流程進行高階抽象(分片Map,按需Reduce),以進行高效的系統實現(所謂量體裁衣)。這其中,如何找到一個合適的計算抽象,是最難的部分,既要對需求有直覺般的瞭解,又要具有極高的計算機科學素養。當然,並且可能更為接近現實的是,該抽象是在根據需求不斷試錯後進化出的海水之上的冰山一角。

需求

谷歌當時作為網際網路的最大入口,維護著世界全網索引,最早觸到了資料量的天花板。即,哪怕針對很簡單的業務邏輯:如從爬蟲資料中生成倒排索引、將圖狀網頁集合用不同方式組織、計算每個主機爬取的網頁數量、給定日期的高頻查詢詞彙等等,在全球網際網路資料的尺度的加成下,也變的異常複雜。

這些複雜性包括:輸入資料分散在非常多的主機上、計算耗資源太多單機難以完成、輸出資料需要跨主機進行重新組織。為此,不得不針對每個需求重複構造專用系統,並耗費大量程式碼在分發資料和程式碼、排程和並行任務、應對機器故障和處理通訊失敗等問題上。

抽象

map

reduce

的抽象靈感來自於函數語言程式設計語言 Lisp,為什麼選定這兩個概念呢?這來源於谷歌人對其業務的高度提煉:首先輸入可以切分成一個個邏輯的

記錄 (record)

;然後對其每個

record

執行某種

對映

(map)操作,生成一些鍵值對組成的中間結果(為什麼要分鍵和值呢?為最後一步做鋪墊,允許使用者將中間結果以任意指定的方式——

,來進行組織規約);最後在具有相同鍵的中間結果子集上執行

規約

reduce

,包括排序,統計,提取最值等等)操作。

函式式模型的另一個特點在於對 map 操作實現的約束,即規定使用者應提供一個無副作用的 map 操作(相關概念有

純函式

確定性

冪等性

等等,當然他們的概念並不一樣,後面小結會詳細討論)。如此限制,好處有二,可以進行大規模並行執行,可以透過換地兒重試來遮蔽主機故障。

具體到落地上,

map

reduce

都是使用者自定義函式。

map

函式接受一個 Record,不過為了靈活,一般也組織為鍵值對;然後產生 List[key, value],

reduce

函式接受一個 key 和該 key 對應的所有中間結果 List[value]。即:

map (k1,v1) -→ list(k2,v2)reduce (k2,list(v2)) -→ list(v2)

拿由谷歌這篇論文提出,後來成為大資料處理界的

hello world

級別示例程式

Word Count

(對一堆文件中的單詞計數)來說,

map

reduce

的實現長這樣:

map(String key, String value): // key: document name // value: document contents for each word w in value: EmitIntermediate(w, “1”);reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));

這裡有兩個有意思的點:

中間變數需要網路傳輸,必然會涉及到序列化

。MapReduce 的最初版本選擇是一切解釋權歸

執行時的

使用者程式碼所有,我只是傻傻的傳 string。即,規定使用者在 map 中將任何輸出的中間結果物件都轉換為 string,然後在 reduce 函式中接收該 Iterator[string] 後,自行解析為自己想要的格式。當然,在後來的模仿框架如 Hadoop 中,序列化和解序列化部分被拿了出來,可以由使用者來自定義實現來滿足功能或效能上的需求。沒去查證,谷歌後來相比對此也做了最佳化。這是一種很自然的

系統演化

思路,初期設計儘量簡單粗暴以求快速實現可用原型;在積累使用經驗後再對某些使用不便的模組進行擴充套件設計。

reduce 接受 value 集合被組織為了迭代器(Iterator)

。相信用過 Python 的同學應該對迭代器不陌生,它是一個很簡單的介面,包括

next

stop

兩個語義。配合

for loop

,構成一個很強大的抽象。不管你底層是一個記憶體中的 List、還是檔案內容、還是網路 IO 流,只要能在執行時知道如何得到下一條記錄,什麼時候時候停止,都能被 for 迴圈來利用,進行逐一處理。迭代器抽象的一個好處在於,不必將待迭代的內容一次載入到記憶體,可以對資料增量式的

惰性載入

。MapReduce 框架的此處實現也正是利用了該特性。

實現概覽

抽象定了,那麼實現自然可以有不同,這也是

介面和實現分離

的意義所在。前者的抽象是一種思想,谷歌已經給你做了;後者的實現,完全可以根據自己的生產環境進行量體裁衣的來定製實現。谷歌在 paper 中給了一種內部經典版,Hadoop 也提供了一套通用版,當然我們也可以根據自己的業務需求和場景約束來實現一個合身版。

谷歌

釋出論文時

實現 MapReduce 所面對的系統環境長這樣:

單機雙核 x86 架構,2~4G記憶體,Linux 系統

通用網路硬體,百兆或者千兆乙太網

叢集含數百或者數千臺機器,因此機器故障是常態

用的廉價 IDE 介面硬碟,但是人家有 GFS 來提供容錯

多租戶支援,因此需要做 Job 級別抽象和資源約束

流程概覽

輸入由使用者指定切分大小後,切分成

M

份,然後分散到不同機器上(由於 GFS 的存在,也可能該輸入 Block 本來就在那臺機器上)。每個機器上會並行的跑使用者定義的

map

map

輸出的中間結果,亦由使用者指定,按

key 值範圍

切分為

R

份,對於每個中間結果,透過

node label = hash(key) mod R

決定其去處。下面是流程概覽圖:

MapReduce——歷久而彌新

論文中 MapReduce 示意圖

首先把輸入切分成 M 份,通常每份 16 ~ 64M,這個粒度使用者按需進行把握;然後把這些分片分散到不同機器上(如果有GFS這種分散式檔案系統配合的話,可能本來就分散著),然後在每個機器上 fork 一份使用者的程式碼。對於

使用者程式碼分發

,是一個有意思的話題,對於不同語言可能有不同實現,比如谷歌的C++,可能傳輸的是動態連結庫;比如 Hadoop 的 Java 傳的可能是 jar 包 (當然, 所有依賴也得傳);如果是 PySpark 的 Python 呢,可能用的就是神奇的 cloudpickle;總之,不同語言需要考慮的傳輸機制是不一樣的,比如說動態語言和靜態語言;此外,全域性變數和外部依賴也是需要考慮的點,谷歌雖然在此一筆帶過,但是不同語言需要面對的情況可能差別很大。

Master 的這份程式複製是不一樣的,是負責安排活的。會選空閒的 worker 來安排這 M 個 map 任務和 R 個 reduce 任務。這需要考慮的是,worker 執行每個使用者程式碼是單獨啟動一個程序,還是插入到系統 loop 中去執行。

執行 map 任務的 Worker,會讀取被分配到的輸入切片,解析出鍵值對流,送給使用者定義的 map 函式。map 後產生的臨時結果首先會快取在記憶體中。雖然論文中沒有展開,但可以預見的是,如何切片,如何解析出鍵值對流,不同使用者對於同樣的輸入可能有不同的關注點,因此必然存在定製化解析的需求。這一部分(FileSplit 和 RecordReader)在稍後隨著承載業務的增多,估計也會開放出來給使用者自定義,事實上 Hadoop 就是這麼幹的。

快取的中間結果會被定期在執行 Map Task 的

機器本地

進行刷盤(這也算一個本地性的最佳化,但是也有後果,就是一旦該機器故障,容錯會稍微麻煩點,後面會說),並且按使用者指定的 Partition 函式拆分成 R 個塊,然後將這些位置資訊發給 Master 節點。Master 負責通知相應的 Reduce Worker 以拉取對應資料。

Reduce Worker 收到這些中間結果的位置資訊後,會透過 RPC 來拉取其對應的 Partition 的資料。對於某個 Reduce Worker 來說,待

所有資料

拉取完成後,會將其按照 key 來進行排序。這樣一來,所有具有同樣 key 的資料便捱到了一塊。第 4 步和第 5 步的過程合起來就是

shuffle

,涉及到外部排序、多機資料傳輸等

極其耗時

操作;當資料量比較小時,如何實現都不成問題。但是當資料量大到一定程度,這裡很容易成為效能瓶頸,因此也有一些最佳化方法,稍後會針對 shuffle 做詳細展開,此處按下不表。

待中間資料排好序之後,Reduce Woker 會對其進行掃描,然後將一個個 key 和其對應的值集合,即 傳給使用者定製的 reduce 函式,然後將生成的結果追加到最終輸出檔案。對於谷歌來說,一般來說就是支援並行 append 的檔案系統 GFS,好處在於可以多程序同時寫結果。

當所有 reduce 任務完成後,master 會喚醒使用者程序,即在使用者程式碼的視角,MapReduce 任務是阻塞的。

一般而言,使用者無需將最終結果的 R 個 Partition 進行合併,而是將其直接作為下一個 MapReduce 任務的輸入。Spark RDD 的partition 就是將這一特點

概念化

了,並且將每一步 MapReduce 輸出也放記憶體中,不進行落盤,以降低連續 MapReduce 任務的延遲。

區域性性

計算機科學中常用的一個原理,叫做

區域性性原理

(locality reference,這裡特指空間區域性性),說的是

程式在順序執行時,訪問了一塊資料,接下來大機率會訪問該資料(物理位置上)旁邊的一塊資料

。很樸素的斷言,卻是一切 cache 發揮作用的基礎,計算機儲存因此也形成了由慢到快,由賤到貴,由大到小的儲存層次體系(硬碟-> 記憶體 -> 快取 -> 暫存器)。在分散式環境中,這個層次體系至少還要再罩上一層——網路IO。

在 MapReduce 系統中,我們也會充分利用輸入資料的 locality。只不過這次,

不是將資料載入過來,而是將程式排程過去

Moving Computation is Cheaper than Moving Data

)。如果輸入存在 GFS 上,表現形式將為一系列的邏輯 Block,每個 Block 可能會有幾個(一般是三個)物理副本。對於輸入每個邏輯 Block,我們可以在其某個物理副本所在機器上執行 Map Task(如果失敗,就再換一個副本),由此來儘量減小網路資料傳輸。從而降低了延遲,節約了頻寬。

Master 的資料結構

谷歌的 MapReduce 實現是有

作業(Job)

級別的封裝的,每個 Job 包含一系列

任務(Task)

,即 Map Task 和 Reduce Task。那麼,我們要維護一個正在執行的 Job 的元資訊,就勢必要儲存所有正在執行的 Task 的狀態,其所在的機器 ID 等等。而且,Master 事實上充當了 Map Task 輸出到 Reduce Task 輸入的一個“

管道

”。每一個 Map Task 結束時,會將其輸出的中間結果的位置資訊通知 Master,Master 再將其轉給對應的 Reduce Task,Reduce Task 再去對應位置拉取對應 size 的資料。注意,由於 Map Task 的結束時間不統一,這個

通知->轉發-> 拉取

的過程是增量的。那麼不難推測出,reduce 側對中間資料排序的應該是一個不斷 merge 的過程,不大可能是等所有資料就位了再全域性排序。

在分散式系統中,一個比較忌諱的問題就是單點。因為是牽一髮而動全身,而 Master 就是這樣一個單點。當然單個機器的統計平均故障率並不高,但是一旦故障,那麼整個叢集都將不可用。但同時,有一個 Leader 節點會大大簡化分散式系統的的設計;因此採用單點 Master 的系統反而是主流,那勢必需要開發一些其他手段來強化 master 的容錯能力,比如說記 log + snapshot、比如說主從備份、比如說每次從 worker 心跳進行狀態重建、比如說用其他實現了分散式一致性協議的系統來儲存元資訊等等。

容錯

叢集中有 Master 和 Worker 兩種機器角色。

Worker 由於數量大,有機器故障機率較大。在分散式系統中,Master 獲取 Workers 的資訊,最常見便是

心跳

,既可以是 master ping worker,也可以反過來,也可以兼而有之。master 透過心跳發現某些 worker 不可到達後(可能是 worker 死掉了,也可能是網路出問題了等),就會將該 Worker 打個

故障

(failed)的標記。

之前已經排程到該故障 Worker 上的任務(Task) 很顯然有兩種型別: Map Task 和 Reduce Task。

對於 Map Task

以下所提的 Task,肯定是從屬於未結束的 Job

) ,不管成功與否,我們都要進行重試,因為一旦該 Worker 變為不可達,存於其上的中間結果也隨之無法被 Reduce Task 獲取。當然,我們可以在 Master 中多記點狀態來減少對已完成的 Map Task 進行重試的機率。比如記下某個 Map Task 的輸出是否已經都被 Reduce Task 拉取,以決定要不要對正常完成的 Map Task 進行重試,但無疑會提高系統複雜度。

工程往往會對環境做某些假設, 以簡化某些實現;

我們假設 worker 失敗率不是那麼高,或者重試所有 Map Task 的代價可以忍,那麼就可以簡化一點實現,以保持系統的簡約,減少可能的 bug。對於

Reduce Task

,未完成的無疑要進行重試,已經完成的,由於其輸出結果我們假設會寫到全域性分散式系統檔案中(即某些機器掛了也不影響),就不會重試。

具體重試的方法,可以標記需要重試的 Task 的狀態為 idle,以告訴排程器,該 Task 可以重新被排程。當然,也可以實現為從一個(工作/完成)佇列倒騰到另一個(就緒)佇列,本質上是一樣的,都是合理實現一個 Task 的

狀態機

至於 master 的故障恢復,上一節稍有提到。如果在實踐中 Master 確實很少死掉,並且偶爾死掉造成所有正在執行的任務失敗的後果也可以接受,那麼就可以粗暴的實現為如果 Master 死掉,就簡單通知所有正在執行的任務的使用者程式碼任務失敗了(比如返回非 0 值),然後有使用者程式碼自行決定丟棄任務還是待叢集重啟後進行重試:

MapReduceResult result;if (!MapReduce(spec, &result)) abort();

如果業務對於宕機時間有要求,或者大面積任務失敗不可以忍受,那麼就需要增強 Master 的容錯性。常用的方法上節有提到,這裡展開一下:

snapshot + log

:將 Master 的記憶體資料結構定期做快照(snapshot)同步到一個持久儲存,可以寫到外部儲存,可以同步到其他節點,可以一份也可以多份。但是隻做快照的話,時間間隔不好選擇:間隔太長容易造成恢復時狀態丟失過多;間隔過短會加重系統負載。因此常用的輔助手段是記 log,即對每個會改變系統的狀態的操作進行記錄。這樣就可以選擇長一點的快照間隔,恢復時,先載入快照,再疊加上快照點之後的日誌。

主從備份

。比如 Hadoop 原先的 secondary namenode,用屬於不同容錯閾兩臺機器都作為 Master,只不過一個用來響應請求,另一個用來實時同步狀態。等某臺機器故障發生時,立即將流量切換到另一個機器上。至於其同步機制,則是另一個可以考量的點。

狀態外存

。如果元資料量不大,可以用 Zookeeper 或者 etcd 這種實現了分散式一致性協議的叢集來儲存。由於這些叢集本身具有容錯能力,因此可以認為避免了單點故障。

心跳恢復

:重新啟動一個 Master 後,利用所有 Worker 報上來的資訊進行 Master 資料結構的重建。

還值得一提的是,

容錯也需要使用者側程式碼做配合

。因為框架會對不成功的 map/reduce 使用者程式碼進行重試。這就要求,使用者提供的 map/reduce 邏輯符合

確定性

Deterministic

):即函式的輸出依賴且僅依賴於輸入,而不取決任何其他隱形的輸入或者狀態。當然,這個蘊含了

冪等性

Idempotency

):多次執行和一次執行效果一樣;但是冪等性並不能推出確定性;假設有這麼一個函式,它第一次執行造成了一些狀態改變(比如某些釋放資源的 dispose 函式),而後續發現狀態已經改變過了就不再改變該狀態,那麼它符合冪等性;但是由於其含有隱式狀態輸入,不是確定性的。

如果 map/reudce 函式是確定性的,那麼框架就可以放心大膽重試了。某些條件下,冪等性也可以接受,比如儲存隱式狀態的地方很牢靠。舉個栗子,我們依賴於一個檔案鎖做判斷某個函式執行了一次或多次,如果該檔案鎖所依賴的檔案系統很穩定,並且提供分散式一致性,那麼就完全可以。如果是用 nfs 的一個檔案做鎖,來實現的所謂冪等性就值得商榷了。

如果 map/reduce 函式是確定性的,框架會透過其輸出提交的原子性來進行冪等性保證。即,即使重試了多次,也和只執行了一次一樣。具體來說,對於 Map Task,會產生 R 個臨時檔案,並在結束時將其位置傳送給 Master;Master 在收到多次同一分片(split) 的位置資訊時,如果該分片前面某次結果來源仍可用或者已經被消費,那麼就忽略掉該請求後面的所有請求。對於 Reduce Task,其生成的結果也會先寫成臨時檔案,然後依賴於底層檔案系統的原子性的改名操作(

原子性改名

也是一個多程序競爭的經典的操作,因為生成檔案過程比較長,不容易做成原子的,但是

判斷具有某名字的檔案是否存在並改名

卻很容易做成原子的),在處理完成時改變為目的檔名。如果發現已經有一個具有該目的檔名的檔案了,就放棄改名操作,從而保證了該 Reduce Task只有一個成功輸出的最終檔案。

任務粒度

一個 MapReduce Job 中會產生 M+R 個 Task,具體 M 和 R 的值在執行之前可以由人進行配置。不同的系統實現可能會有發揮出最佳系統性能的不同配比;但是同時要兼顧業務需求,比如輸入大小,輸出檔案個數等等。

備份任務

在實際業務中,由於某些主機原因常會出現

長尾效應

,即少數幾個 Map/Reduce Task 總是會巨慢地拖到最後,甚至拖得時間甚至是其他任務的幾倍。造成這些主機拖後腿的原因可以舉出很多,如:某個機器硬碟老化,讀寫速度幾十倍的變慢;又比如排程器排程的有問題,導致某些機器負載太高,具有大量的 CPU、記憶體、硬碟和網路頻寬的爭搶;又比如軟體 bug 導致某些主機變慢等等。

只要確定這些問題只發生在

少數

某些主機上,那麼解決方法也很簡單。在

任務接近尾聲

的時候(比如統計剩餘task的佔比小於一個閾值時),對於每個仍然在跑的任務,分別額外排程一份到其他主機上,那麼大機率會讓這些任務提前完成,同一任務跑多次的處理邏輯,和容錯重試造成跑多次是一致的,可以複用。

此外,我們可以透過實際業務檢驗來微調該閾值(包括怎麼判定任務結尾,啟用幾個備份任務),在耗費額外資源代價和減少任務總用時之前取得一個平衡。

其他細節

除了 Mapper 和 Reducer 這兩個最基本的源語,該系統還提供了一些其他的後來事實上也成為標配的擴充套件:Partitioner,Combiner 和 Reader/Writer

Partitioner

預設來說,對 Map 輸出的中間結果進行劃分會使用類似於

hash(key) mod R

這種應用無關的劃分演算法。但是有時候使用者有需求將特定的一些 keys 路由到同一個 Reduce Task,比如說中間結果的 key 是 URL, 使用者想按網站 host 進行彙總處理。這時候就需要將系統的這部分路由功能開放給使用者,以滿足使用者的定製需求。

Combiner

如果該 Job 針對所有中間結果的 reduce 的操作滿足

結合律

,那麼指定 Combiner 會很能提高效率。拿的 Word Count 來說,數值的加法無疑滿足結合律,也就是說,同一個單詞的頻次,在 Map Task 輸出後進行加和(在 Map Work 上),還是在 Reduce Task 中進行加和(在 Reduce Worker上),結果保持一致;而這樣一來,由於一些中間結果對進行了 combine,Map Task 到 Reduce Task 間的傳輸資料量會小很多,從而提高整個 Job 的效率。

也可以看出,combine 函式一般和 reduce 函式是一樣的,因為他們本質上是對 value set 執行了同一種操作,只不過執行時,執行的地點不一樣,結合的順序不一樣。目的是為了減少中間結果傳輸量,加速任務執行過程。

Reader/Writer

如果不將定製輸入輸出的能力開放給使用者,那麼系統顯然只能處理有限幾種預設約定的格式。因此,

reader

writer

介面本質上是系統和現實繁雜的業務之間的

介面卡(Adaptor)

。它們讓使用者可以自行指定資料的

按需要理解輸入內容

自由定製輸出格式

有了這兩個 Adaptor,系統才能適配更多的業務。一般來說,系統會內建提供一些常見的 Reader 和 Writer 的實現;包括按行讀文字檔案,讀檔案中鍵值,讀資料庫等等。然後使用者可以實現這兩個介面,進行更具體的定製。

系統常透過類似這種常用腳手架+進一步定製能力來提供API

,下面的 Counter 也是如此。

副作用

有些使用者實現的 map/reduce 函式會有一些副作用,比如說在執行任務中間輸出一些檔案、寫一些資料庫條目等等。一般來說這些副作用的原子性和冪等性需要使用者自己來處理。因為如果輸出介質不納入 MapReduce 系統,系統是沒有辦法保證這些輸出的冪等性和原子性的。不過有的系統就這麼幹的,提供一些某種型別/介質的狀態或者資料儲存,納入系統中,並且提供一些容錯和冪等的性質。好像 MillWheel 有類似的做法。但這樣會大大加重系統的複雜性。

跳過壞記錄

如果使用者程式碼有 bug 或者某些輸入有問題,會導致 Map 或者 Reduce 任務在執行時崩潰。當然這些 bug 或者輸入能修則修,但是有些情況由於第三方庫或者輸入的原因,不能夠進行修復。而在某些型別的任務,比如說訓練資料集清洗、大型統計任務,丟幾個是可以容忍的。針對這種情況,系統會提供一種模式,在這種模式中會跳過這些 Record 記錄的執行。

具體實現上來說,也比較簡單。可以給每個輸入 Record 給個

唯一編號

(單次任務內唯一就行);如果某個 Record 處理時掛掉了,就將其編號彙報給 Master。如果 Master 收到了某個 Record 超過一次的處理失敗資訊,就將其跳過。做的再細一點,還可以記下錯誤型別和資訊進行比對,來確定這是否是一個確定性(deterministic)的錯誤,進而決定是否將其跳過。

單機執行

眾所周知,分散式系統很難跟蹤、除錯;因為一個 Job 可能同時分散在數千臺機器上進行執行。因此係統提供了本地執行 Job 的能力。可以針對小資料集輸入對程式碼的正確性進行測試。由於在單機執行,

就可以比較方便透過除錯工具進行斷點追蹤

實現一個本地 mock 系統,一般來說比較簡單。因為不需要考慮網路間狀態通訊,程式碼多節點分發,多機排程等一系列分散式系統的問題。但卻能極大方便程式碼除錯,效能測試和小資料集測試。

狀態資訊

對於分散式執行的 Job,一個任務進度等

資訊視覺化介面

(給系統整合一個 HTTP 服務,實時拉取系統資訊進行展示)有時候是至關重要的,它是系統易用性的關鍵。如果系統使用者不能夠很方便的實時監控任務的執行進度、執行速度、資源用量、輸出位置,出錯資訊以及其他一些任務的元資訊,就不能對任務的執行狀況有個感性的把握。尤其是如果寫 MapReduce 程式的人和跑這些程式的不是一個人時,會更為依賴這些狀態的實時呈現。

因此,

對於分散式系統來說,其易用性有一大半落在一個良好的系統資訊呈現上

。使用者需要據此來預測任務的完成時間、資源的吃緊程度等等,從而做出相應決策。

此外,對於叢集機器狀態資訊,也需要進行跟蹤,因為機器的負載資訊、故障資訊、網路狀況等等對使用者任務的執行也有不同程度的影響。給出這些機器狀態資訊,有助於對使用者程式碼甚至系統程式碼進行出錯診斷。

全域性計數器

系統提供了一種計數服務,以統計某種事件的發生頻次。比如使用者想統計 Word Count 示例中所處理的全大寫單詞的總數:

Counter* uppercase;uppercase = GetCounter(“uppercase”);map(String name, String contents):for each word w in contents:if (IsCapitalized(w)):uppercase->Increment();EmitIntermediate(w, “1”);

從程式碼可以大致猜測其實現:定義的時候,需要給 Counter 指定一個 Id。然後在 Map/Reudce 程式碼中可以透過該 Id 獲取該 Counter 然後進行計數。每個 worker 機器上的計數資訊會彙總到 Master 上,然後按 Counter 的 ID 進行加和,並且最終返回給使用者。同時,前述展示狀態資訊頁面也會將這些計數器進行視覺化(比如說打折線圖)。其中有個點需要注意,就是多次對重試的任務(由於

機器死掉

或者

避免長尾

進行的重試)的計次進行去重;可以按照 Map/Reduce ID 來進行去重,即我們假定同一輸入的重試任務共享一個 Task ID(事實上為了滿足重試需求和任務管理需求,

分散式系統肯定會對所有任務進行唯一編號的

),針對具有相同 Task ID 內部的 Counter 的計次,Master 只保留第一次

成功

的那一份;但是如果計數需要在頁面上實時顯示,可能就需要做適當資訊保留,並且在該 Task 重試時進行計數回退之類的操作。

系統會自動維持一些計數器

,比如說所有已經處理的鍵值對的數量和所有已經產生的鍵值對數量。全域性計數操作對於某些應用用處很大,比如說有的應用要求所有輸入鍵值對和輸出鍵值對的數量一樣,如果沒有全域性計數,就無從驗證;或者統計一些資料的全域性比例等等。

重排(shuffle) 操作

自 Spark 成名之後,shuffle 這個 MapReduce 中的語義得到了很多研究和實踐。這是一個多機傳輸的耗時操作,其實現的高效性對系統的效能有著至關重要的作用,因此單獨拿出一節來聊聊。

在 MapReduce 中就是指 Map Task 分片輸出到 Reduce Task 按需拉取的這麼一個過程。還拿 Word Count 為例,你想統計某個單詞在所有文件中的總頻次,但是這些單詞分佈在不同機器上的不同的 Map Task 輸出裡;而只有將所有同樣單詞的頻次對聚集到同一臺機器上,才能對其加和。

這種將機器和子資料集對應關係按key打亂重組的操作,我們姑且稱之為 shuffle。

在 Spark 中,基本上繼承了該語義,並且更通用化了。一個常見的例子是 join,即將兩個 Table 間具有相同 key 的記錄路由到同一臺機器上,從而在所有機器上按 key 分片進行並行 join,從而大幅提高效率。類似於 join 這樣的高階操作,會使得底層的 Partition 不能繼續在本機執行而不與其他 Partition 發生聯絡,因此 shuffle 也是 Spark 中劃分 Stage 的一個分水嶺。

對於 MapReduce 系統來說,使用的 shuffle 策略類似於 Spark 中基於排序的 shuffle。Map 首先將中間結果寫到記憶體中,然後定期刷盤,刷盤時進行歸併排序。然後 Reducer 端按需拉取,從多個 Mapper 端拉取資料後,再次進行歸併排序,然後傳給 Reduce 函式。這樣的好處在於可以進行大規模資料處理,

因為可以做外部排序,也可以做迭代惰性載入

。對於 Hadoop 的實現來說,將包含 shuffle 的整個流程分為了明顯的幾個階段:map(), spill, merge, shuffle, sort, reduce() 等。

其他的一些點

一些缺點

:透過 MapReduce 的系統設計可以看出,它是一個高吞吐,但是也高延遲的批次處理系統。並且不支援迭代。這也是後續 Spark,Flink 這樣系統火熱的動機。

檔案系統

: MapReduce 只有和 GFS 這樣支援分塊、多程序併發寫的大檔案系統配合才能發揮出更大的優勢,最佳化輸入和輸出的效能。此外,這種分散式檔案系統還會遮蔽底層節點故障。

組織形式

: MapReduce 是一個系統,需要部署到叢集上,但它同時又是一個庫,讓使用者程式碼和分散式叢集進行互動而不太用關心分散式環境中的問題的一個庫。每個任務需要寫任務描述(

MapReduceSpecification

),然後提交給系統——這是庫常用的一種提交任務的手段。

程式碼分發

:谷歌的 MapReduce 具體實現不得而知。猜測可以有兩種方式:一是將 MapReduce 庫程式碼 + 使用者程式碼整體在不同機器上 fork ,然後根據角色不同來執行不同分支。二是將各個機器上的服務先啟動起來,然後執行任務時只會將使用者自定義函式序列化後傳輸到不同機器。

名詞釋義

Intermediate result

:map 函式產生的中間結果,以鍵值對形式組織。

Map Task

:這個應該都是指的 Worker 機器上,執行 map 函式的工作程序。

Map Worker

:Map Task 所執行的 Worker 機器。所有 Worker 應該是沒有角色標記的,既可以執行 Map Task,也可以執行 Reduce Task,以充分地利用機器效能。

參考文獻

[1] Jeffrey Dean and Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters

[2] Alexey Grishchenko, Spark Architecture: Shuffle

[3] JerryLead, Spark internals

我是青藤木鳥,一個喜歡攝影、專注分散式系統的程式設計師,歡迎關注公眾號

木鳥雜記

,獲取更多分散式系統文章。