當前位置:成語大全網 - 書法字典 - scala中rdd type用的是什麽頭文件?

scala中rdd type用的是什麽頭文件?

1介紹。RDD:

RDD是壹個彈性分布式數據集,是元素的分布式集合。在spark中,對所有數據的操作無非是創建RDD,改造現有的RDD,調用RDD操作進行評估。在這壹切的背後,Spark會自動將RDD中的數據分發到集群,並將操作並行化。

Spark中的RDD是壹組不可變的分布式對象。每個RDD分為多個分區,運行在集群中的不同節點上。RDD可以包含Python、Java、Scala中的任何類型的對象,甚至是用戶定義的對象。

用戶可以通過兩種方式創建RDD:讀取外部數據集,或在驅動程序中分發對象集合,如list或set。

RDD的轉換操作都是延遲求值的,這意味著當我們在RDD上調用轉換操作時,操作不會立即執行。相反,Spark在內部記錄關於所請求操作的信息。我們不應該把RDD看成是壹個有具體數據的數據集,我們最好把每個RDD看成是我們通過變換運算構造的指令列表,記錄如何計算數據。將數據讀入RDD的操作也很懶,只有在需要的時候才會讀取數據。轉換操作和讀取操作都可以執行多次。

2.創建RDD數據集

(1)讀取外部數據集

val input = sc . textfile(input file dir)

(2)分發對象集,以list為例。

val lines = sc . parallelize(List(" hello world ","這是測試"));

3.RDD行動

(1)轉換操作

實現過濾轉換操作:

val lines = sc . parallelize(List(" error:a "," error:b "," error:c "," test "));

val errors = lines . filter(line = & gt;line . contains(" error "));

errors.collect()。foreach(println);

輸出:

錯誤:a

錯誤:b

錯誤:c

可以看到,列表中包含單詞error的項目已經被正確地過濾掉了。

(2)合並操作

將兩個RDD數據集合並為壹個RDD數據集。

按照上面的程序示例:

val lines = sc . parallelize(List(" error:a "," error:b "," error:c "," test "," warnings:a "));

val errors = lines . filter(line = & gt;line . contains(" error "));

val warnings = lines . filter(line = & gt;line . contains(" warnings "));

val unionLines =errors.union(警告);

unionLines.collect()。foreach(println);

輸出:

錯誤:a

錯誤:b

錯誤:c

警告:a

可以看到,原始列表項中的錯誤項和警告項都被過濾掉了。

(3)獲得RDD數據集中的壹些或所有元素

①獲取RDD數據集中的壹些元素?。take(整數)?返回值列表

獲取RDD數據集中的前num個項目。

/**

*取RDD的前num個元素。這會壹個接壹個地掃描分區,所以

*如果需要很多分區,速度會很慢。在這種情況下,使用collect()獲取

*改為整個RDD。

*/

def take(num: Int): JList[T]

程序示例:連接

unionLines.take(2)。foreach(println);

輸出:

錯誤:a

錯誤:b

可以看出,輸出了RDD數據集聯合線的前兩項。

②獲取RDD數據集中的所有元素。collect()返回列表;

程序示例:

val all = union lines . collect();

all . foreach(println);

遍歷輸出RDD數據集聯合線的每個項目。

4.將功能傳遞給spark

在scala中,我們可以將定義的內聯函數、方法引用或靜態方法傳遞給Spark,就像Scala中的其他函數式API壹樣。我們必須考慮壹些其他的細節。必須傳遞的函數和它們引用的數據需要是可序列化的(Java的Serializable接口實現了)。另外,類似於Python,當傳遞壹個對象的方法或字段時,會包含壹個對整個對象的引用。我們可以將所需字段放在壹個局部變量中,以避免整個對象包含該字段。

類搜索函數(val查詢:字符串){

def isMatch(s: String):布爾值= {

包含(查詢)

}

def getMatchFunctionReference(rdd:RDD[字符串]):RDD[字符串]={

//問題:isMach代表this.isMatch,所以需要通過整個this。

rdd.filter(isMatch)

}

def getMatchesFunctionReference(rdd:RDD[字符串]):RDD[字符串] ={

//問題:query代表this.query,所以我們需要傳遞整個this。

rdd . flat map(line = & gt;line.split(查詢))

}

def getMatchesNoReference(rdd:RDD[字符串]):RDD[字符串] ={

//安全,只取出我們需要的字段放入局部變量。

val query 1 = this . query;

rdd . flat map(x = & gt;x.split(查詢1)

)

}

}

5.對於每個元素轉換操作:

轉換操作map()接收壹個函數,將該函數應用於RDD中的每個元素,並將該函數的返回結果作為結果RDD中的相應元素。關鍵詞:轉型

變換操作過濾器()接受壹個函數,並返回RDD中滿足新RDD中的函數的元素。關鍵詞:過濾

示例圖如下所示:

①地圖()

計算每個RDD值的平方。

val rdd = sc . parallelize(List(1,2,3,4));

val result = rdd . map(value = & gt;值*值);

println(result.collect()。mkString(",");

輸出:

1,4,9,16

過濾器()

②?從RDD集中刪除值為1的元素:

val rdd = sc . parallelize(List(1,2,3,4));

val result = rdd . filter(value = & gt;值!=1);

println(result.collect()。mkString(",");

結果:

2,3,4

我們也可以采取傳遞函數的形式,就像這樣:

功能:

def filter function(value:Int):Boolean = {

值!=1

}

使用:

val rdd = sc . parallelize(List(1,2,3,4));

val result = rdd . filter(filter function);

println(result.collect()。mkString(",");

③有時,我們希望為每個輸入元素生成多個輸出元素。實現這個功能的操作叫做flatMap()。與map()類似,我們提供給flatMap()的函數應用於輸入RDD的每個元素。但它不是元素,而是返回值序列的叠代器。輸出RDD不是由叠代器組成的。我們得到的是壹個RDD,它包含了每個叠代器可以訪問的所有元素。flatMap()的壹個簡單用法就是把輸入的字符串剪成單詞,如下圖:?

val rdd = sc . parallelize(List(" Hello world "、" hello you "、" world I love you "));

val result = rdd . flat map(line = & gt;line . split(" ");

println(result.collect()。mkString(" \ n ");

輸出:

妳好

世界

妳好

妳們

世界

妳們

6.集合運算

在RDD設置操作

功能

使用

RDD1.distinct()

生成壹個只包含不同元素的新RDD。需要數據洗牌。

RDD1.union(RDD2)

返回包含兩個RDD中所有元素的RDD。

RDD1 .交叉點(RDD2)

只返回兩個rdd中都存在的元素。

RDD1.substr(RDD2)

返回由只存在於第壹個RDD中而不存在於第二個RDD中的所有元素組成的RDD。需要數據洗牌。

集合運算對笛卡爾集合的處理:

RDD1 .笛卡爾坐標(RDD2)

返回兩個RDD數據集的笛卡爾集合。

程序示例:生成RDD集{1,2}和{1,2}的笛卡爾集。

val rdd 1 = sc . parallelize(List(1,2));

val rd D2 = sc . parallelize(List(1,2));

val rdd = rdd 1 . cartesian(rdd 2);

println(rdd.collect()。mkString(" \ n ");

輸出:

(1,1)

(1,2)

(2,1)

(2,2)

7.動作操作

(1)歸約運算

Reduce()將壹個函數作為參數,該函數將操作兩個RDD元素類型的數據,並返回壹個相同類型的新元素。壹個簡單的例子是函數+,它可以用來積累我們的RDD。使用reduce(),可以很容易地計算出RDD中所有元素的總和、元素個數以及其他類型的聚合運算。

以下是對RDD數據集的所有元素求和的程序示例:

val rdd = sc . parallelize(List(1,2,3,4,5,6,7,8,9,10));

val results=rdd.reduce((x,y)= & gt;x+y);

println(結果);

產量:55

(2)折疊()操作

接收壹個與reduce()接收的函數簽名相同的函數,並添加壹個初始值作為每個分區第壹次調用的結果。妳提供的初始值應該是妳提供的運算的單位元素,也就是說用妳的函數多次計算這個初始值都不會改變結果(比如+對應0,*對應1,或者拼接運算對應的空列表)。

程序示例:

①計算RDD數據集中所有元素的總和:

zero value = 0;//求和時,初始值為0。

val rdd = sc . parallelize(List(1,2,3,4,5,6,7,8,9,10));

val results=rdd.fold(0)((x,y)= & gt;x+y);

println(結果);

②計算RDD數據集中所有元素的乘積:

zero value = 1;//積分時,初始值為1。

val rdd = sc . parallelize(List(1,2,3,4,5,6,7,8,9,10));

val results = rdd . fold(1)((x,y)= & gt;x * y);

println(結果);

(3)aggregate()運算

aggregate()函數的返回值類型不必與操作的RDD類型相同。

與fold()類似,使用aggregate()時,我們需要提供我們期望返回的類型的初始值。然後使用壹個函數來組合RDD中的元素,並將它們放入累加器中。考慮到每個節點在本地累加,最後,需要提供第二個函數來成對地組合累加器。

以下是壹個程序示例:

val rdd = sc . parallelize(List(1,2,3,4,5,6,7,8,9,10));

val result=rdd.aggregate((0,0))(

(acc,value)= & gt;(acc。_ 1+值,符合。_2+1),

(acc1,ACC 2)= & gt;(acc1。_1+acc2。_1,acc1。_2+acc2。_2)

)

平均值=結果。_ 1/結果。_2;

println(平均值)

輸出:5

最終返回的是壹個元組2

表:對數據為{1,2,3,3}的RDD執行基本的RDD操作。

函數名項目的示例結果

Collect()返回rdd.collect()的所有元素{1,2,3,3}

count()的元素數RDD rdd.count() 4

CountByValue()每個元素在RDD RDD中出現的次數。Countbyvalue () {(1,1),

(2,1),

(3,2)

}

Take(num)從rdd返回num個元素rdd.take(2) {1,2}。

Top(num)返回RDD的前num個元素。從RDD訂購(2)(我的訂單){3,3}。

已訂購(數量)

(排序)按照提供的順序從RDD返回前num個元素。

Rdd.takeSample(false,1)是不確定的。

Take sample (with replacement,num,[seed])從rdd返回任意數量的元素,takeSample(false,1)是不確定的。

Reduce(func)並行集成RDD中的所有數據。Reduce ((x,y) = > x+y)

Fold(zero)(func)與reduce()相同,但初始值rdd.fold(0)((x,y)= & gt;x+y)

Aggregate(零值)(seq op,combo)類似於reduce(),但它通常返回不同類型的函數rdd.aggregate((0,0))。

((x,y)= & gt;

(x._1+y,x._2+1),

(x,y)= & gt;

(x . 1+y . 1,x . 2+y . 2)

) (9,4)

Foreach(func)對rdd.foreach(func) None中的每個元素使用給定的函數RDD。

8.永久緩存

因為火花RDD是懶惰的,有時我們想使用同壹個RDD很多次。如果妳只是簡單地在RDD上調用壹個動作,Spark每次都會重新計算RDD及其所有附屬地。這在叠代算法中特別昂貴,因為叠代算法經常多次使用同壹組數據。

為了避免多次計算同壹個RDD,Spark可以持久化數據。當我們讓Spark持久存儲壹個RDD時,計算RDD的節點將保存它們已經計算的分區數據。

出於不同的目的,我們可以為RDD選擇不同的持久性級別。默認情況下,persist()以序列化的形式在JVM的堆空間中緩存數據。

不同關鍵字對應的存儲級別表

等級

使用的空間

中央處理機時間

它在內存中嗎?

它在磁盤上嗎?

評論

僅限內存

高的

低的

直接存儲在內存中

僅存儲用戶

低的

高的

序列化並存儲在內存中。

內存和磁盤

低的

媒介

部分

部分

如果內存放不下數據,它就會溢出到磁盤上。

內存和磁盤用戶

低的

高的

部分

部分

數據放不進內存,在磁盤上溢出。序列化數據存儲在內存中。

僅磁盤

低的

高的

直接存儲在硬盤中。

程序示例:在內存中保存RDD數據集。

val rdd = sc . parallelize(List(1,2,3,4,5,6,7,8,9,10))。持久化(StorageLevel。MEMORY _ ONLY);

println(rdd.count())

println(rdd.collect()。mkString(",");

RDD也有unpersist()方法,可以調用該方法從緩存中手動刪除持久RDD。

9.不同的RDD類型

在scala中,將RDD轉換成特定函數的RDD(比如RDD[Double]上的數值運算)是通過隱式轉換自動處理的。這些隱式轉換可以將壹個RDD隱式轉換成各種封裝類,比如DoubleRDDFunctions(數值型數據的RDD)和PairRDDFunctions(鍵值對的RDD),這樣我們就有了mean()和variance()等附加函數。

示例程序:

val rdd = sc . parallelize(List(1.0,2.0,3.0,4.0,5.0));

println(rdd . mean());

其實RDD[T]中並沒有mean()函數,只是隱式轉換自動將其轉換為DoubleRDDFunctions。