Google在2003年到2006年發表了三篇有影響力的文章,分別是2003年SOSP的GFS,2004年OSDI的MapReduce,2006年OSDI的BigTable。SOSP和OSDI都是操作系統領域的頂級會議,在計算機學會的推薦會議中屬於A類。SOSP在奇數年舉行,而OSDI在偶數年舉行。
然後這個博客會介紹MapReduce。
1.MapReduce是做什麽的?
因為找不到Google的原理圖,所以想借用壹個Hadoop項目的結構圖來說明MapReduce的位置,如下圖。
Hadoop其實就是Google Sambo的開源實現,Hadoop MapReduce對應Google MapReduce,HBase對應BigTable,HDFS對應GFS。HDFS(或GFS)為上層提供高效的非結構化存儲服務,HBase(或BigTable)是提供結構化數據服務的分布式數據庫,Hadoop MapReduce(或Google MapReduce)是用於作業調度的並行計算編程模型。
GFS和BigTable已經為我們提供了高性能、高並發的服務,但是並行編程並不是所有程序員都能享受到的。如果我們的應用程序不能並發,那麽GFS和BigTable就沒有意義。MapReduce的偉大之處在於,不熟悉並行編程的程序員也能充分發揮分布式系統的威力。
簡單來說,MapReduce就是把壹個大作業拆分成幾個小作業的框架(大作業和小作業本質上應該是壹樣的,只是規模不同)。用戶需要做的是決定拆分成多少份,並定義作業本身。
這裏有壹個貫穿整篇文章的例子來解釋MapReduce是如何工作的。
2.示例:統計詞頻
如果我想統計壹下10年來計算機論文中最常見的單詞,看看大家都在研究什麽,那我收集完論文之後應該怎麽做?
方法壹:我可以寫壹個小程序,按順序遍歷所有的試卷,統計每個單詞出現的次數,最後知道哪些單詞最受歡迎。
這種方法在數據集很小的情況下非常有效,實現起來也最簡單,適合解決這個問題。
方法二:寫壹個多線程程序,並發遍歷紙張。
理論上,這個問題可以是高度並發的,因為壹個文件的統計信息不會影響另壹個文件的統計信息。當我們的機器是多核或者多處理器的時候,方法2肯定比方法1效率高。但是編寫多線程程序比方法壹要困難得多。我們必須自己同步數據,例如,防止兩個線程重復統計文件。
方法三:將作業交給多臺電腦完成。
我們可以用第壹種方法的程序部署到n臺機器上,然後把集合分成n份,每臺機器運行壹個作業。這種方法運行速度夠快,但是部署起來很麻煩。我們要手動把程序拷貝到其他機器上,手動把短文分開,最痛苦的是把N個運行結果整合起來(當然也可以再寫壹個程序)。
方法4:讓MapReduce幫助我們!
MapReduce本質上是第三種方法,但是如何拆分文件集,如何復制程序,如何整合結果都是框架定義的。我們只需要定義這個任務(用戶程序),剩下的壹切交給MapReduce。
在介紹mapreduce的工作原理之前,先說壹下Map和reduce兩個核心函數,以及MapReduce的偽代碼。
3.映射函數和歸約函數
Map函數和reduce函數由用戶實現,這兩個函數定義了任務本身。
Map函數:接受壹個鍵值對,並生成壹組中間鍵值對。MapReduce框架將把由map函數生成的中間鍵-值對中具有相同鍵的值傳遞給Reduce函數。
Reduce函數:接受壹個鍵和壹組相關的值,並將這些值組合起來產生壹組更小的值(通常只有壹個值或零值)。
統計詞頻的MapReduce函數核心代碼很短,主要是實現這兩個功能。
【平淡】?查看純文本
地圖(字符串?關鍵,?字符串?值):
//?關鍵:?文檔?名字
//?值:?文檔?內容
為了什麽?每個?詞?w?在?價值:
發射中間體(w,"1");
減少(字符串?關鍵,?叠代器?值):
//?關鍵:?答?單詞
//?價值觀:?答?列表?的?計數
int?結果?=?0;
為了什麽?每個?v?在?價值觀:
結果?+=?parse int(v);
emit(AsString(result));
在統計詞頻的例子中,map函數接受的鍵是文件名,值是文件的內容。map逐個遍歷單詞,每遇到壹個單詞w,壹個中間鍵值對
4.MapReduce是如何工作的
上圖是論文中給出的流程圖。壹切從頂層用戶程序開始,鏈接MapReduce庫,實現最基本的地圖功能和Reduce功能。圖中的執行順序用數字標註。
MapReduce庫首先將用戶程序的輸入文件分成m個文件(m由用戶定義),每個文件通常有16MB到64MB,分為split 0 ~ 4,如左圖所示。然後使用fork將用戶進程復制到集群中的其他機器上。
用戶程序的壹個副本稱為主程序,其他的稱為工作程序。master負責調度和分配作業(映射作業或減少作業)給空閑的worker,worker的數量也可以由用戶指定。
分配了地圖作業的Worker開始讀取相應切片的輸入數據,地圖作業的數量由m決定,對應壹個壹個拆分;映射作業從輸入數據中提取鍵值對,每個鍵值對作為參數傳遞給映射函數,映射函數生成的中間鍵值對緩存在內存中。
緩存的中間鍵值對會定期寫到本地磁盤,它會被分成r個區,大小由用戶定義,每個區對應壹個未來的Reduce作業;這些中間鍵-值對的位置將被通知給主設備,主設備將負責將信息轉發給Reduce worker。
Master通知分配了Reduce作業的worker它所負責的分區在哪裏(肯定不止壹個地方,每個映射作業生成的中間鍵值對可能映射到所有R個不同的分區)。當Reduce worker讀取它所負責的所有中間鍵值對時,它們首先被排序,這樣同壹個鍵的鍵值對就聚集在壹起了。排序是必要的,因為不同的鍵可能被映射到同壹個分區,也就是同壹個Reduce作業(誰想要更少的分區)。
Reduce worker遍歷排序後的中間鍵-值對,對於每個惟壹鍵,將鍵和關聯值傳遞給Reduce函數,reduce函數生成的輸出將被添加到這個分區的輸出文件中。
當所有的Map和Reduce作業完成後,master喚醒正版用戶程序,MapReduce函數調用返回用戶程序的代碼。
在所有的執行之後,MapReduce輸出被放在R個分區的輸出文件中(分別對應於壹個Reduce作業)。用戶通常不需要合並這些R文件,而是將它們作為輸入給另壹個MapReduce程序進行處理。整個過程中,輸入數據來自底層分布式文件系統(GFS),中間數據放在本地文件系統,最終輸出數據寫入底層分布式文件系統(GFS)。而且要註意Map/reduce作業和Map/Reduce函數的區別:Map作業處理壹片輸入數據,可能需要多次調用Map函數來處理每壹個輸入的鍵值對;reduce作業處理壹個分區的中間鍵-值對,在此期間,Reduce函數為每個不同的鍵調用壹次,Reduce作業最終對應於壹個輸出文件。
我更喜歡把過程分為三個階段。第壹階段是準備階段,包括1,2,以MapReduce庫為主角,完成拆分作業、復制用戶程序等任務;第二階段是跑步階段,包括3、4、5、6。主角是用戶自定義的map和reduce函數,每個小作業獨立運行。第三個階段是結束階段,此時作業已經完成,作業結果被放入輸出文件,這取決於用戶想要對這些輸出做什麽。
5.詞頻是怎麽統計的?
結合第四節,可以知道第三節的代碼是如何工作的。假設我們定義M=5,R=3,有6臺機器,1臺主機。
這張圖片描述了MapReduce如何處理詞頻統計。因為Mapworkers的數量不夠,首先處理1,3,4段,生成中間鍵值對。當所有的中間值都準備好時,Reduce作業開始讀取相應的分區並輸出統計結果。
6.用戶的權利
用戶的主要任務是實現map和reduce接口,但也有壹些有用的接口對用戶開放。
輸入閱讀器.這個函數將輸入分成m個部分,並定義如何從數據中提取初始鍵值對。比如在詞頻的例子中,定義了文件名和文件內容是鍵值對。
配分函數.該函數用於將map函數生成的中間鍵值對映射到壹個分區。最簡單的實現是散列密鑰,然後對r取模..
比較功能.此功能用於對減少的作業進行分類。這個函數定義了鍵的大小關系。
輸出作者.負責將結果寫入底層分布式文件系統。
組合器功能.其實就是reduce函數,用於上面提到的優化。例如,在統計詞頻時,如果每個
更不用說地圖和縮小功能了。
7.MapReduce的實現
目前MapReduce的實現有很多,除了Google自己的實現,還有著名的hadoop,不同的是Google是c++,而hadoop用的是java。另外,斯坦福大學已經實現了壹個運行在多核/多處理器,* * *內存共享環境下的MapReduce,稱為Phoenix(簡介)。相關論文於2007年在HPCA發表,是當年最好的論文!