這篇文章主要講解了“bytom節點怎么接收對方發過來的信息”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“bytom節點怎么接收對方發過來的信息”吧!
如果我們在代碼中搜索BlockRequestMessage,會發現只有在ProtocolReactor.Receive方法中針對該信息進行了應答。那么問題的關鍵就是,比原是如何接收對方發過來的信息,并且把它轉交給ProtocolReactor.Receive的。
比原在發送信息時,最后會把信息寫入到MConnection.bufWriter中;與之相應的,MConnection還有一個bufReader,用于讀取數據,它也是與net.Conn綁定在一起的:
p2p/connection.go#L114-L118
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
mconn := &MConnection{
conn: conn,
bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),(其中minReadBufferSize的值為常量1024)
所以,要讀取對方發來的信息,一定會讀取bufReader。經過簡單的搜索,我們發現,它也是在MConnection.Start中啟動的:
p2p/connection.go#L152-L159
func (c *MConnection) OnStart() error {
// ...
go c.sendRoutine()
go c.recvRoutine()
// ...
}其中的c.recvRoutine()就是我們本次所關注的。它上面的c.sendRoutine是用來發送的,是前一篇文章中我們關注的重點。
繼續c.recvRoutine():
p2p/connection.go#L403-L502
func (c *MConnection) recvRoutine() {
// ...
for {
c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
// ...
pktType := wire.ReadByte(c.bufReader, &n, &err)
c.recvMonitor.Update(int(n))
// ...
switch pktType {
// ...
case packetTypeMsg:
pkt, n, err := msgPacket{}, int(0), error(nil)
wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
c.recvMonitor.Update(int(n))
// ...
channel, ok := c.channelsIdx[pkt.ChannelID]
// ...
msgBytes, err := channel.recvMsgPacket(pkt)
// ...
if msgBytes != nil {
// ...
c.onReceive(pkt.ChannelID, msgBytes)
}
// ...
}
}
// ...
}經過簡化以后,這個方法分成了三塊內容:
第一塊就限制接收速率,以防止惡意結點突然發送大量數據把節點撐死。跟發送一樣,它的限制是500K/s
第二塊是從c.bufReader中讀取出下一個數據包的類型。它的值目前有三個,兩個跟心跳有關:packetTypePing和packetTypePong,另一個表示是正常的信息數據類型packetTypeMsg,也是我們需要關注的
第三塊就是繼續從c.bufReader中讀取出完整的數據包,然后根據它的ChannelID找到相應的channel去處理它。ChannelID有兩個值,分別是BlockchainChannel和PexChannel,我們目前只需要關注前者即可,它對應的reactor是ProtocolReactor。當最后調用c.onReceive(pkt.ChannelID, msgBytes)時,讀取的二進制數據msgBytes就會被ProtocolReactor.Receive處理
我們的重點是看第三塊內容。首先是channel.recvMsgPacket(pkt),即通道是怎么從packet包里讀取到相應的二進制數據的呢?
p2p/connection.go#L667-L682
func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
// ...
ch.recving = append(ch.recving, packet.Bytes...)
if packet.EOF == byte(0x01) {
msgBytes := ch.recving
// ...
ch.recving = ch.recving[:0]
return msgBytes, nil
}
return nil, nil
}這個方法我去掉了一些錯誤檢查和關于性能方面的注釋,有興趣的同學可以點接上方的源代碼查看,這里就忽略了。
這段代碼主要是利用了一個叫recving的通道,把packet中持有的字節數組加到它后面,然后再判斷該packet是否代表整個信息結束了,如果是的話,則把ch.recving的內容完整返回,供調用者處理;否則的話,返回一個nil,表示還沒拿完,暫時處理不了。在前一篇文章中關于發送數據的地方可以與這里對應,只不過發送方要麻煩的多,需要三個通道sendQueue、sending和send才能實現,這邊接收方就簡單了。
然后回到前面的方法MConnection.recvRoutine,我們繼續看最后的c.onReceive調用。這個onReceive實際上是一個由別人賦值給該channel的一個函數,它位于MConnection創建的地方:
p2p/peer.go#L292-L310
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
if chID == PexChannel {
return
} else {
cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
}
}
reactor.Receive(chID, p, msgBytes)
}
onError := func(r interface{}) {
onPeerError(p, r)
}
return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}邏輯也比較簡單,就是當前面的c.onReceive(pkt.ChannelID, msgBytes)調用時,它會根據傳入的chID找到相應的Reactor,然后執行其Receive方法。對于本文來說,就會進入到ProtocolReactor.Receive。
那我們繼續看ProtocolReactor.Receive:
netsync/protocol_reactor.go#L179-L247
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes)
// ...
switch msg := msg.(type) {
case *BlockRequestMessage:
// ...
}其中的DecodeMessage(...)就是把傳入的二進制數據反序列化成一個BlockchainMessage對象,該對象是一個沒有任何內容的interface,它有多種實現類型。我們在后面繼續對該對象進行判斷,如果它是BlockRequestMessage類型的信息,我們就會繼續做相應的處理。處理的代碼我在這里暫時省略了,因為它是屬于下一個小問題的,我們先不考慮。
好像不知不覺我們就把第一個小問題的后半部分差不多搞清楚了。那么前半部分是什么?我們在前面說,讀取bufReader的代碼的起點是在MConnection.Start中,那么前半部分就是:比原從啟動開始中,是在什么情況下怎樣一步步走到MConnection.Start的呢?
好在前半部分的問題我們在前一篇文章《比原是如何把請求區塊數據的信息發出去的》中進行了專門的討論,這里就不講了,有需要的話可以再過去看一下(可以先看最后“總結”那一小節)。
下面我們進入第二個小問題:
BlockRequestMessage后,將會給對方發送什么樣的信息?這里就是接著前面的ProtocolReactor.Receive繼續向下講了。首先我們再貼一下它的較完整的代碼:
netsync/protocol_reactor.go#L179-L247
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes)
// ...
switch msg := msg.(type) {
case *BlockRequestMessage:
var block *types.Block
var err error
if msg.Height != 0 {
block, err = pr.chain.GetBlockByHeight(msg.Height)
} else {
block, err = pr.chain.GetBlockByHash(msg.GetHash())
}
// ...
response, err := NewBlockResponseMessage(block)
// ...
src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
// ...
}可以看到,邏輯還是比較簡單的,即根據對方發過來的BlockRequestMessage中指定的height或者hash信息,在本地的區塊鏈數據中找到相應的block,組成BlockResponseMessage發過去就行了。
其中chain.GetBlockByHeight(...)和chain.GetBlockByHash(...)如果詳細說明的話,需要深刻理解區塊鏈數據在比原節點中是如何保存的,我們在本文先不講,等到后面專門研究。
在這里,我覺得我們只需要知道我們會查詢區塊數據并且構造出一個BlockResponseMessage,再通過BlockchainChannel這個通道發送出去就可以了。
最后一句代碼中調用了src.TrySend方法,它是把信息向對方peer發送過去。(其中的src就是指的對方peer)
那么,它到底是怎么發送出去的呢?下面我們進入最后一個小問題:
BlockResponseMessage信息是如何發送出去的?我們先看看peer.TrySend代碼:
p2p/peer.go#L242-L247
func (p *Peer) TrySend(chID byte, msg interface{}) bool {
if !p.IsRunning() {
return false
}
return p.mconn.TrySend(chID, msg)
}它在內部將會調用MConnection.TrySend方法,其中chID是BlockchainChannel,也就是它對應的Reactor是ProtocolReactor。
感謝各位的閱讀,以上就是“bytom節點怎么接收對方發過來的信息”的內容了,經過本文的學習后,相信大家對bytom節點怎么接收對方發過來的信息這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。