當前位置:成語大全網 - 新華字典 - Spark RDD 分布式彈性數據集

Spark RDD 分布式彈性數據集

rdd是壹種彈性分布式的數據集,它代表著不可變的數據元素,可以被分區並行處理。

rdd是壹個粗粒度的數據生成方式和流轉叠代計算方式的描述。它可以通過穩定的存儲器或者從其他RDD生成,它並不需要急著進行轉換,只需要在特定的rdd進行壹次性的數據的叠代流轉。rdd記錄著自己的依賴關系,以防在數據丟失時可以通過“血緣”關系再次生成數據。用戶也可以自己選擇在經常重用的rdd進行數據落地,放置丟失後重做。

rdd的特性總結:

分布式的***享內存是壹種細粒度的讀寫,可以對每個存儲單元進行讀寫,其壹致性需要程序進行維護,其容錯性需要設置檢查點和程序回滾。但是RDD由於是不可變的粗粒度的讀寫,更適合於批量讀寫的任務,其可以使用“血緣”機制恢復數據,減少了設置檢查點的開銷。如果出現失敗時,也只用重新計算分區中丟失的那壹部分。另壹方面,RDD的不可變性可以讓系統可以像mapreduce壹樣采用後備任務的方式來代替運行緩慢的任務,不會出現相互影響的情況。

另外rdd也吸取了分布式***享內存的特性,rdd的批量操作可以根據數據所處的位置進行優化,提高性能。加載數據時,當內存不足時,rdd的性能下降是平穩的,不能載入內存的分區可以存儲在磁盤上。

上面的5點是rdd都會實現的接口,這也是rdd都具有的特性。

如上源碼所示,RDD提供了分區的抽象函數,即protected def getPartitions: Array[Partition],每個繼承RDD抽象類的RDD都會有自己的getPartitions的實現。RDD分區的多少代表著計算時的並發粒度。

用戶可以自己指定執行的分區數,如果用戶不自己指定,則使用默認的分區數。

從源碼中可以看出,如果不傳入分區數,則默認分區數為defaultParallelism,而defaultParallelism=math.max(totalCoreCount.get(), 2)所以最小是2,最大是主機核數。

HadoopRDD是讀取hdfs文件的rdd。HadoopRDD使用的是MapReduce API。

spark.sparkContext.textFile(" hdfs://user/local/admin.text ") 中textFile是讀取hdfs文件的方法。其中會調用HadoopRDD。

textFile 是從HDFS分布式文件系統的所有節點上讀取數據,返回Strings的RDD。

總結下HadoopRDD分區規則:

1.如果textFile指定分區數量為0或者1的話,defaultMinPartitions值為1,則有多少個文件,就會有多少個分區。

2.如果不指定默認分區數量,則默認分區數量為2,則會根據所有文件字節大小totalSize除以分區數量,得到的值goalSize,然後比較goalSize和hdfs指定分塊大小(這裏是128M)作比較,以較小的最為goalSize作為切分大小,對每個文件進行切分,若文件大於大於goalSize,則會生成該(文件大小/goalSize)個分區,如果文件內的數據不能除盡則分區數會+1,則為(fileSize/goalSize)+1。

3.如果指定分區數量大於等於2,則默認分區數量為指定值,生成實際分區數量規則任然同2中的規則壹致。

總之:文件總大小除以分區數,大於分塊大小,則與分塊大小相關,否則以得到的商相關。

rdd優先位置返回的是每壹個分區的位置信息,按照移動計算的思路,將計算盡量分配到數據所在的機器上。

RDD的操作是粗粒度的操作,RDD進行轉換會形成新的RDD。形成的RDD和原RDD形成依賴關系,RDD通過這種“血緣”關系來維護數據的容錯性。RDD的依賴關系可以分為寬依賴和窄依賴兩種。

從中可以看出mapRDD是OneToOneDependency依賴,其父RDD為ParallelCollectionRDD。

從中可以看出groupRDD的依賴是ShuffleDependency依賴,其父依賴是MapPartitionsRDD。而groupbykey是需要進行shuffle的算子,屬於寬依賴。

Spark通過創建的類來表明,RDD間的依賴關系的類型,NarrowDependency屬於窄依賴,ShuffleDenpendency屬於寬依賴。之後會通過壹節來具體介紹其中的細節。

從上面的RDD源碼可以發現,每個RDD中都存在壹個compute()的函數,這個函數的作用就是為實現RDD具體的分區計算。

def compute(split: Partition, context: TaskContext): Iterator[T]

compute的返回值是分區的叠代器,每壹個分區都會調用這個函數。只有到action算子才會真正的執行計算。

partitioner指的是Spark的分區函數,目前最常用的有兩種,HashPartitioner和RangePartitioner, 其次還有縮減分區數的分區函數CoalescedPartitioner。分區這個概念,只存在於(K,V)鍵值對的RDD中,非鍵值對的RDD中partitioner為None。

分區函數即決定了RDD本身分區的數量,也決定了Shuffle中MapOut輸出中每個分區進行切割的依據。

HashPartitioner會對數據的key進行 key.hascode%numpartitions 計算,得到的數值會放到對應的分區中,這樣能較為平衡的分配數據到partition。

RangePartitioner:它是在排序算子中會用到的分區器,比如sortbykey、sortby、orderby等。該分區器先對輸入的數據的key做采樣,來估算Key的分布,然後按照指定的排序切分range,盡量讓每個partition對應的range裏的key分布均勻。

rdd中的算子可以分為兩種,壹個是transformation, 壹個是action算子。

1. Transformation:轉換算子,這類轉換並不觸發提交作業,完成作業中間過程處理。

2. Action:行動算子,這類算子會觸發SparkContext提交Job作業。