溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Giraph源碼分析(三)—— 消息通信

發布時間:2020-07-19 16:24:07 來源:網絡 閱讀:296 作者:數瀾 欄目:大數據

由前文知道每個BSPServiceWorker有一個WorkerServer對象,WorkerServer對象里面又有ServerData對象,作為數據實。ServerData中包含該Worker的partitionStore、edgeStore、incomingMessageStore、currentMessageStore、聚集值等。其中incomingMessageStore對象為MessageStoreByPartition(接口)類型,也就是說消息時按照分區來存儲的。MessageStoreByPartition接口的關系圖如下:

Giraph源碼分析(三)—— 消息通信cdn.xitu.io/2019/7/25/16c27c8bc6000f2d?w=640&h=241&f=png&s=196139">

在SimpleMessageStore抽象類中,有一個ConcurrentMap<Integer,ConcurrentMap<I,T>>類型的變量map,用來存儲消息。第一層是pairtitionID到發送到該partition消息的映射;第二層是VertexID 到發送給該Vertex的消息隊列。

Giraph源碼分析(三)—— 消息通信

《Giraph通信模塊分析》:http://my.oschina.net/skyaugust/blog/95182

每個頂點的消息列表具體為ExtendedDataOutput類型,它繼承DataOutput接口,增加了幾個方法而已。每個消息是以字節形式寫入到ExtendedDataOutput對象中的。

發送消息時,采用異步式通信。

圖頂點的計算處理與消息通信并發執行,在計算過程中就可以發送消息,將大規模消息發送分散在不同的時間段,避免瞬時網絡通信阻塞,但是接受端需要額外的空間,存儲臨時接收到的消息,相當于空間換時間。而集中式通信,圖頂點的計算處理與消息通信串行進行,在計算完畢后,統一發送消息,控制和實現方式簡單,可在發送端對消息進行最大程度優化,但容易造成瞬時間的網絡通信阻塞以及增加發送端的消息存儲開銷。

不同Worker間的消息通信使用RPC方式,具體為Netty。同一Worker內,連續兩次迭代的消息直接通過內存操作,把要發送的消息直接復制到Worker的incomingMessageStore中。下面詳述消息的存儲格式和發送機制。

Giraph使用Cache來緩存消息,當消息達到一定閾值后,一次性發送。

既按照bulk模式進行,不會一條一條信息發送。向某個頂點發送的消息是按照<destVertexId,Message> pair存儲在ByteArrayVertexIdData<I,T>中(實際為ByteArrayVertexIdMessages<I,M>類型)。介紹如下: org.apache.giraph.utils.ByteArrayVertexIdData<I,T>

功能:把<頂點ID,data> Pair 存儲在一個 byte數組中。里面有 ExtendedDataOutput對象用來存儲數據。

Giraph源碼分析(三)—— 消息通信
該類中還有一個內部類:VertexIdDataIterator,該內部類繼承 VertexIdIterator類。

Giraph源碼分析(三)—— 消息通信

org.apache.giraph.comm.SendCache用來緩存發送的信息,然后以“Bulk”模式發送。在Giraph中,每個Worker上可以對應多個分區。消息緩存的閾值是以Worker為單位計算,而不是Partition。

Giraph源碼分析(三)—— 消息通信

SendCache中有ByteArrayVertexIdData<I,T>[ ] dataCache數組用來存儲發送給每個Partition的消息;有int[ ] dataSizes數組用于記錄向每個Worker發送的消息大小,若大于MAX_MSG_REQUEST_SIZE(默認為512KB)就把此Worker上的所有Partition緩存的消息發送到給該Worker,同一Worker內消息也是如此緩存;有int[ ] initBufferSizes數組用于記錄每個Worker上的每個Partition的初始化ByteArrayVertexIdData中ExtendedDataOutput對象的大小,同一Worker上的所有Partition初始值相同,該值為平均值。記MAX_MSG_REQUEST_SIZE(message request size)值為M, 該Worker上有P個 partitions,ADDTITIONNAL_MSG_REQUEST_SIZE(比平均值大的因子)默認為0.2f,記為A。則每個Partition的初始大小為:M*(1+A) / P .

由前文知道,每個Worker都有一個NettyWorkerClientRequestProcessor<I,V,E,M>用來發送消息。該類中有SendMessageCache對象用來緩存向外發送的信息。NettyWorkerClientRequestProcessor類中的sendMessageRequest(I,M)

方法如下,用于向某個頂點destVertexId發送消息message。

Giraph源碼分析(三)—— 消息通信

方法解釋:首先根據destVertexId得到對應的partitionId和WorkerInfo,然后把消息add到SendMessageCache中,并返回向該頂點所屬Worker發送的消息大小workerMessageSize。若該值大于默認值512KB,則把此Worker對應的所有Partition消息從SendMessageCache中刪除,把刪除的消息賦值給workerMessages,其類型為PairList<Integer,ByteArrayVertexIdMessages<I,M>> ,key為partitionId,value為發送給該partition的消息列表,最后調用doRequest()方法發送信息。doRequest()方法如下:

Giraph源碼分析(三)—— 消息通信

可以看到在發送消息時,先判斷是否在同一Worker上。如果是的話,調用SendWorkerMessagesRequest<T,M>的doRequest發送消息;否則使用WorkerClient(底層使用Netty)進行消息發送。下面著重討論同一Worker內的機制。

org.apache.giraph.comm.requests.SendWorkerMessagesRequest類中的doRequest方法如下:

Giraph源碼分析(三)—— 消息通信

參數為該Worker的ServerData,代碼中的partitionVertexData實際為PairList<Integer,ByteArrayVertexIdMessages<I,M>>workerMessages。遍歷<partitionID,對應的消息列表>來添加到ServerData中的incomingMessageStore中。

ByteArrayMessagesPerVertexStore類中的addPartitionMessages()方法如下:

Giraph源碼分析(三)—— 消息通信

當用戶使用了Combiner,incomingMessageStore對應的類型則為OneMessagePerVertexStore,該類為每個頂點只存儲一個消息,而非消息隊列。 結構如下圖:

Giraph源碼分析(三)—— 消息通信

當添加一條消息時,會把頂點已對應的消息和要添加的消息調用combine()方法進行合并,然后存儲在上述結構圖中。addPartitionMessages()方法如下:

Giraph源碼分析(三)—— 消息通信

在ComputeCallable中的call()方法調用computePartition(Partition)計算完所有Partition上的頂點后,調用WorkerClientRequestProcessor.flush()方法把所有剩余的消息發送出去。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女