當前位置:成語大全網 - 新華字典 - 7.3 MapReduce工作流程

7.3 MapReduce工作流程

(1) 首先從HDFS中讀取數據,並對它做分片操作(split)

(2) 每個小分片單獨啟動壹個map任務來處理此分片的數據。map任務的輸入和輸出都是key-value

(3) 把每個map輸出的key-value都進行分區,然後做排序、歸並、合並後,分發給所有reduce節點去處理——這個過程稱為shuffle。因此map輸出的分區數量取決於reduce機器(節點)的數量。

(4) reduce處理後的結果再寫到HDFS中

註意:map之間是不會進行通信的,reduce之間也不直接信息交互。用戶不能直接控制節點之間的數據交換,都由MapReduce框架自身實現以降低開發難度。

對於上段的過程,分階段(模塊)做更詳細地說明。為方便,假設集群只包含兩個節點

首先由InputFormat模塊把文件從HDFS中讀取出來,並進行格式驗證。然後InputFormat還要把數據切分成多個分片split——註意這種切分只是壹種邏輯定義,物理上並不發生移動。

由記錄閱讀器RR(Record Reader)根據split的位置長度信息,把split從HDFS中讀出來,輸出是key-value格式(因為map函數只接收key-value格式輸入)。

將key-value輸入到map函數中(處理邏輯由用戶自定義),輸出中間結果。

將中間結果做shuffle處理,即分區、排序、合並、歸並,獲得key-list[value]形式的結果。然後,把shuffle的結果分發給 各節點 的reduce任務(註意這時候會跨節點交互數據)。

reduce函數對輸入數據進行分析(處理邏輯也是用戶自定義的),分析結果以key-value格式輸出。

OutputFormat對輸出格式進行檢查,並檢查其他設置,如輸出目錄是否存在。檢查通過後,把結果再寫入HDFS中去。

壹個大文件會被分成很多數據塊Block存儲在HDFS的各個數據節點DataNode中(每個Block都有多個冗余副本存儲在不同DataNode)。比如壹個文件被分為如下圖的六個Block。而分片也是對原來整個文件做處理的,也就是把六個Block合並起來重新分配(只是邏輯上的合並不是物理上的)。比如下圖的Split1包含全部Block1和部分Block2。

——也可以認為,邏輯層面上Block和Split沒有關系。

分片數量由用戶自定義。多個分片意味著可以並行處理文件,體現分布式計算的優勢。但是分片也不是越多越好,因為分片數量就是Map任務數量,而Map任務之間切換要消耗管理資源。所以過多的分片會影響執行效率。

習慣上會把split的大小就設置得和block壹樣,壹般是64MB或128MB,以盡量避免切分Block而增加傳輸開銷。

Map的數量即Split的數量(上面已經解釋)。

Reduce的數量由用戶自定義。最優選擇遵循的原則是,略小於集群中所有Reduce Slot的總量(考慮預留壹些資源處理可能發生的錯誤)。

Shuffle先後分為Map端Shuffle和Reduce端Shuffle

Map端Shuffle經歷如下過程

輸入數據是由RecordReader處理得到的key-value,然後給到Map任務,Map函數由用戶自定義,輸出是list(<key, value>)。

為了降低磁盤尋址開銷、提高效率,Map處理的結果並不直接寫入磁盤,而是先寫入緩存。

直到緩存即將寫滿,則觸發溢寫進程。首先對緩存中的數據做分區、排序和合並:

分區 是為了後面傳給Reduce任務做準備,所以有幾個Reduce Task就分幾個區。默認采用Hash函數,可以用戶自定義。

排序 是依據字典的key來做的。排序是系統默認操作,用戶不須幹預。

合並 (combine)可以減少鍵值對數量。比如有兩個鍵值對<"a",1 >和<"a",2>表示字符出現的次數,經過合並操作可得到<"a",3>。這樣可以減少後面寫磁盤的開銷。合並操作不是必須的,只有用戶定義了才會執行。

執行完上述操作後,緩存中的數據被寫入磁盤。需要註意的是,為了保證接收Map輸出不中斷,並不是把緩存徹底寫滿才觸發溢寫,而是在大約80%的時候就開始寫磁盤,同時剩下20%繼續接收Map輸出。

溢寫過程多次發生,則磁盤上會形成多個溢寫文件。當文件數量大於某個值(用戶可自定義),系統會將它們歸並(merge)成壹個大的文件存放在磁盤。這個大文件保持了前面分區、排序和合並的處理結果。

註意合並combine和歸並merge的不同:比如對於輸入<"a", 1>和<"a", 2>,combine的結果是鍵值對<"a", 3>,而merge的結果是<key, value-list>如<"a", <1,2>>

JobTracker會跟蹤歸並文件的生成,壹旦探測到壹個歸並大文件完成,就會通知各Reduce任務將屬於自己分區的部分拉走。Map端Shuffle就完成了。

Reduce端Shuffle的過程如下

當接到JobTracker通知,每個Reduce任務會從多個Map任務獲得數據,這些數據首先也會被保存在緩存中。

當緩存達到壹定大小,也會觸發溢寫操作:先做歸並然後合並,最後寫入磁盤。

於Map Shuffle壹樣,當磁盤中溢寫文件數量達到用戶設定值,則觸發文件歸並,最後把歸並後的大文件輸出給Reduce任務處理。

值得壹提的是,如果領取的任務很小,甚至達不到緩存上限,那麽系統會在緩存中做歸並合並處理後,跳過溢寫步驟,直接把數據傳給Reduce任務。

上面描述的過程是從數據流角度看。而從系統角度看,MapReduce運行用戶編寫的應用程序過程如下:

用戶啟動MapReduce後,程序會被部署到不同的機器上去。壹個機器會作為Master運行JobTracker,其他機器作為Worker運行TaskTracker

將Map Task和Reduce Task分配給各個Worker

從HDFS中讀取的數據被InputFormat分成許多Split,這些數據被提交給Map任務(處理邏輯由用戶編寫),輸入格式是key-value。Map任務數量和Split數量壹致。輸出為key-value列表

Map輸出先保存到緩存,達到壹定規模則觸發溢寫,即進行分區、排序、合並然後寫入磁盤

數據在磁盤中歸並後,由各Reduce任務取走各自分區的部分,然後執行用戶自定義的Reduce函數來處理數據,最後輸出key-value格式的結果

將結果文件寫入HDFS中。

Reference:

https://www.icourse163.org/learn/XMU-1002335004#/learn/content?type=detail&id=1214310152&sm=1