# 如何進行DAGScheduler源碼解讀
## 一、前言
DAGScheduler是Apache Spark核心調度層的關鍵組件,負責將邏輯執行計劃(RDD依賴關系)轉換為物理執行計劃(Stage劃分與任務調度)。本文將從源碼層面深入解析DAGScheduler的工作原理,幫助開發者理解Spark作業調度的核心機制。
## 二、環境準備
### 1. 源碼獲取
```bash
git clone https://github.com/apache/spark.git
cd spark/core/src/main/scala/org/apache/spark/scheduler
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
...
) extends Logging {
// 關鍵成員變量
private[scheduler] val nextStageId = new AtomicInteger(0)
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
...
}
Driver Program
↓
DAGScheduler (Stage劃分)
↓
TaskScheduler (集群任務調度)
↓
SchedulerBackend (資源協商)
↓
Executor (任務執行)
// 核心調用鏈
dagScheduler.runJob()
→ submitJob()
→ handleJobSubmitted()
→ createResultStage()
→ getOrCreateParentStages()
private def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
Stage類型 | 觸發條件 | 特點 |
---|---|---|
ResultStage | 執行Action操作 | 包含最終計算函數 |
ShuffleMapStage | 存在Shuffle依賴 | 輸出Shuffle數據 |
// 事件循環處理核心
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") {
override def onReceive(event: DAGSchedulerEvent): Unit = {
event match {
case JobSubmitted(...) => dagScheduler.handleJobSubmitted(...)
case StageCancelled(...) => dagScheduler.handleStageCancellation(...)
case ... // 其他事件處理
}
}
}
// 任務生成關鍵代碼
private[scheduler] def submitMissingTasks(
stage: Stage,
jobId: Int): Unit = {
// 1. 確定需要計算的分區
val partitionsToCompute = stage.findMissingPartitions()
// 2. 創建Task集合
val tasks: Seq[Task[_]] = stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { p =>
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, properties, ...)
}
case stage: ResultStage =>
partitionsToCompute.map { p =>
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, properties, ...)
}
}
// 3. 提交任務
taskScheduler.submitTasks(new TaskSet(...))
}
// 最大重試次數配置
spark.stage.maxConsecutiveAttempts = 4
// 本地性優先級排序
val locs = taskIdToLocations(taskId).map {
case TaskLocation(host, executorId) =>
(host, executorId) match {
case (Some(h), Some(e)) => Seq(ExecutorCacheTaskLocation(h, e))
case (Some(h), None) => Seq(HostTaskLocation(h))
case _ => Nil
}
}
// 資源調整事件處理
case ExecutorAdded(execId, host) =>
handleExecutorAdded(execId, host)
case ExecutorLost(execId) =>
handleExecutorLost(execId)
# 啟用DEBUG日志
log4j.logger.org.apache.spark.scheduler.DAGScheduler=DEBUG
// 通過SparkListener跟蹤
sc.addSparkListener(new SparkListener {
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
println(s"Stage ${stageSubmitted.stageInfo.stageId} submitted")
}
})
可能原因: - 持久化策略不當 - Shuffle依賴配置錯誤
診斷方法: - 分析Stage任務執行時間分布 - 檢查Shuffle Read/Write指標
通過對DAGScheduler源碼的系統解讀,我們可以深入理解: 1. RDD物理執行計劃的生成邏輯 2. Spark作業的調度生命周期 3. 分布式計算中的容錯處理機制 4. 性能優化的核心切入點
建議讀者結合Spark UI和實際調試經驗,逐步構建完整的調度系統認知模型。
參數 | 默認值 | 說明 |
---|---|---|
spark.default.parallelism | 8 | 默認并行度 |
spark.stage.maxConsecutiveAttempts | 4 | Stage最大重試次數 |
spark.locality.wait | 3s | 本地性等待時間 |
”`
注:本文基于Spark 3.3.x版本源碼分析,實際閱讀時請對應具體版本。建議通過IDE的代碼導航功能輔助閱讀,重點關注類繼承關系和關鍵方法調用鏈。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。