作為元組的鍵值對
假設我們需要計算壹個大文本中每行的長度,並報告每個長度的行數。在HadoopMapReduce中,我們首先使用映射器生成壹個鍵-值對,以行的長度作為鍵,以1作為值。
公共類LineLengthMapper擴展
映射器& ltLongWritable,Text,IntWritable,IntWritable & gt{
@覆蓋
受保護的空映射(LongWritable行號,文本行,上下文上下文)
引發IOException,InterruptedException {
context . write(new int writable(line . getlength()),new int writable(1));
}
}
值得註意的是,映射器和還原器只對鍵值對進行操作。所以TextInputFormat提供給LineLengthMapper的輸入實際上是壹個key-value對,在文本中的位置是Key(很少使用,但是總需要有東西作為Key),文本充當值。
相應的Spark實現:
lines . map(line = & gt;(line.length,1))
在Spark中,輸入只是由字符串組成的RDD,而不是鍵-值鍵-值對。Spark中鍵-值鍵對的表示是壹個Scala元組,用(a,b)的語法創建。上述映射操作的結果是(Int,Int)元組的RDD。當壹個RDD包含很多元組時,它會獲得多個方法,比如reduceByKey,這對於重現MapReduce行為非常重要。
減少
Reduce()和reduceBykey()
要計算行長度的鍵-值對,需要將每個長度作為Reducer中的壹個鍵,並計算其行的總和作為壹個值。
公共類LineLengthReducer擴展
減速器& ltIntWritable,IntWritable,IntWritable,IntWritable & gt{
@覆蓋
受保護的void reduce(IntWritable length,Iterable & ltIntWritable & gt計數,
上下文Context)引發IOException,InterruptedException {
int sum = 0;
for (IntWritable count : counts) {
sum+= count . get();
}
context.write(length,new int writable(sum));
}
}
Spark中上述映射器和縮減器對應的實現只需要壹行代碼:
val length counts = lines . map(line = & gt;(line.length,1))。reduceByKey(_ + _)
Spark的RDD API有壹個reduce方法,但是它會將所有的鍵值對縮減為壹個值。這不是Hadoop MapReduce的行為,它在Spark中的對應項是ReduceByKey。
此外,Reducer的Reduce方法接收多值流並產生0,1或更多的結果。reduceByKey接受將兩個值轉換為壹個值的函數,它是壹個簡單的加法函數,將兩個數字映射到它們的和。調用者可以使用這個相關函數將多個值減少到壹個值。與Reducer方法相比,根據鍵減少值是壹種更簡單、更準確的API。
制圖人
Map()和flatMap()
現在,考慮壹個計算以大寫字母開頭的單詞數量的算法。對於每行輸入文本,Mapper可以生成0,1個或更多的鍵值對。
公共類CountUppercaseMapper擴展
映射器& ltLongWritable,Text,Text,IntWritable & gt{
@覆蓋
受保護的空映射(LongWritable行號,文本行,上下文上下文)
引發IOException,InterruptedException {
for (String word : line.toString()。拆分(" "){
if(character . isupper case(word . charat(0))){
context.write(new Text(word),new int writable(1));
}
}
}
}
火花對應寫作:
lines.flatMap(
_.拆分(" ")。過濾器(word = & gtCharacter.isUpperCase(word(0)))。地圖(word = & gt(word,1))
)
簡單的Spark map函數不適合這個場景,因為map對於每個輸入只能產生壹個輸出,但是在這個例子中,壹行需要產生多個輸出。所以相對於MapperAPI,Spark的map函數語義更簡單,應用範圍更窄。
Spark的解決方案是首先將每壹行映射到壹組輸出值,這些輸出值可以是空的或多值的。然後它將被flatMap函數展平。數組中的單詞被過濾並轉換成函數中的元組。在本例中,真正模仿映射器行為的是flatMap,而不是Map。
groupByKey()
寫壹個數量減少器是很簡單的。在Spark中,reduceByKey可以用來統計每個單詞的總數。例如,出於某種原因,要求輸出文件中的每個單詞都要顯示為大寫字母及其數字。在MapReduce中,實現如下:
公共類CountUppercaseReducer擴展
減速器& ltText,IntWritable,Text,IntWritable & gt{
@覆蓋
受保護的void reduce(Text word,Iterable & ltIntWritable & gt計數、上下文語境)
引發IOException,InterruptedException {
int sum = 0;
for (IntWritable count : counts) {
sum+= count . get();
}
語境
。write(新文本(word.toString()。toUpperCase()),new int writable(sum));
}
}
但是redeceByKey不能在Spark中單獨工作,因為他保留了原始密鑰。為了在Spark中進行模擬,我們需要更像Reducer API的東西。我們知道Reducer的reduce方法接受壹個鍵和壹組值,然後完成壹組轉換。GroupByKey和連續映射操作可以實現這個目標:
groupByKey()。map { case (word,ones)= & gt;(word.toUpperCase,ones.sum) }
GroupByKey只收集壹個鍵的所有值,不提供reduce函數。在此基礎上,任何轉換都可以作用於key和壹系列值。這裏,鍵被轉換成大寫字母,並且值被直接求和。
設置()和清理()
在MapReduce中,Mapper和Reducer可以聲明壹個setup方法,在處理輸入之前執行該方法來分配數據庫連接等昂貴的資源,同時可以使用cleanup函數來釋放資源。
公共類SetupCleanupMapper擴展
映射器& ltLongWritable,Text,Text,IntWritable & gt{
私有連接dbConnection
@覆蓋
受保護的void設置(上下文上下文){
數據庫連接=...;
}
...
@覆蓋
受保護的空清理(上下文上下文){
db connection . close();
}
}
Spark中的map和flatMap方法壹次只能對壹個輸入進行操作,沒有辦法執行轉換大量值前後的代碼。似乎設置和清除代碼可以直接放在Sparkmap函數調用之前和之後:
val數據庫連接=...
lines.map(...dbConnection.createStatement(...)...)
dbConnection.close() //錯誤!
然而,這種方法是不可行的,因為:
它將對象dbConnection放在map函數的閉包中,這要求它是可序列化的(例如,通過java.io.Serializable)。但是,數據庫連接之類的對象通常不能序列化。
Map是壹種轉換,而不是壹種操作,會延遲執行。無法及時關閉連接對象。
即便如此,它也只能關閉驅動上的連接,而不能釋放序列化副本版本分配的資源連接。
其實map和flatMap都不是Spark中Mapper最接近的對應函數,但是Spark中Mapper最接近的對應函數是非常重要的mapPartitions()方法,它不僅可以將單個值映射到單個值,還可以將壹組值映射到另壹組值,很像bulkmap方法。這意味著mapPartitions()方法可以在開始時在本地分配資源,在批處理映射結束時釋放資源。
添加setup方法很簡單,添加cleanup會比較困難,因為仍然很難檢測到轉換的完成。例如,這是可行的:
lines . map partitions { value iterator = & gt;
val數據庫連接=...//好的
val transformedIterator = value iterator . map(...數據庫連接...)
dbConnection.close() //還是不對!可能沒有計算叠代器
變壓器
}
重印