溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark中的DRA怎么開啟

發布時間:2022-01-07 14:47:12 來源:億速云 閱讀:206 作者:iii 欄目:大數據

這篇文章主要講解了“spark中的DRA怎么開啟”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“spark中的DRA怎么開啟”吧!

spark on yarn 中的DynamicResourceAllocation

spark on yarn對于DynamicResourceAllocation分配來說,從spark 1.2版本就已經開始支持了.
對于spark熟悉的人都知道,如果我們要開啟DynamicResourceAllocation,就得有ExternalShuffleService服務,
對于yarn來說ExternalShuffleService是作為輔助服務開啟的,具體配置如下:

<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>spark_shuffle</value>
</property>

<property>
   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

<property>
   <name>spark.shuffle.service.port</name>
   <value>7337</value>
</property>

重啟nodeManager,這樣在每個nodeManager節點就會啟動一個YarnShuffleService,之后在spark應用中設置spark.dynamicAllocation.enabled 為true,這樣就能達到運行時資源動態分配的效果

我們直接從CoarseGrainedExecutorBackend中SparkEnv創建開始說,每一個executor的啟動,必然會經過CoarseGrainedExecutorBackend main方法,而main中就涉及到SparkEnv的創建

 val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
       arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

而sparkEnv的創建就涉及到BlockManager的創建。沿著代碼往下走,最終

val blockTransferService =
     new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
       blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
val blockManager = new BlockManager(
     executorId,
     rpcEnv,
     blockManagerMaster,
     serializerManager,
     conf,
     memoryManager,
     mapOutputTracker,
     shuffleManager,
     blockTransferService,
     securityManager,
     externalShuffleClient)

在blockManager的initialize方法中,就會進行registerWithExternalShuffleServer

 // Register Executors' configuration with the local shuffle service, if one should exist.
   if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
     registerWithExternalShuffleServer()
   }

如果我們開啟了ExternalShuffleService,對于yarn就是YarnShuffleService,就會把當前的ExecutorShuffleInfo注冊到host為shuffleServerId.host, port為shuffleServerId.port的ExternalShuffleService中,ExecutorShuffleInfo的信息如下:

val shuffleConfig = new ExecutorShuffleInfo(
     diskBlockManager.localDirsString,
     diskBlockManager.subDirsPerLocalDir,
     shuffleManager.getClass.getName)

這里我重點分析一下registerWithExternalShuffleServer的方法中的以下片段

// Synchronous and will throw an exception if we cannot connect.
       blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
         shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)

該代碼中shuffleServerId來自于:

shuffleServerId = if (externalShuffleServiceEnabled) {
     logInfo(s"external shuffle service port = $externalShuffleServicePort")
     BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
   } else {
     blockManagerId
   }

而blockTransferService.hostName 是我們在SparkEnv中創建的時候由advertiseAddress傳過來的,
最終由CoarseGrainedExecutorBackend 主類參數hostname過來的,那到底怎么傳過來的呢? 參照ExecutorRunnable的prepareCommand方法,

val commands = prefixEnv ++
     Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
     javaOpts ++
     Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
       "--driver-url", masterAddress,
       "--executor-id", executorId,
       "--hostname", hostname,
       "--cores", executorCores.toString,
       "--app-id", appId,
       "--resourceProfileId", resourceProfileId.toString) ++

而這個hostname的值最終由YarnAllocator的方法runAllocatedContainers

val executorHostname = container.getNodeId.getHost

傳遞過來的,也就是說我們最終獲取到了yarn節點,也就是nodeManager的host 這樣每個啟動的executor,就向executor所在的nodeManager的YarnShuffleService注冊了ExecutorShuffleInfo信息,這樣對于開啟了動態資源分配的
ExternalBlockStoreClient 來說fetchBlocksg過程就和未開啟動態資源分配的NettyBlockTransferService大同小異了

spark on k8s(kubernetes) 中的DynamicResourceAllocation

參考之前的文章,我們知道在entrypoint中我們在啟動executor的時候,我們傳遞了hostname參數

executor)
    shift 1
    CMD=(
      ${JAVA_HOME}/bin/java
      "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
      -Xms$SPARK_EXECUTOR_MEMORY
      -Xmx$SPARK_EXECUTOR_MEMORY
      -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
      org.apache.spark.executor.CoarseGrainedExecutorBackend
      --driver-url $SPARK_DRIVER_URL
      --executor-id $SPARK_EXECUTOR_ID
      --cores $SPARK_EXECUTOR_CORES
      --app-id $SPARK_APPLICATION_ID
      --hostname $SPARK_EXECUTOR_POD_IP
    )

而SPARK_EXECUTOR_POD_IP是運行中的POD IP,參考BasicExecutorFeatureStep類片段:

Seq(new EnvVarBuilder()
          .withName(ENV_EXECUTOR_POD_IP)
          .withValueFrom(new EnvVarSourceBuilder()
            .withNewFieldRef("v1", "status.podIP")
            .build())
          .build())

這樣按照以上流程的分析,
executor也不能向k8s節點ExternalShuffleService服務注冊,因為我們注冊的節點是POD IP,而不是節點IP,
當然spark社區早就提出了未開啟external shuffle service的動態資源分配,且已經合并到master分支. 具體配置,可以參照如下:

spark.dynamicAllocation.enabled  true 
spark.dynamicAllocation.shuffleTracking.enabled  true
spark.dynamicAllocation.minExecutors  1
spark.dynamicAllocation.maxExecutors  4
spark.dynamicAllocation.executorIdleTimeout	 60s

感謝各位的閱讀,以上就是“spark中的DRA怎么開啟”的內容了,經過本文的學習后,相信大家對spark中的DRA怎么開啟這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女