This article explains how to enable Dynamic Resource Allocation (DRA) in Spark, first on YARN and then on Kubernetes.
Spark on YARN has supported DynamicResourceAllocation since Spark 1.2.

As anyone familiar with Spark knows, enabling DynamicResourceAllocation traditionally requires the ExternalShuffleService. On YARN, the ExternalShuffleService runs as a NodeManager auxiliary service, configured in yarn-site.xml as follows:
```xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
  <name>spark.shuffle.service.port</name>
  <value>7337</value>
</property>
```
Restart the NodeManagers so that each node starts a YarnShuffleService, then set spark.dynamicAllocation.enabled to true in your Spark application to get dynamic resource allocation at runtime.
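The same thing can also be set programmatically. Below is a minimal sketch assuming a Spark 3.x application; the app name and executor bounds are illustrative, not from the original setup:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Minimal sketch: enable DRA backed by the YARN auxiliary shuffle service
// configured above. App name and executor bounds are illustrative.
val conf = new SparkConf()
  .setAppName("dra-demo")                            // hypothetical app name
  .set("spark.shuffle.service.enabled", "true")      // use the external shuffle service
  .set("spark.shuffle.service.port", "7337")         // must match yarn-site.xml
  .set("spark.dynamicAllocation.enabled", "true")    // turn on DRA
  .set("spark.dynamicAllocation.minExecutors", "1")  // illustrative lower bound
  .set("spark.dynamicAllocation.maxExecutors", "10") // illustrative upper bound

val spark = SparkSession.builder().config(conf).getOrCreate()
```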
Let's start from the creation of the SparkEnv in CoarseGrainedExecutorBackend. Every executor launch goes through the main method of CoarseGrainedExecutorBackend, and main is where the SparkEnv gets created:
```scala
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
  arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
```
Creating the SparkEnv in turn creates the BlockManager. Following the code down, we eventually reach:
```scala
val blockTransferService = new NettyBlockTransferService(conf, securityManager, bindAddress,
  advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)

val blockManager = new BlockManager(
  executorId,
  rpcEnv,
  blockManagerMaster,
  serializerManager,
  conf,
  memoryManager,
  mapOutputTracker,
  shuffleManager,
  blockTransferService,
  securityManager,
  externalShuffleClient)
```
In BlockManager's initialize method, registerWithExternalShuffleServer is invoked:
```scala
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
  registerWithExternalShuffleServer()
}
```
If the ExternalShuffleService is enabled (on YARN, that is the YarnShuffleService), the current ExecutorShuffleInfo is registered with the ExternalShuffleService listening at host shuffleServerId.host and port shuffleServerId.port. The ExecutorShuffleInfo is built as follows:
```scala
val shuffleConfig = new ExecutorShuffleInfo(
  diskBlockManager.localDirsString,
  diskBlockManager.subDirsPerLocalDir,
  shuffleManager.getClass.getName)
```
Let's focus on the following fragment of registerWithExternalShuffleServer:
```scala
// Synchronous and will throw an exception if we cannot connect.
blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
  shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
```
In this code, shuffleServerId comes from:
```scala
shuffleServerId = if (externalShuffleServiceEnabled) {
  logInfo(s"external shuffle service port = $externalShuffleServicePort")
  BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
  blockManagerId
}
```
blockTransferService.hostName is the advertiseAddress passed in when the SparkEnv was created, which in turn comes from the hostname argument of the CoarseGrainedExecutorBackend main class. So how does it get there? See ExecutorRunnable's prepareCommand method:
```scala
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++
  Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
    "--driver-url", masterAddress,
    "--executor-id", executorId,
    "--hostname", hostname,
    "--cores", executorCores.toString,
    "--app-id", appId,
    "--resourceProfileId", resourceProfileId.toString) ++
```
And this hostname value is ultimately set in YarnAllocator's runAllocatedContainers method:
```scala
val executorHostname = container.getNodeId.getHost
```
In other words, we end up with the host of the YARN node, i.e. the NodeManager. Each executor that starts up therefore registers its ExecutorShuffleInfo with the YarnShuffleService on its own NodeManager. With dynamic resource allocation enabled, the fetchBlocks flow of ExternalBlockStoreClient is then largely the same as that of NettyBlockTransferService without dynamic resource allocation.
Now consider Spark on Kubernetes. As covered in a previous article, the entrypoint script passes the hostname argument when launching an executor:
```bash
  executor)
    shift 1
    CMD=(
      ${JAVA_HOME}/bin/java
      "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
      -Xms$SPARK_EXECUTOR_MEMORY
      -Xmx$SPARK_EXECUTOR_MEMORY
      -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
      org.apache.spark.executor.CoarseGrainedExecutorBackend
      --driver-url $SPARK_DRIVER_URL
      --executor-id $SPARK_EXECUTOR_ID
      --cores $SPARK_EXECUTOR_CORES
      --app-id $SPARK_APPLICATION_ID
      --hostname $SPARK_EXECUTOR_POD_IP
    )
```
And SPARK_EXECUTOR_POD_IP is the IP of the running pod; see this fragment of the BasicExecutorFeatureStep class:
```scala
Seq(new EnvVarBuilder()
  .withName(ENV_EXECUTOR_POD_IP)
  .withValueFrom(new EnvVarSourceBuilder()
    .withNewFieldRef("v1", "status.podIP")
    .build())
  .build())
```
Following the flow analyzed above, on Kubernetes the executor cannot register with an ExternalShuffleService on the node, because the address it registers is the pod IP, not the node IP.
Of course, the Spark community long ago proposed dynamic resource allocation without an external shuffle service, and it has already been merged into the master branch. It can be configured as follows:
```properties
spark.dynamicAllocation.enabled                  true
spark.dynamicAllocation.shuffleTracking.enabled  true
spark.dynamicAllocation.minExecutors             1
spark.dynamicAllocation.maxExecutors             4
spark.dynamicAllocation.executorIdleTimeout      60s
```
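As a sketch, the same configuration can be applied programmatically (the session setup and app name here are illustrative, not from the original article):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: DRA without an external shuffle service, relying on
// shuffle tracking. Executor bounds and idle timeout are illustrative.
val spark = SparkSession.builder()
  .appName("dra-shuffle-tracking-demo") // hypothetical app name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "4")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .getOrCreate()
```

With shuffle tracking enabled, executors that still hold shuffle data needed by active jobs are kept alive rather than released at the idle timeout, which is what makes dynamic allocation workable without an external shuffle service.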