溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark2.x中如何用源碼剖析SortShuffleWriter具體實現

發布時間:2021-11-15 15:14:24 來源:億速云 閱讀:207 作者:柒染 欄目:大數據

Spark2.x中如何用源碼剖析SortShuffleWriter具體實現

1. 引言

在大數據處理領域,Apache Spark 是一個廣泛使用的分布式計算框架。Spark 的核心優勢之一是其高效的 Shuffle 機制,而 SortShuffleWriter 是 Spark 2.x 中用于實現 Shuffle 操作的關鍵組件之一。本文將深入剖析 SortShuffleWriter 的具體實現,通過源碼分析來揭示其內部工作原理。

2. Shuffle 機制概述

在 Spark 中,Shuffle 是指將數據重新分布到不同的分區中,以便進行后續的計算。Shuffle 操作通常發生在寬依賴(Wide Dependency)的情況下,例如 reduceByKey、groupByKey 等操作。Shuffle 的性能對整個 Spark 作業的執行效率有著重要影響。

2.1 Shuffle 的兩種實現方式

在 Spark 2.x 中,Shuffle 有兩種主要的實現方式:

  1. HashShuffleManager:基于哈希表的 Shuffle 實現,適用于小規模數據集。
  2. SortShuffleManager:基于排序的 Shuffle 實現,適用于大規模數據集。

本文主要關注 SortShuffleManager 中的 SortShuffleWriter。

3. SortShuffleWriter 概述

SortShuffleWriterSortShuffleManager 的核心組件之一,負責將數據寫入磁盤并進行排序。其主要任務包括:

  1. 數據分區:將數據按照分區 ID 進行分組。
  2. 數據排序:對每個分區內的數據進行排序。
  3. 數據寫入:將排序后的數據寫入磁盤。

3.1 SortShuffleWriter 的類結構

SortShuffleWriter 的類結構如下:

private[spark] class SortShuffleWriter[K, V, C](
    handle: BaseShuffleHandle[K, V, C],
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging {
  // 類實現
}

其中,handleBaseShuffleHandle 的實例,包含了 Shuffle 的相關信息;mapId 是當前任務的 ID;context 是任務上下文。

4. SortShuffleWriter 的實現細節

4.1 數據分區

SortShuffleWriter 首先需要將數據按照分區 ID 進行分組。這一過程通過 Partitioner 實現,Partitioner 是一個抽象類,定義了如何將鍵值對分配到不同的分區中。

private val partitioner = handle.dependency.partitioner

partitionerShuffleDependency 中的一個屬性,表示當前 Shuffle 操作的分區器。

4.2 數據排序

在數據分區之后,SortShuffleWriter 會對每個分區內的數據進行排序。排序是通過 ExternalSorter 實現的,ExternalSorter 是 Spark 中用于外部排序的工具類。

private val sorter = new ExternalSorter[K, V, C](
  context, handle.dependency.aggregator, Some(handle.dependency.partitioner), keyOrdering, serializer)

ExternalSorter 的構造函數參數包括:

  • context:任務上下文。
  • aggregator:用于聚合操作的聚合器。
  • partitioner:分區器。
  • keyOrdering:鍵的排序規則。
  • serializer:序列化器。

4.3 數據寫入

排序完成后,SortShuffleWriter 會將數據寫入磁盤。數據寫入是通過 DiskBlockObjectWriter 實現的,DiskBlockObjectWriter 是 Spark 中用于將數據寫入磁盤的工具類。

private val writer = blockManager.getDiskWriter(
  blockId, outputFile, serializer, bufferSize, writeMetrics)

DiskBlockObjectWriter 的構造函數參數包括:

  • blockId:塊的 ID。
  • outputFile:輸出文件。
  • serializer:序列化器。
  • bufferSize:緩沖區大小。
  • writeMetrics:寫入度量。

4.4 數據合并

在數據寫入磁盤后,SortShuffleWriter 會將多個小文件合并成一個大文件。這一過程通過 IndexShuffleBlockResolver 實現,IndexShuffleBlockResolver 是 Spark 中用于管理 Shuffle 塊的工具類。

private val shuffleBlockResolver = shuffleManager.shuffleBlockResolver

IndexShuffleBlockResolver 的主要任務是管理 Shuffle 塊的索引文件和數據文件。

5. SortShuffleWriter 的源碼分析

5.1 write 方法

SortShuffleWriter 的核心方法是 write,該方法負責將數據寫入磁盤并進行排序。write 方法的源碼如下:

override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter.insertAll(records)
  val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, outputFile)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, outputFile)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}

write 方法的主要步驟如下:

  1. 插入數據:通過 sorter.insertAll 方法將數據插入到 ExternalSorter 中。
  2. 獲取輸出文件:通過 shuffleBlockResolver.getDataFile 方法獲取輸出文件。
  3. 寫入分區數據:通過 sorter.writePartitionedFile 方法將分區數據寫入磁盤。
  4. 寫入索引文件:通過 shuffleBlockResolver.writeIndexFileAndCommit 方法寫入索引文件并提交。
  5. 更新 MapStatus:更新 MapStatus,表示當前任務的狀態。

5.2 insertAll 方法

insertAll 方法是 ExternalSorter 的核心方法之一,負責將數據插入到 ExternalSorter 中。insertAll 方法的源碼如下:

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  val shouldCombine = aggregator.isDefined
  if (shouldCombine) {
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    while (records.hasNext) {
      val kv = records.next()
      buffer.insert((getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]))
      maybeSpillCollection(usingMap = false)
    }
  }
}

insertAll 方法的主要步驟如下:

  1. 判斷是否需要聚合:通過 aggregator.isDefined 判斷是否需要聚合操作。
  2. 插入數據:如果需要聚合,則通過 map.changeValue 方法插入數據;否則,通過 buffer.insert 方法插入數據。
  3. 可能溢出:通過 maybeSpillCollection 方法判斷是否需要將數據溢出到磁盤。

5.3 writePartitionedFile 方法

writePartitionedFile 方法是 ExternalSorter 的另一個核心方法,負責將分區數據寫入磁盤。writePartitionedFile 方法的源碼如下:

def writePartitionedFile(
    blockId: BlockId,
    outputFile: File): Array[Long] = {
  val lengths = new Array[Long](numPartitions)
  val writer = blockManager.getDiskWriter(blockId, outputFile, serializer, bufferSize, writeMetrics)
  val partitionedIterator = partitionedDestructiveSortedIterator(None)
  partitionedIterator.foreach { case (partitionId, elements) =>
    val length = writer.write(elements)
    lengths(partitionId) = length
  }
  writer.commitAndClose()
  lengths
}

writePartitionedFile 方法的主要步驟如下:

  1. 初始化長度數組:初始化一個長度為 numPartitions 的數組,用于存儲每個分區的數據長度。
  2. 獲取磁盤寫入器:通過 blockManager.getDiskWriter 方法獲取磁盤寫入器。
  3. 獲取分區迭代器:通過 partitionedDestructiveSortedIterator 方法獲取分區迭代器。
  4. 寫入數據:遍歷分區迭代器,將每個分區的數據寫入磁盤,并記錄數據長度。
  5. 提交并關閉寫入器:通過 writer.commitAndClose 方法提交并關閉寫入器。

5.4 writeIndexFileAndCommit 方法

writeIndexFileAndCommit 方法是 IndexShuffleBlockResolver 的核心方法之一,負責寫入索引文件并提交。writeIndexFileAndCommit 方法的源碼如下:

def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Int,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
    Utils.tryWithSafeFinally {
      var offset = 0L
      out.writeLong(offset)
      for (length <- lengths) {
        offset += length
        out.writeLong(offset)
      }
    } {
      out.close()
    }
    val dataFile = getDataFile(shuffleId, mapId)
    if (dataFile.exists()) {
      dataFile.delete()
    }
    if (!dataTmp.renameTo(dataFile)) {
      throw new IOException(s"fail to rename ${dataTmp} to ${dataFile}")
    }
    if (!indexTmp.renameTo(indexFile)) {
      throw new IOException(s"fail to rename ${indexTmp} to ${indexFile}")
    }
  } finally {
    if (indexTmp.exists() && !indexTmp.delete()) {
      logError(s"Failed to delete temporary index file at ${indexTmp}")
    }
  }
}

writeIndexFileAndCommit 方法的主要步驟如下:

  1. 獲取索引文件:通過 getIndexFile 方法獲取索引文件。
  2. 創建臨時索引文件:通過 Utils.tempFileWith 方法創建臨時索引文件。
  3. 寫入索引數據:將每個分區的偏移量寫入臨時索引文件。
  4. 重命名數據文件:將臨時數據文件重命名為正式數據文件。
  5. 重命名索引文件:將臨時索引文件重命名為正式索引文件。

6. SortShuffleWriter 的性能優化

SortShuffleWriter 的性能對整個 Spark 作業的執行效率有著重要影響。為了提高 SortShuffleWriter 的性能,Spark 采用了多種優化策略,包括:

  1. 內存管理:通過 ExternalSorter 管理內存,避免內存溢出。
  2. 數據壓縮:通過 Serializer 對數據進行壓縮,減少磁盤 I/O。
  3. 并行寫入:通過多個 DiskBlockObjectWriter 并行寫入數據,提高寫入速度。

6.1 內存管理

ExternalSorter 通過 maybeSpillCollection 方法管理內存,當內存使用超過一定閾值時,將數據溢出到磁盤。maybeSpillCollection 方法的源碼如下:

private def maybeSpillCollection(usingMap: Boolean): Unit = {
  if (usingMap) {
    if (map.estimateSize() >= _spillThreshold) {
      spill()
    }
  } else {
    if (buffer.size >= _spillThreshold) {
      spill()
    }
  }
}

maybeSpillCollection 方法的主要步驟如下:

  1. 判斷內存使用:如果使用 map,則通過 map.estimateSize 方法判斷內存使用;否則,通過 buffer.size 方法判斷內存使用。
  2. 溢出數據:如果內存使用超過閾值,則通過 spill 方法將數據溢出到磁盤。

6.2 數據壓縮

Serializer 是 Spark 中用于數據序列化和壓縮的工具類。Serializer 的默認實現是 JavaSerializer,但可以通過配置使用其他序列化器,例如 KryoSerializer。

private val serializer = SparkEnv.get.serializer

Serializer 的主要任務是將數據序列化為字節流,并對字節流進行壓縮,以減少磁盤 I/O。

6.3 并行寫入

DiskBlockObjectWriter 是 Spark 中用于將數據寫入磁盤的工具類。DiskBlockObjectWriter 支持并行寫入,可以通過多個 DiskBlockObjectWriter 同時寫入數據,以提高寫入速度。

private val writer = blockManager.getDiskWriter(
  blockId, outputFile, serializer, bufferSize, writeMetrics)

DiskBlockObjectWriter 的主要任務是將數據寫入磁盤,并記錄寫入度量。

7. 總結

SortShuffleWriter 是 Spark 2.x 中用于實現 Shuffle 操作的關鍵組件之一。通過源碼分析,我們深入了解了 SortShuffleWriter 的內部工作原理,包括數據分區、數據排序、數據寫入和數據合并等過程。此外,我們還探討了 SortShuffleWriter 的性能優化策略,包括內存管理、數據壓縮和并行寫入等。

通過對 SortShuffleWriter 的源碼剖析,我們可以更好地理解 Spark 的 Shuffle 機制,并為優化 Spark 作業的性能提供有價值的參考。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女