# Spark創建RDD的方式有哪些
## 目錄
1. [RDD核心概念回顧](#1-rdd核心概念回顧)
2. [從集合創建RDD](#2-從集合創建rdd)
3. [從外部存儲系統創建RDD](#3-從外部存儲系統創建rdd)
4. [從其他RDD轉換創建](#4-從其他rdd轉換創建)
5. [特殊創建方式](#5-特殊創建方式)
6. [RDD創建的最佳實踐](#6-rdd創建的最佳實踐)
7. [總結](#7-總結)
---
## 1. RDD核心概念回顧
**彈性分布式數據集(RDD)** 是Spark中最基本的數據抽象,具有以下核心特性:
- **不可變性**:只讀的數據分區集合
- **彈性**:支持數據重建的容錯機制
- **分布式**:數據跨集群節點存儲
- **惰性求值**:轉換操作延遲執行
RDD的五大核心屬性:
1. 分區列表
2. 計算每個分區的函數
3. 與其他RDD的依賴關系
4. 鍵值RDD的分區器
5. 計算每個分區的首選位置
## 2. 從集合創建RDD
### 2.1 parallelize方法
最基礎的創建方式,適用于小規模測試數據
```scala
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
// 指定分區數
val rdd = sc.parallelize(data, 5)
特點: - 數據完全加載到驅動程序內存 - 適合開發和測試場景 - 分區數默認為集群的CPU核心數
Spark提供的增強版parallelize
val rdd = sc.makeRDD(Seq(1, 2, 3))
// 支持指定分區位置偏好
val rdd = sc.makeRDD(Seq(
(1 to 10, Seq("host1")),
(11 to 20, Seq("host2"))
))
優勢: - 支持數據位置偏好設置 - 內部實現更高效
// 本地文件系統
val rdd = sc.textFile("file:///path/to/file")
// HDFS文件
val rdd = sc.textFile("hdfs://namenode:8020/path")
// 通配符匹配
val rdd = sc.textFile("/input/*.log")
// 整個目錄
val rdd = sc.wholeTextFiles("/input/")
關鍵參數:
- minPartitions:最小分區數
- wholeTextFiles返回(文件名,內容)的鍵值對
val rdd = sc.binaryFiles("/path/to/binfiles")
// 保存
rdd.saveAsObjectFile("/output")
// 讀取
val rdd = sc.objectFile("/output")
val rdd = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("/input")
支持格式: - SequenceFile - Avro - Parquet - 其他Hadoop兼容格式
// JDBC連接
val rdd = JdbcRDD.create(
sc,
() => DriverManager.getConnection("jdbc:postgresql://localhost/test"),
"SELECT * FROM table WHERE ? <= id AND id <= ?",
1, 100, 3,
r => (r.getInt(1), r.getString(2))
注意事項: - 需要將JDBC驅動jar包添加到classpath - 合理設置分區數避免連接數過多
// map轉換
val newRDD = rdd.map(_ * 2)
// filter轉換
val filtered = rdd.filter(_ > 3)
// flatMap轉換
val words = lines.flatMap(_.split(" "))
// reduceByKey
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
// groupByKey
val grouped = pairs.groupByKey()
// join操作
val joined = rdd1.join(rdd2)
// mapPartitions
val partRDD = rdd.mapPartitions(iter => iter.map(_ * 2))
// repartition
val repartRDD = rdd.repartition(10)
// coalesce
val coalesced = rdd.coalesce(5)
// 創建數字序列
val rangeRDD = sc.range(0, 100, 1, 5)
// 隨機雙精度數
val randomRDD = RandomRDDs.normalRDD(sc, 100L)
val empty = sc.emptyRDD[String]
val pairRDD = sc.parallelizePairs(Seq(
("key1", 1), ("key2", 2)
))
sc.textFile(path, minPartitions)// 控制并行化數據大小
val MAX_MEMORY = Runtime.getRuntime.maxMemory / 2
if (data.size > MAX_MEMORY) {
// 改用外部存儲方式
}
// 添加隨機前綴解決傾斜
val salted = rdd.map {
case (k, v) => (s"${Random.nextInt(10)}_$k", v)
}
// 使用Kryo序列化
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Spark提供了多樣化的RDD創建方式以滿足不同場景需求:
| 創建方式 | 適用場景 | 特點 |
|---|---|---|
| 并行化集合 | 小規模測試數據 | 簡單快速 |
| 外部存儲系統 | 生產環境大數據 | 支持多種文件格式 |
| RDD轉換 | 數據處理流水線 | 惰性求值 |
| 特殊方法 | 特定需求場景 | 功能專一 |
未來趨勢: - 雖然DataSet/DataFrame API逐漸成為主流 - RDD仍然是底層核心抽象 - 理解RDD創建原理對性能調優至關重要
通過合理選擇創建方式,可以顯著提升Spark應用的執行效率和資源利用率。 “`
這篇文章共計約2700字,采用Markdown格式編寫,包含: 1. 7個主要章節 2. 多個代碼示例 3. 表格對比 4. 結構化標題 5. 關鍵點強調 6. 最佳實踐建議
內容全面覆蓋了Spark創建RDD的各種方式及其應用場景,適合作為技術文檔或學習資料使用。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。