RDD是壹個彈性分布式數據集,是元素的分布式集合。在spark中,對所有數據的操作無非是創建RDD,改造現有的RDD,調用RDD操作進行評估。在這壹切的背後,Spark會自動將RDD中的數據分發到集群,並將操作並行化。
Spark中的RDD是壹組不可變的分布式對象。每個RDD分為多個分區,運行在集群中的不同節點上。RDD可以包含Python、Java、Scala中的任何類型的對象,甚至是用戶定義的對象。
用戶可以通過兩種方式創建RDD:讀取外部數據集,或在驅動程序中分發對象集合,如list或set。
RDD的轉換操作都是延遲求值的,這意味著當我們在RDD上調用轉換操作時,操作不會立即執行。相反,Spark在內部記錄關於所請求操作的信息。我們不應該把RDD看成是壹個有具體數據的數據集,我們最好把每個RDD看成是我們通過變換運算構造的指令列表,記錄如何計算數據。將數據讀入RDD的操作也很懶,只有在需要的時候才會讀取數據。轉換操作和讀取操作都可以執行多次。
2.創建RDD數據集
(1)讀取外部數據集
val input = sc . textfile(input file dir)
(2)分發對象集,以list為例。
val lines = sc . parallelize(List(" hello world ","這是測試"));
3.RDD行動
(1)轉換操作
實現過濾轉換操作:
val lines = sc . parallelize(List(" error:a "," error:b "," error:c "," test "));
val errors = lines . filter(line = & gt;line . contains(" error "));
errors.collect()。foreach(println);
輸出:
錯誤:a
錯誤:b
錯誤:c
可以看到,列表中包含單詞error的項目已經被正確地過濾掉了。
(2)合並操作
將兩個RDD數據集合並為壹個RDD數據集。
按照上面的程序示例:
val lines = sc . parallelize(List(" error:a "," error:b "," error:c "," test "," warnings:a "));
val errors = lines . filter(line = & gt;line . contains(" error "));
val warnings = lines . filter(line = & gt;line . contains(" warnings "));
val unionLines =errors.union(警告);
unionLines.collect()。foreach(println);
輸出:
錯誤:a
錯誤:b
錯誤:c
警告:a
可以看到,原始列表項中的錯誤項和警告項都被過濾掉了。
(3)獲得RDD數據集中的壹些或所有元素
①獲取RDD數據集中的壹些元素?。take(整數)?返回值列表
獲取RDD數據集中的前num個項目。
/**
*取RDD的前num個元素。這會壹個接壹個地掃描分區,所以
*如果需要很多分區,速度會很慢。在這種情況下,使用collect()獲取
*改為整個RDD。
*/
def take(num: Int): JList[T]
程序示例:連接
unionLines.take(2)。foreach(println);
輸出:
錯誤:a
錯誤:b
可以看出,輸出了RDD數據集聯合線的前兩項。
②獲取RDD數據集中的所有元素。collect()返回列表;
程序示例:
val all = union lines . collect();
all . foreach(println);
遍歷輸出RDD數據集聯合線的每個項目。
4.將功能傳遞給spark
在scala中,我們可以將定義的內聯函數、方法引用或靜態方法傳遞給Spark,就像Scala中的其他函數式API壹樣。我們必須考慮壹些其他的細節。必須傳遞的函數和它們引用的數據需要是可序列化的(Java的Serializable接口實現了)。另外,類似於Python,當傳遞壹個對象的方法或字段時,會包含壹個對整個對象的引用。我們可以將所需字段放在壹個局部變量中,以避免整個對象包含該字段。
類搜索函數(val查詢:字符串){
def isMatch(s: String):布爾值= {
包含(查詢)
}
def getMatchFunctionReference(rdd:RDD[字符串]):RDD[字符串]={
//問題:isMach代表this.isMatch,所以需要通過整個this。
rdd.filter(isMatch)
}
def getMatchesFunctionReference(rdd:RDD[字符串]):RDD[字符串] ={
//問題:query代表this.query,所以我們需要傳遞整個this。
rdd . flat map(line = & gt;line.split(查詢))
}
def getMatchesNoReference(rdd:RDD[字符串]):RDD[字符串] ={
//安全,只取出我們需要的字段放入局部變量。
val query 1 = this . query;
rdd . flat map(x = & gt;x.split(查詢1)
)
}
}
5.對於每個元素轉換操作:
轉換操作map()接收壹個函數,將該函數應用於RDD中的每個元素,並將該函數的返回結果作為結果RDD中的相應元素。關鍵詞:轉型
變換操作過濾器()接受壹個函數,並返回RDD中滿足新RDD中的函數的元素。關鍵詞:過濾
示例圖如下所示:
①地圖()
計算每個RDD值的平方。
val rdd = sc . parallelize(List(1,2,3,4));
val result = rdd . map(value = & gt;值*值);
println(result.collect()。mkString(",");
輸出:
1,4,9,16
過濾器()
②?從RDD集中刪除值為1的元素:
val rdd = sc . parallelize(List(1,2,3,4));
val result = rdd . filter(value = & gt;值!=1);
println(result.collect()。mkString(",");
結果:
2,3,4
我們也可以采取傳遞函數的形式,就像這樣:
功能:
def filter function(value:Int):Boolean = {
值!=1
}
使用:
val rdd = sc . parallelize(List(1,2,3,4));
val result = rdd . filter(filter function);
println(result.collect()。mkString(",");
③有時,我們希望為每個輸入元素生成多個輸出元素。實現這個功能的操作叫做flatMap()。與map()類似,我們提供給flatMap()的函數應用於輸入RDD的每個元素。但它不是元素,而是返回值序列的叠代器。輸出RDD不是由叠代器組成的。我們得到的是壹個RDD,它包含了每個叠代器可以訪問的所有元素。flatMap()的壹個簡單用法就是把輸入的字符串剪成單詞,如下圖:?
val rdd = sc . parallelize(List(" Hello world "、" hello you "、" world I love you "));
val result = rdd . flat map(line = & gt;line . split(" ");
println(result.collect()。mkString(" \ n ");
輸出:
妳好
世界
妳好
妳們
世界
我
愛
妳們
6.集合運算
在RDD設置操作
功能
使用
RDD1.distinct()
生成壹個只包含不同元素的新RDD。需要數據洗牌。
RDD1.union(RDD2)
返回包含兩個RDD中所有元素的RDD。
RDD1 .交叉點(RDD2)
只返回兩個rdd中都存在的元素。
RDD1.substr(RDD2)
返回由只存在於第壹個RDD中而不存在於第二個RDD中的所有元素組成的RDD。需要數據洗牌。
集合運算對笛卡爾集合的處理:
RDD1 .笛卡爾坐標(RDD2)
返回兩個RDD數據集的笛卡爾集合。
程序示例:生成RDD集{1,2}和{1,2}的笛卡爾集。
val rdd 1 = sc . parallelize(List(1,2));
val rd D2 = sc . parallelize(List(1,2));
val rdd = rdd 1 . cartesian(rdd 2);
println(rdd.collect()。mkString(" \ n ");
輸出:
(1,1)
(1,2)
(2,1)
(2,2)
7.動作操作
(1)歸約運算
Reduce()將壹個函數作為參數,該函數將操作兩個RDD元素類型的數據,並返回壹個相同類型的新元素。壹個簡單的例子是函數+,它可以用來積累我們的RDD。使用reduce(),可以很容易地計算出RDD中所有元素的總和、元素個數以及其他類型的聚合運算。
以下是對RDD數據集的所有元素求和的程序示例:
val rdd = sc . parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.reduce((x,y)= & gt;x+y);
println(結果);
產量:55
(2)折疊()操作
接收壹個與reduce()接收的函數簽名相同的函數,並添加壹個初始值作為每個分區第壹次調用的結果。妳提供的初始值應該是妳提供的運算的單位元素,也就是說用妳的函數多次計算這個初始值都不會改變結果(比如+對應0,*對應1,或者拼接運算對應的空列表)。
程序示例:
①計算RDD數據集中所有元素的總和:
zero value = 0;//求和時,初始值為0。
val rdd = sc . parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.fold(0)((x,y)= & gt;x+y);
println(結果);
②計算RDD數據集中所有元素的乘積:
zero value = 1;//積分時,初始值為1。
val rdd = sc . parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results = rdd . fold(1)((x,y)= & gt;x * y);
println(結果);
(3)aggregate()運算
aggregate()函數的返回值類型不必與操作的RDD類型相同。
與fold()類似,使用aggregate()時,我們需要提供我們期望返回的類型的初始值。然後使用壹個函數來組合RDD中的元素,並將它們放入累加器中。考慮到每個節點在本地累加,最後,需要提供第二個函數來成對地組合累加器。
以下是壹個程序示例:
val rdd = sc . parallelize(List(1,2,3,4,5,6,7,8,9,10));
val result=rdd.aggregate((0,0))(
(acc,value)= & gt;(acc。_ 1+值,符合。_2+1),
(acc1,ACC 2)= & gt;(acc1。_1+acc2。_1,acc1。_2+acc2。_2)
)
平均值=結果。_ 1/結果。_2;
println(平均值)
輸出:5
最終返回的是壹個元組2
表:對數據為{1,2,3,3}的RDD執行基本的RDD操作。
函數名項目的示例結果
Collect()返回rdd.collect()的所有元素{1,2,3,3}
count()的元素數RDD rdd.count() 4
CountByValue()每個元素在RDD RDD中出現的次數。Countbyvalue () {(1,1),
(2,1),
(3,2)
}
Take(num)從rdd返回num個元素rdd.take(2) {1,2}。
Top(num)返回RDD的前num個元素。從RDD訂購(2)(我的訂單){3,3}。
已訂購(數量)
(排序)按照提供的順序從RDD返回前num個元素。
Rdd.takeSample(false,1)是不確定的。
Take sample (with replacement,num,[seed])從rdd返回任意數量的元素,takeSample(false,1)是不確定的。
Reduce(func)並行集成RDD中的所有數據。Reduce ((x,y) = > x+y)
九
Fold(zero)(func)與reduce()相同,但初始值rdd.fold(0)((x,y)= & gt;x+y)
九
Aggregate(零值)(seq op,combo)類似於reduce(),但它通常返回不同類型的函數rdd.aggregate((0,0))。
((x,y)= & gt;
(x._1+y,x._2+1),
(x,y)= & gt;
(x . 1+y . 1,x . 2+y . 2)
) (9,4)
Foreach(func)對rdd.foreach(func) None中的每個元素使用給定的函數RDD。
8.永久緩存
因為火花RDD是懶惰的,有時我們想使用同壹個RDD很多次。如果妳只是簡單地在RDD上調用壹個動作,Spark每次都會重新計算RDD及其所有附屬地。這在叠代算法中特別昂貴,因為叠代算法經常多次使用同壹組數據。
為了避免多次計算同壹個RDD,Spark可以持久化數據。當我們讓Spark持久存儲壹個RDD時,計算RDD的節點將保存它們已經計算的分區數據。
出於不同的目的,我們可以為RDD選擇不同的持久性級別。默認情況下,persist()以序列化的形式在JVM的堆空間中緩存數據。
不同關鍵字對應的存儲級別表
等級
使用的空間
中央處理機時間
它在內存中嗎?
它在磁盤上嗎?
評論
僅限內存
高的
低的
是
不
直接存儲在內存中
僅存儲用戶
低的
高的
是
不
序列化並存儲在內存中。
內存和磁盤
低的
媒介
部分
部分
如果內存放不下數據,它就會溢出到磁盤上。
內存和磁盤用戶
低的
高的
部分
部分
數據放不進內存,在磁盤上溢出。序列化數據存儲在內存中。
僅磁盤
低的
高的
不
是
直接存儲在硬盤中。
程序示例:在內存中保存RDD數據集。
val rdd = sc . parallelize(List(1,2,3,4,5,6,7,8,9,10))。持久化(StorageLevel。MEMORY _ ONLY);
println(rdd.count())
println(rdd.collect()。mkString(",");
RDD也有unpersist()方法,可以調用該方法從緩存中手動刪除持久RDD。
9.不同的RDD類型
在scala中,將RDD轉換成特定函數的RDD(比如RDD[Double]上的數值運算)是通過隱式轉換自動處理的。這些隱式轉換可以將壹個RDD隱式轉換成各種封裝類,比如DoubleRDDFunctions(數值型數據的RDD)和PairRDDFunctions(鍵值對的RDD),這樣我們就有了mean()和variance()等附加函數。
示例程序:
val rdd = sc . parallelize(List(1.0,2.0,3.0,4.0,5.0));
println(rdd . mean());
其實RDD[T]中並沒有mean()函數,只是隱式轉換自動將其轉換為DoubleRDDFunctions。