溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RDD Transformation和Action源碼剖析

發布時間:2020-08-15 21:36:49 來源:網絡 閱讀:855 作者:jethai 欄目:大數據

wordcount.toDebugString查看RDD的繼承鏈條


所以廣義的講,對任何函數進行某一項操作都可以認為是一個算子,甚至包括求冪次,開方都可以認為是一個算子,只是有的算子我們用了一個符號來代替他所要進行的運算罷了,所以大家看到算子就不要糾結,他和f(x)的f沒區別,它甚至和加減乘除的基本運算符號都沒有區別,只是他可以對單對象操作罷了(有的符號比如大于、小于號要對多對象操作)。又比如取概率P{X<x},概率是集合{X<x}(他是屬于實數集的子集)對[0,1]區間的一個映射,我們知道實數域和[0,1]區間是可以一一映射的(這個后面再說),所以取概率符號P,我們認為也是一個算子,和微分,積分算子算子沒區別。

總而言之,算子就是映射,就是關系,就是**變換**!



**mapPartitions(f)**
f函數的輸入輸出都是每個分區集合的迭代器Iterator

def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
該函數和map函數類似,只不過映射函數的參數由RDD中的每一個元素變成了RDD中每一個分區的迭代器。如果在映射的過程中需要頻繁創建額外的對象,使用mapPartitions要比map高效的過。
比如,將RDD中的所有數據通過JDBC連接寫入數據庫,如果使用map函數,可能要為每一個元素都創建一個connection,這樣開銷很大,如果使用mapPartitions,那么只需要針對每一個分區建立一個connection。
參數preservesPartitioning表示是否保留父RDD的partitioner分區信息。
參考文章:
http://lxw1234.com/archives/2015/07/348.htm



union(other: RDD[T])操作不去重,去重需要distinct()


subtract取兩個RDD中非公共的元素

sample返回RDD,takeSample直接返回數組(數組里面的元素為RDD中元素,類似于collect)


keyvalue之類的操作都在**PairRDDFunctions.scala**

mapValues只對value進行運算


groupBy相同key的元素的value組成集合

coGroup是在groupBy的基礎上

coGroup操作多個RDD,是兩個RDD里相同key的兩個value集合組成的元組

RDD Transformation和Action源碼剖析

參考文章:
http://www.iteblog.com/archives/1280



**combineByKey和reduceByKey,groupByKey(內部都是通過combineByKey)**

源碼分析:

    reduceByKey  mapSideCombine: Boolean = true
    
    groupByKey  mapSideCombine=false

所以優先使用reduceByKey,參考文章:http://www.iteblog.com/archives/1357


**join操作**

本質是先coGroup再笛卡爾積
    
      def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
      }

RDD Transformation和Action源碼剖析  

**yield** 關鍵字的簡短總結:

    針對每一次 for 循環的迭代, yield 會產生一個值,被循環記錄下來 (內部實現上,像是一個緩沖區).
    當循環結束后, 會返回所有 yield 的值組成的集合.
    返回集合的類型與被遍歷的集合類型是一致的.

參考文章:
http://unmi.cc/scala-yield-samples-for-loop/



cache persist也是lazy級別的


Action本質sc.runJob

foreach

collect()相當于toArray返回一個數組

collectAsMap()對keyvalue類型的RDD操作返回一個HashMap,key重復后面的元素會覆蓋前面的元素reduce

源碼解析:先調用collect()再放到HashMap[K, V]中

     def collectAsMap(): Map[K, V] = {
    val data = self.collect()
    val map = new mutable.HashMap[K, V]
    map.sizeHint(data.length)
    data.foreach { pair => map.put(pair._1, pair._2) }
    map
      }
    
**reduceByKeyLocally**相當于reduceByKey+collectAsMap()

該函數將RDD[K,V]中每個K對應的V值根據映射函數來運算,運算結果映射到一個Map[K,V]中,而不是RDD[K,V]。

參考文章:
http://lxw1234.com/archives/2015/07/360.htm



**lookup**也是針對keyvalue返回指定key對應的value形成的seq


    def lookup(key: K): Seq[V] 


**reduce fold(每個分區是串行,有個初始值) aggregate(并行,與fold類似)**

前兩個元素作用的結果與第三元素作用依次類推


**SequenceFile**文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。目前,也有不少人在該文件的基礎之上提出了一些HDFS中小文件存儲的解決方案,他們的基本思路就是將小文件進行合并成一個大文件,同時對這些小文件的位置信息構建索引。不過,這類解決方案還涉及到Hadoop的另一種文件格式——**MapFile**文件。SequenceFile文件并不保證其存儲的key-value數據是按照key的某個順序存儲的,同時不支持append操作。

參考文章:http://blog.csdn.net/xhh298781/article/details/7693358




**saveAsTextFile**->TextOutputFormat  (key為null,value為元素toString)

**saveAsObjectFile**(二進制)->saveAsSequenceFile->SequenceFileOutputFormat(key為null,value為BytesWritable)


cache\persist   


**checkpoint()**機制避免緩存丟失(內存不足)要重新計算帶來的性能開銷,會導致另外一個作業,比緩存更可靠


SparkContex.setCheckpointDir設置目錄位置

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女