當前位置:成語大全網 - 書法字典 - MapReduce執行過程

MapReduce執行過程

MapReduce有以下四個獨立的實體。

1.JobClient:運行在客戶端節點上,負責將MapReduce程序轉換成Jar包存儲在HDFS,並將Jar包的路徑提交給Jobtracker,由job tracker分發和監控任務。

2.JobTracker:運行在name node上,負責接收JobClient提交的作業,調度作業的各個子任務在TaskTracker上運行,並對它們進行監控。如果發現失敗的任務,它將重新運行。

3.TaskTracker:運行在數據節點,負責主動與JobTracker通信,接收作業,直接執行每個任務。

4.HDFS:用於與其他實體共享工作文件。

詳情如下:

1.JobClient通過RPC協議向JobTracker請求新應用程序的ID,以獲取MapReduce作業的ID。

2.JobTracker檢查作業的輸出描述。例如,如果沒有指定輸出目錄或者目錄已經存在,則作業不會被提交,並且會向JobClient拋出壹個錯誤;否則,將向JobClient返回壹個新的作業ID。

3.JobClient將作業所需的資源(包括作業JAR文件、配置文件和計算的輸入片段)復制到以作業ID命名的HDFS文件夾中。

4.JobClient通過submitApplication()提交作業。

5.JobTracker在收到調用它的submitApplication()消息後初始化任務。

6.JobTracker在HDFS上讀取要處理的文件,並開始計算輸入切片,每個切片對應壹個TaskTracker。

地圖任務不是隨便分配給任務跟蹤者的。這裏有壹個叫做數據本地的概念。它的意思是:將地圖任務分配給包含地圖處理過的數據塊的TaskTracker,同事將程序jar包復制到TaskTracker上運行,稱為“操作動,數據不動”。分配reduce任務時沒有考慮數據本地化。

7.TaskTracker通過心跳機制接收任務(任務的描述信息)。

8.TaskTracker讀取作業資源(JAR包、配置文件等。)在HDFS。

9.TaskTracker啟動壹個java子子進程來執行特定的任務(MapperTask或ReducerTask)。

10.TaskTracker將歸約結果寫入HDFS。

註意:取壹個HDFS塊的大小(默認為64M)作為切片?

自15的2.7.3版以來,塊大小已從64 MB更改為128 MB。

洗牌分析

Shuffle的中文意思是“洗牌”。如果這樣看,壹個map生成的數據通過hash過程分配給不同的reduce任務,是不是壹個洗牌數據的過程?

?洗牌的概念:

Collections.shuffle(List list):隨機打亂列表中元素的順序。

MapReduce中的Shuffle:描述了從地圖任務輸出數據到減少任務輸入的過程。

映射結束流程分析

1每個輸入片段將由壹個map任務處理。默認情況下,HDFS的塊大小(默認為64M)被用作片段,但是我們也可以設置塊的大小。map輸出的結果將被臨時放置在壹個循環內存緩沖區中(緩沖區的大小默認為100M,由io.sort.mb屬性控制)。當緩沖區即將溢出時(默認為緩沖區大小的80%,由io.sort.spill.percent屬性控制),會在本地文件系統中創建壹個溢出文件。

在寫入磁盤之前,線程先將數據按照reduce任務的數量分成相同數量的分區,即壹個reduce任務對應壹個分區的數據。這樣做是為了避免壹些reduce任務被分配了大量數據,而壹些reduce任務被分配了很少或沒有數據的尷尬情況。實際上,分區就是對數據進行哈希運算的過程。然後對每個分區中的數據進行排序。如果此時設置了Combianer,排序後的結果將被合並。這樣做的目的是將盡可能少的數據寫入磁盤。

當map任務輸出最後壹條記錄時,可能會有很多溢出文件,所以需要對這些文件進行合並。在合並的過程中,會連續進行排序和合並操作,目的有兩個:1,最小化每次寫入磁盤的數據量;2.在下壹個復制階段,盡量減少網絡傳輸的數據量。最後,它被合並成壹個分區和排序的文件。為了減少網絡傳輸的數據量,這裏可以壓縮數據,只要將mapred.compress.map.out設置為true即可。

數據壓縮:Gzip,Lzo,snappy。

4將分區中的數據復制到相應的reduce任務中。可能有人會問:分區裏的數據怎麽知道它對應的是哪個reduce?其實地圖任務壹直和它的父親TaskTracker保持聯系,TaskTracker也壹直和obTracker保持心跳。所以整個集群的宏信息保存在JobTracker中。reduce任務只要從JobTracker獲取對應的地圖輸出位置就可以了。

減少最終流程分析

1 reduce會接收不同地圖任務的數據,每個地圖的數據都是有序的。如果reduce端接收的數據量相當小,會直接存儲在內存中(緩沖區的大小由mapred . job . shuffle . input . buffer . percent屬性控制,表示用於此目的的堆空間的百分比);如果數據量超過緩沖區大小的壹定比例(由mapred . job . shuffle . merg . percent確定),數據將被合並並溢出到磁盤。

隨著溢出文件的增加,後臺線程會將它們合並成壹個更大更有序的文件,以便為後面的合並節省空間。事實上,mapreduce在Map和reduce上重復執行排序和合並操作。現在終於明白為什麽有人說排序是hadoop的靈魂了。

3合並過程中會有很多中間文件(寫入磁盤),但是MapReduce會讓寫入磁盤的數據盡量少,最後合並的結果不寫入磁盤,而是直接輸入到Reduce函數中。

4減速器的輸入文件。經過不斷的合並,最終會生成壹個“最終文件”。為什麽要放引號?因為該文件可能存在於磁盤或內存中。對於我們來說,我們希望它存儲在內存中,直接作為Reducer的輸入,但是默認情況下,這個文件是存儲在磁盤中的。當Reducer的輸入文件設置好後,整個洗牌最終結束。然後執行Reducer,結果放在HDSF上。

詳細流程:

1?每個地圖任務都有壹個內存緩沖區,用於存儲地圖的輸出結果。當緩沖區快滿時,緩沖區中的數據需要作為臨時文件存儲到磁盤。當整個map任務完成後,將該map任務在磁盤中生成的所有臨時文件進行合並,生成最終的正式輸出文件,然後等待reduce任務拉數據。

2?當執行地圖任務時,其輸入數據來自HDFS塊。當然,在MapReduce概念中,map task只讀取split。split和block之間的對應關系可能是多對壹,默認情況下是壹對壹。在wordcount示例中,假設地圖的輸入數據都是類似“aaa”的字符串。

3?經過mapper的運算,我們知道mapper的輸出是這樣壹個鍵/值對:鍵是“aaa”,值是1。因為當前map end只做1的加法運算,所以在reduce任務中合並結果集。之前,我們知道該作業有三個reduce任務。那麽,當前的“aaa”應該去處理哪種縮減呢?妳現在需要做決定。

4?MapReduce提供了壹個Partitioner接口,用來根據鍵或值的個數和Reduce來決定哪個reduce任務應該處理當前的輸出數據。默認情況下,鍵散列後面跟有reduce任務數據的模(余數)。默認模式只是平均化reduce的處理能力。如果用戶對分區器有自己的要求,他們可以定制它並在作業中設置它。

5?在示例中,“aaa”在分區後返回0,即這壹對值應該由第壹個reduce處理。接下來,您需要將數據寫入內存緩沖區。緩沖區的作用是批量收集地圖結果,減少磁盤IO的影響。我們的鍵/值對和分區的結果將被寫入緩沖區。當然,在寫之前,鍵值都會被序列化成字節數組。

6?內存緩沖區的大小有限,默認為100MB。當地圖任務輸出結果較多時,內存可能會出現爆倉,需要在壹定條件下將緩沖區中的數據臨時寫入磁盤,然後重用這個緩沖區。這個把數據從內存寫到磁盤的過程叫做溢出,中文可以理解為溢出。覆蓋由單獨的線程完成,並且不影響將映射結果寫入緩沖區的線程。溢出線程啟動時,應該不會阻止map結果的輸出,所以整個緩沖區的溢出比例為spill.percent,默認比例為0.8,即當緩沖區中的數據值已經達到閾值(緩沖區大小* spill percentage = 100 MB * 0.8 = 80MB)時,溢出線程啟動,鎖定80MB內存,執行溢出進程。地圖任務的輸出結果也可以寫入剩余的20MB內存,互不影響。

7?當溢出線程啟動時,就需要在這個80MB的空間裏對鍵進行排序。排序是MapReduce模型的默認行為,這裏的排序也是序列化字節的排序。

8?因為map任務的輸出需要發送到不同的reduce終端,而內存緩沖區並沒有合並要發送到同壹個reduce終端的數據,那麽這個合並就要體現在磁盤文件中。從官圖中我們還可以看到,壹些寫入磁盤的文件已經合並了不同reduce終端的值。因此,溢出過程的壹個重要細節是,如果有許多鍵/值對需要發送到reduce終端,則需要將這些鍵/值拼接在壹起,以減少與分區相關的索引記錄。

合並每個reduce端的數據時,有些數據可能看起來像這樣:“AAA”/1,“AAA”/1。對於wordcount示例,我們簡單地計算單詞出現的次數。如果在同壹個地圖任務的結果中有很多類似“aaa”的鍵出現了很多次,就要把它們的值組合起來。這個過程被稱為減少或合並。但是,在MapReduce術語中,Reduce只執行從多個maptasks獲取數據進行計算的過程。除了reduce,非正式合並數據只能算作合並。其實大家都知道,MapReduce把合並器等同於還原器。

如果客戶端已經設置了合並器,現在是使用合並器的時候了。將具有相同鍵的鍵/值對的值相加,以減少溢出到磁盤的數據量。Combiner優化了MapReduce的中間結果,所以會在整個模型中多次使用。那麽哪些場景可以使用合並器呢?從這個分析來看,合並器的輸出就是歸約器的輸入,合並器永遠無法改變最終的計算結果。所以在我看來,Combiner只應該用在Reduce的輸入key/value和輸出key/value完全相同的場景,不會影響最終結果。比如累加,最大值等。組合器必須小心使用。如果用得好,有助於作業執行的效率,否則會影響reduce的最終結果。

9?每次溢出都會在磁盤上生成壹個溢出文件。如果map的輸出結果真的很大,並且有很多這樣的溢出,那麽磁盤上就會有多個溢出文件。當map任務真正完成時,內存緩沖區中的所有數據也會溢出到磁盤,形成壹個溢出文件。最終,磁盤中至少會有壹個這樣的溢出文件(如果map的輸出結果很少,那麽在執行map時,只會生成壹個溢出文件)。因為最後只有壹個文件,所以這些溢出的文件需要合並在壹起,這個過程叫做合並。合並是什麽樣的?和前面的例子壹樣,“aaa”從壹個時間值為5的map任務中讀取,從另壹個時間值為8的map中讀取。因為它們有相同的密鑰,所以應該合並到壹個組中。

什麽是團體?對於“aaa”來說,它就像壹個真正的太陽:{“AAA”,[5,8,2,...]},從不同的溢出文件中讀取數組中的值,然後將這些值相加。請註意,因為merge將多個溢出文件合並成壹個文件,所以可能存在相同的鍵。在這個過程中,如果客戶端設置了合並器,它也會使用合並器合並同壹個密鑰。

至此,地圖端的所有工作都已經完成,最終生成的文件也存放在TaskTracker可以到達的本地目錄中。每減少壹次?Task不斷從JobTRacker獲取map任務是否通過RPC完成的信息。如果reduce task被通知TaskTracker上的map任務執行完畢,Shuffle進程的後半部分將開始。

減速側的洗牌過程:

1復制流程,簡單拉數據。Reduce進程啟動壹些數據復制線程(Fetcher),通過biner操作,有兩個目的:1,最小化每次寫入磁盤的數據量;2.在下壹個復制階段,盡量減少網絡傳輸的數據量。最後,它被合並成壹個分區和排序的文件。為了減少網絡傳輸的數據量,這裏可以壓縮數據,只要將mapred.compress.map.out設置為true即可。

數據壓縮:Gzip,Lzo,snappy。

4將分區中的數據復制到相應的reduce任務中。可能有人會問:分區裏的數據怎麽知道它對應的是哪個reduce?其實地圖任務壹直和它的父親TaskTracker保持聯系,TaskTracker也壹直和obTracker保持心跳。所以整個集群的宏信息保存在JobTracker中。reduce任務只要從JobTracker獲取對應的地圖輸出位置就可以了。

減少最終流程分析

1 reduce會接收不同地圖任務的數據,每個地圖的數據都是有序的。如果reduce端接收的數據量相當小,會直接存儲在內存中(緩沖區的大小由mapred . job . shuffle . input . buffer . percent屬性控制,表示用於此目的的堆空間的百分比);如果數據量超過緩沖區大小的壹定比例(由mapred . job . shuffle . merg . percent確定),數據將被合並並溢出到磁盤。

隨著溢出文件的增加,後臺線程會將它們合並成壹個更大更有序的文件,以便為後面的合並節省空間。事實上,mapreduce在Map和reduce上重復執行排序和合並操作。現在終於明白為什麽有人說排序是hadoop的靈魂了。

3合並過程中會有很多中間文件(寫入磁盤),但是MapReduce會讓寫入磁盤的數據盡量少,最後合並的結果不寫入磁盤,而是直接輸入到Reduce函數中。

4減速器的輸入文件。經過不斷的合並,最終會生成壹個“最終文件”。為什麽要放引號?因為該文件可能存在於磁盤或內存中。對於我們來說,我們希望它存儲在內存中,直接作為Reducer的輸入,但是默認情況下,這個文件是存儲在磁盤中的。當Reducer的輸入文件設置好後,整個洗牌最終結束。然後執行Reducer,結果放在HDSF上。

註意:調優MapReduce在很大程度上就是調優MapReduce Shuffle的性能。

第三,內存緩沖區:MapOutputBuffer

兩級索引結構:

環形緩沖區:

1 kvoffsets buffer:也叫offset index數組,用於存儲位置索引kvindices中鍵/值信息的偏移量。當kvoffsets的利用率超過io.sort.spill.percent(默認為80%)時,會觸發SpillThread的壹次“溢出”操作,即開始壹次溢出階段的操作。

2 kv indicators buffer:也稱為位置索引數組,用於保存數據緩沖區kvbuffer中鍵/值的起始位置。

3 kvbuffer數據緩沖區:用於保存實際的key/value值。默認情況下,緩沖區最多可以使用io.sort.mb的95%當kvbuffer的使用率超過io.sort.spill.percent(默認為80%)時,將觸發SpillThread的溢出操作,即開始溢出階段的操作。