This article walks through the source code of BlockManagerMaster in Spark 2.x: how it is created, and what each of its main member functions does.
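1. Creating BlockManagerMaster
BlockManagerMaster manages and maintains block metadata for the whole application while it runs, and sends commands to the BlockManagers on the worker (slave) nodes. It is created when SparkEnv is constructed; on the driver, SparkEnv is created as part of creating SparkContext. The corresponding initialization code in SparkEnv is: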
val blockManagerMaster = new BlockManagerMaster(
  registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)
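As the code shows, constructing blockManagerMaster on the driver creates a BlockManagerMasterEndpoint instance and registers it with rpcEnv; on an executor, registerOrLookupEndpoint only looks up a reference to the driver's endpoint. The BlockManager on each executor communicates with blockManagerMaster through this reference to the driver-side BlockManagerMasterEndpoint (BlockManagerMasterRef).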
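The register-or-lookup idea can be illustrated with a minimal sketch in plain Scala. It does not use Spark's classes: ToyRpcEnv, EndpointRef and registerOrLookup are hypothetical stand-ins, and the driver and executor share one in-process table here, whereas in Spark the executor resolves the reference over the network.

```scala
import scala.collection.mutable

object EndpointRegistrySketch {
  // Hypothetical stand-in for an RPC endpoint reference; not Spark's RpcEndpointRef.
  final case class EndpointRef(name: String, owner: String)

  // Hypothetical stand-in for the RPC environment's name -> endpoint table.
  final class ToyRpcEnv {
    private val endpoints = mutable.Map.empty[String, EndpointRef]

    // Registers an endpoint under the given name and returns a reference to it.
    def setupEndpoint(name: String, owner: String): EndpointRef = {
      val ref = EndpointRef(name, owner)
      endpoints(name) = ref
      ref
    }

    // Looks up a previously registered endpoint by name.
    def lookup(name: String): Option[EndpointRef] = endpoints.get(name)
  }

  // The driver registers a new endpoint; an executor only looks up the existing one.
  def registerOrLookup(env: ToyRpcEnv, name: String, isDriver: Boolean): EndpointRef =
    if (isDriver) env.setupEndpoint(name, owner = "driver")
    else env.lookup(name).getOrElse(
      throw new IllegalStateException(s"endpoint $name is not registered"))
}
```

In this sketch, calling registerOrLookup first with isDriver = true and then with isDriver = false returns the same reference, which mirrors how every executor ends up talking to the single endpoint owned by the driver.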
2. BlockManagerMaster member functions:
1) The removeExecutor() function:
// Sends a RemoveExecutor message to BlockManagerMasterEndpoint to remove a dead executor.
// This function is only called on the driver.
def removeExecutor(execId: String) {
  tell(RemoveExecutor(execId))
  logInfo("Removed " + execId + " successfully in removeExecutor")
}
2) The removeExecutorAsync() function:
// Same purpose as 1): removes a dead executor, but as a non-blocking, asynchronous call.
def removeExecutorAsync(execId: String) {
  driverEndpoint.ask[Boolean](RemoveExecutor(execId))
  logInfo("Removal of executor " + execId + " requested")
}
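The difference between the blocking and the asynchronous variant can be sketched with standard-library Futures. This is only an illustration under stated assumptions: ask, removeBlocking and removeAsync are hypothetical names, and the toy endpoint simply replies true for any non-empty executor id. Unlike removeExecutorAsync above, which discards the Future, the sketch returns it so the behaviour is observable.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AskStyleSketch {
  // Hypothetical endpoint: answers a "remove executor" request asynchronously.
  def ask(execId: String): Future[Boolean] = Future { execId.nonEmpty }

  // Blocking style: wait for the reply and fail loudly if it is not `true`.
  def removeBlocking(execId: String): Unit = {
    val ok = Await.result(ask(execId), 5.seconds)
    if (!ok) throw new IllegalStateException(s"endpoint returned false for $execId")
  }

  // Non-blocking style: return immediately; the caller decides whether to wait.
  def removeAsync(execId: String): Future[Boolean] = ask(execId)
}
```

The blocking form is the simpler choice when the caller must know the removal succeeded before continuing; the asynchronous form avoids stalling the calling thread when the reply is not needed right away.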
3) The registerBlockManager() function:
// When the BlockManager on an executor starts, it registers itself with BlockManagerMaster.
// The master records it in its blockManagerInfo map.
def registerBlockManager(
    blockManagerId: BlockManagerId,
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  logInfo(s"Registering BlockManager $blockManagerId")
  val updatedId = driverEndpoint.askSync[BlockManagerId](
    RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
  logInfo(s"Registered BlockManager $updatedId")
  updatedId
}
4) The updateBlockInfo() function:
// Updates the information recorded for a block.
def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long): Boolean = {
  // Sends an UpdateBlockInfo message to BlockManagerMasterEndpoint and returns the result.
  val res = driverEndpoint.askSync[Boolean](
    UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
  logDebug(s"Updated info of block $blockId")
  res
}
5) The getLocations() function:
// Gets the BlockManager nodes that hold the block. The result is a Seq because
// a block with replication > 1 may be stored on several BlockManager nodes.
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  // Sends a GetLocations message to BlockManagerMasterEndpoint.
  driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
}
6) The getPeers() function:
// Gets the BlockManagerIds other than the blockManagerId passed in.
// As noted above, one block may be stored on several BlockManager nodes.
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  // Sends a GetPeers message to BlockManagerMasterEndpoint.
  driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
}
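What the master answers for getLocations() and getPeers() can be modelled with two small in-memory collections. The sketch below is an assumption-laden simplification, not Spark's implementation: ManagerId and ToyMaster are hypothetical, block ids are plain strings, and any extra filtering the real endpoint may apply to peers is left out.

```scala
object BlockLocationSketch {
  // Hypothetical simplified identifier; Spark's BlockManagerId carries more fields.
  final case class ManagerId(executorId: String, host: String)

  final class ToyMaster(
      managers: Set[ManagerId],
      locations: Map[String, Set[ManagerId]]) {

    // All managers that currently hold the block; empty if the block is unknown.
    def getLocations(blockId: String): Seq[ManagerId] =
      locations.getOrElse(blockId, Set.empty[ManagerId]).toSeq

    // Every registered manager except the one that asks.
    def getPeers(self: ManagerId): Seq[ManagerId] =
      (managers - self).toSeq
  }
}
```

With three registered managers and a block replicated on two of them, getLocations returns those two, while getPeers for one manager returns the other two regardless of which blocks they hold.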
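7) The getExecutorEndpointRef() function:
// Looks up the RPC endpoint reference (slaveEndpoint) of the BlockManager registered for
// the given executor, so that it can be contacted. Note that blockManagerIdByExecutor and
// blockManagerInfo are the maps maintained by BlockManagerMasterEndpoint.
private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
  for (
    blockManagerId <- blockManagerIdByExecutor.get(executorId);
    info <- blockManagerInfo.get(blockManagerId)
  ) yield {
    info.slaveEndpoint
  }
}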
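The for-comprehension in getExecutorEndpointRef() chains two Option lookups and yields a value only when both succeed. A minimal standalone sketch of the same pattern follows; the maps, the Info class and endpointFor are hypothetical and hold made-up demo values.

```scala
object OptionChainSketch {
  // Hypothetical record standing in for the per-manager info that holds an endpoint.
  final case class Info(endpoint: String)

  // Demo data: exec-2 has an id but no info entry, exec-3 is unknown entirely.
  val idByExecutor: Map[String, Int] = Map("exec-1" -> 101, "exec-2" -> 102)
  val infoById: Map[Int, Info] = Map(101 -> Info("endpoint-of-exec-1"))

  // Yields Some(endpoint) only if both lookups succeed; otherwise None.
  def endpointFor(executorId: String): Option[String] =
    for {
      id <- idByExecutor.get(executorId)
      info <- infoById.get(id)
    } yield info.endpoint
}
```

A missing entry at either step short-circuits to None, so the caller never has to check for null.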
8) The getBlockStatus() function:
// Gets the status of a block: where it is stored and how much memory and disk it uses.
def getBlockStatus(
    blockId: BlockId,
    askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
  val msg = GetBlockStatus(blockId, askSlaves)
  val response = driverEndpoint.
    askSync[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
  val (blockManagerIds, futures) = response.unzip
  implicit val sameThread = ThreadUtils.sameThread
  val cbf =
    implicitly[
      CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
        Option[BlockStatus],
        Iterable[Option[BlockStatus]]]]
  val blockStatus = timeout.awaitResult(
    Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
  if (blockStatus == null) {
    throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
  }
  blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
    status.map { s => (blockManagerId, s) }
  }.toMap
}
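The core of getBlockStatus() is turning a map of per-manager Futures into a plain map, dropping managers that report no status. The sketch below reproduces that shape with the standard library only. Status and collect are hypothetical names, manager ids are plain strings, and it uses the global execution context and a fixed 5-second timeout instead of Spark's ThreadUtils.sameThread and configured RPC timeout.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockStatusSketch {
  // Hypothetical simplified status; Spark's BlockStatus also carries a storage level.
  final case class Status(memSize: Long, diskSize: Long)

  def collect(response: Map[String, Future[Option[Status]]]): Map[String, Status] = {
    // Split into ids and futures, keeping both sequences in the same order.
    val (ids, futures) = response.toSeq.unzip
    // Combine all futures into one and wait for every reply.
    val statuses = Await.result(Future.sequence(futures), 5.seconds)
    // Pair each id with its reply and keep only managers that returned a status.
    ids.zip(statuses).collect { case (id, Some(s)) => id -> s }.toMap
  }
}
```

Future.sequence preserves order, which is what makes the later zip between ids and results safe.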
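The functions in BlockManagerMaster are thin wrappers: the actual handling happens in the BlockManagerMasterEndpoint instance. A later article will walk through in detail how BlockManagerMasterEndpoint processes each message.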