當前位置:成語大全網 - 書法字典 - 如何用Spark實現現有的MapReduce程序

如何用Spark實現現有的MapReduce程序

本文將簡單展示如何在Spark中重現上述過程,妳會發現妳不需要逐字翻譯Mapper和Reducer!

作為元組的鍵值對

假設我們需要計算壹個大文本中每行的長度,並報告每個長度的行數。在HadoopMapReduce中,我們首先使用映射器生成壹個鍵-值對,以行的長度作為鍵,以1作為值。

公共類LineLengthMapper擴展

映射器& ltLongWritable,Text,IntWritable,IntWritable & gt{

@覆蓋

受保護的空映射(LongWritable行號,文本行,上下文上下文)

引發IOException,InterruptedException {

context . write(new int writable(line . getlength()),new int writable(1));

}

}

值得註意的是,映射器和還原器只對鍵值對進行操作。所以TextInputFormat提供給LineLengthMapper的輸入實際上是壹個key-value對,在文本中的位置是Key(很少使用,但是總需要有東西作為Key),文本充當值。

相應的Spark實現:

lines . map(line = & gt;(line.length,1))

在Spark中,輸入只是由字符串組成的RDD,而不是鍵-值鍵-值對。Spark中鍵-值鍵對的表示是壹個Scala元組,用(a,b)的語法創建。上述映射操作的結果是(Int,Int)元組的RDD。當壹個RDD包含很多元組時,它會獲得多個方法,比如reduceByKey,這對於重現MapReduce行為非常重要。

減少

Reduce()和reduceBykey()

要計算行長度的鍵-值對,需要將每個長度作為Reducer中的壹個鍵,並計算其行的總和作為壹個值。

公共類LineLengthReducer擴展

減速器& ltIntWritable,IntWritable,IntWritable,IntWritable & gt{

@覆蓋

受保護的void reduce(IntWritable length,Iterable & ltIntWritable & gt計數,

上下文Context)引發IOException,InterruptedException {

int sum = 0;

for (IntWritable count : counts) {

sum+= count . get();

}

context.write(length,new int writable(sum));

}

}

Spark中上述映射器和縮減器對應的實現只需要壹行代碼:

val length counts = lines . map(line = & gt;(line.length,1))。reduceByKey(_ + _)

Spark的RDD API有壹個reduce方法,但是它會將所有的鍵值對縮減為壹個值。這不是Hadoop MapReduce的行為,它在Spark中的對應項是ReduceByKey。

此外,Reducer的Reduce方法接收多值流並產生0,1或更多的結果。reduceByKey接受將兩個值轉換為壹個值的函數,它是壹個簡單的加法函數,將兩個數字映射到它們的和。調用者可以使用這個相關函數將多個值減少到壹個值。與Reducer方法相比,根據鍵減少值是壹種更簡單、更準確的API。

制圖人

Map()和flatMap()

現在,考慮壹個計算以大寫字母開頭的單詞數量的算法。對於每行輸入文本,Mapper可以生成0,1個或更多的鍵值對。

公共類CountUppercaseMapper擴展

映射器& ltLongWritable,Text,Text,IntWritable & gt{

@覆蓋

受保護的空映射(LongWritable行號,文本行,上下文上下文)

引發IOException,InterruptedException {

for (String word : line.toString()。拆分(" "){

if(character . isupper case(word . charat(0))){

context.write(new Text(word),new int writable(1));

}

}

}

}

火花對應寫作:

lines.flatMap(

_.拆分(" ")。過濾器(word = & gtCharacter.isUpperCase(word(0)))。地圖(word = & gt(word,1))

)

簡單的Spark map函數不適合這個場景,因為map對於每個輸入只能產生壹個輸出,但是在這個例子中,壹行需要產生多個輸出。所以相對於MapperAPI,Spark的map函數語義更簡單,應用範圍更窄。

Spark的解決方案是首先將每壹行映射到壹組輸出值,這些輸出值可以是空的或多值的。然後它將被flatMap函數展平。數組中的單詞被過濾並轉換成函數中的元組。在本例中,真正模仿映射器行為的是flatMap,而不是Map。

groupByKey()

寫壹個數量減少器是很簡單的。在Spark中,reduceByKey可以用來統計每個單詞的總數。例如,出於某種原因,要求輸出文件中的每個單詞都要顯示為大寫字母及其數字。在MapReduce中,實現如下:

公共類CountUppercaseReducer擴展

減速器& ltText,IntWritable,Text,IntWritable & gt{

@覆蓋

受保護的void reduce(Text word,Iterable & ltIntWritable & gt計數、上下文語境)

引發IOException,InterruptedException {

int sum = 0;

for (IntWritable count : counts) {

sum+= count . get();

}

語境

。write(新文本(word.toString()。toUpperCase()),new int writable(sum));

}

}

但是redeceByKey不能在Spark中單獨工作,因為他保留了原始密鑰。為了在Spark中進行模擬,我們需要更像Reducer API的東西。我們知道Reducer的reduce方法接受壹個鍵和壹組值,然後完成壹組轉換。GroupByKey和連續映射操作可以實現這個目標:

groupByKey()。map { case (word,ones)= & gt;(word.toUpperCase,ones.sum) }

GroupByKey只收集壹個鍵的所有值,不提供reduce函數。在此基礎上,任何轉換都可以作用於key和壹系列值。這裏,鍵被轉換成大寫字母,並且值被直接求和。

設置()和清理()

在MapReduce中,Mapper和Reducer可以聲明壹個setup方法,在處理輸入之前執行該方法來分配數據庫連接等昂貴的資源,同時可以使用cleanup函數來釋放資源。

公共類SetupCleanupMapper擴展

映射器& ltLongWritable,Text,Text,IntWritable & gt{

私有連接dbConnection

@覆蓋

受保護的void設置(上下文上下文){

數據庫連接=...;

}

...

@覆蓋

受保護的空清理(上下文上下文){

db connection . close();

}

}

Spark中的map和flatMap方法壹次只能對壹個輸入進行操作,沒有辦法執行轉換大量值前後的代碼。似乎設置和清除代碼可以直接放在Sparkmap函數調用之前和之後:

val數據庫連接=...

lines.map(...dbConnection.createStatement(...)...)

dbConnection.close() //錯誤!

然而,這種方法是不可行的,因為:

它將對象dbConnection放在map函數的閉包中,這要求它是可序列化的(例如,通過java.io.Serializable)。但是,數據庫連接之類的對象通常不能序列化。

Map是壹種轉換,而不是壹種操作,會延遲執行。無法及時關閉連接對象。

即便如此,它也只能關閉驅動上的連接,而不能釋放序列化副本版本分配的資源連接。

其實map和flatMap都不是Spark中Mapper最接近的對應函數,但是Spark中Mapper最接近的對應函數是非常重要的mapPartitions()方法,它不僅可以將單個值映射到單個值,還可以將壹組值映射到另壹組值,很像bulkmap方法。這意味著mapPartitions()方法可以在開始時在本地分配資源,在批處理映射結束時釋放資源。

添加setup方法很簡單,添加cleanup會比較困難,因為仍然很難檢測到轉換的完成。例如,這是可行的:

lines . map partitions { value iterator = & gt;

val數據庫連接=...//好的

val transformedIterator = value iterator . map(...數據庫連接...)

dbConnection.close() //還是不對!可能沒有計算叠代器

變壓器

}

重印