
Inside Spark Streaming's JobScheduler: Implementation and a Deep Dive

Published: 2021-11-24 16:03:50 · Author: 柒染 · Column: Cloud Computing

This article walks through how Spark Streaming's JobScheduler is implemented under the hood. The material is concise and easy to follow; hopefully you will take something useful away from it.

DStream's foreachRDD method instantiates a ForEachDStream and hands it the user-defined function foreachFunc. foreachRDD is an output operation, and foreachFunc is applied to every RDD generated by this DStream.

/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 * @param foreachFunc foreachRDD function
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
 *                           only the scopes and callsites of `foreachRDD` will override those
 *                           of the RDDs on the display.
 */
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
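For orientation, here is a minimal sketch of how this looks from the user's side, using the public foreachRDD overload that receives both the RDD and the batch time. The socket host/port and app name are placeholders for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDExample")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder input source for illustration.
    val lines = ssc.socketTextStream("localhost", 9999)

    // Registers a ForEachDStream as an output stream; the closure runs on
    // the driver once per batch interval, against that batch's RDD.
    lines.foreachRDD { (rdd, time) =>
      val count = rdd.count() // an action, so the batch RDD is materialized
      println(s"Batch at $time contained $count lines")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}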

ForEachDStream overrides generateJob: it calls the parent DStream's getOrCompute method to produce the batch's RDD, then wraps the application of foreachFunc to that RDD and time into a Job. Its dependencies method is defined as the list containing the parent DStream.

/**
 * An internal DStream used to represent output operations like DStream.foreachRDD.
 * @param parent        Parent DStream
 * @param foreachFunc   Function to apply on each RDD generated by the parent DStream
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           by `foreachFunc` will be displayed in the UI; only the scope and
 *                           callsite of `DStream.foreachRDD` will be displayed.
 */
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
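The Job returned here is just a thin wrapper around the closure built above. Below is a simplified sketch of that wrapper; the real org.apache.spark.streaming.scheduler.Job also tracks an id, output op id, call site, and timing information:

import scala.util.Try
import org.apache.spark.streaming.Time

// Simplified sketch of the Job wrapper: running the job simply invokes
// the closure from generateJob, which in turn calls foreachFunc(rdd, time).
private[streaming] class Job(val time: Time, func: () => _) {
  private var _result: Try[_] = null

  def run(): Unit = {
    _result = Try(func())
  }

  def result: Try[_] = _result
}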

DStreamGraph's generateJobs method calls generateJob on each output stream, which in this case means calling ForEachDStream's generateJob.

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
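This generateJobs is driven by JobGenerator: a recurring timer fires once per batch interval and posts a GenerateJobs(time) event, whose handler looks roughly like the sketch below (paraphrased from JobGenerator.generateJobs, with error handling and checkpointing omitted):

// Paraphrased and simplified from JobGenerator.generateJobs.
private def generateJobs(time: Time): Unit = {
  // Assign the blocks received during this interval to the batch...
  jobScheduler.receiverTracker.allocateBlocksToBatch(time)
  // ...then ask the DStreamGraph for one Job per output stream.
  val jobs = graph.generateJobs(time)
  // Bundle the jobs with the batch's input metadata into a JobSet and
  // hand it to the JobScheduler's thread pool for submission.
  val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
  jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
}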

DStream's generateJob is defined as follows; among its subclasses, only ForEachDStream overrides it. The default implementation simply materializes the batch's RDD by running a job with an empty per-partition function.

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
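The empty per-partition function is the whole trick: runJob still schedules a task for every partition, so the RDD is fully computed even though nothing is done with its elements. A small standalone illustration, assuming a local SparkContext:

import org.apache.spark.{SparkConf, SparkContext}

object MaterializeExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("MaterializeExample"))

    // A lazy RDD whose computation has a visible side effect.
    val rdd = sc.parallelize(1 to 4, numSlices = 2).map { x =>
      println(s"computing element $x")
      x
    }

    // Nothing has printed yet: transformations are lazy. Running a job with
    // a no-op function per partition forces every partition to be computed,
    // exactly as DStream's default generateJob does above.
    sc.runJob(rdd, (iter: Iterator[Int]) => {})

    sc.stop()
  }
}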

DStream's print method is itself implemented on top of foreachRDD: it passes in an internal foreachFunc that takes num + 1 elements from each RDD and prints them.

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and therefore materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
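In use this looks like the sketch below (placeholder source, assuming a StreamingContext ssc set up as earlier). Note that take(num + 1) fetches one extra element solely to decide whether the trailing "..." line is needed:

// Usage sketch: print at most the first 2 elements of every batch.
val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
lines.print(2)

// Illustrative output for a batch containing more than two elements:
// -------------------------------------------
// Time: 1637741030000 ms
// -------------------------------------------
// hello
// world
// ...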

Summary: JobScheduler is the center of all job scheduling in Spark Streaming. It has two important members:

JobGenerator, which is responsible for generating jobs, and ReceiverTracker, which records metadata about the input sources.

Starting the JobScheduler starts both the ReceiverTracker and the JobGenerator. Starting the ReceiverTracker launches the Receivers on the Executors, which begin receiving data, and the ReceiverTracker records the metadata of what they receive. Starting the JobGenerator causes it, once every BatchDuration, to ask the DStreamGraph to generate the RDD graph and the corresponding jobs. JobScheduler's thread pool then submits the resulting JobSet (batch time, jobs, and input-source metadata). Each Job wraps the business logic, which triggers the action on the last RDD, and the DAGScheduler then actually schedules the job onto the Spark cluster for execution.
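This start sequence can be read almost directly from JobScheduler.start; heavily simplified (event loop details, listener bus, and error handling omitted), it looks like this:

// Paraphrased and heavily simplified from JobScheduler.start.
def start(): Unit = synchronized {
  eventLoop.start()                          // handles JobStarted / JobCompleted events
  receiverTracker = new ReceiverTracker(ssc)
  receiverTracker.start()                    // launches Receivers on the Executors
  jobGenerator.start()                       // starts the timer that fires every BatchDuration
}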

That is how JobScheduler is implemented internally. Hopefully you have learned something from it; if you would like to learn more, feel free to follow the 億速云 industry news channel.
