溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何理解RocketMQ存儲中的主從同步

發布時間:2021-11-17 17:15:56 來源:億速云 閱讀:193 作者:柒染 欄目:大數據

本篇文章給大家分享的是有關如何理解RocketMQ存儲中的主從同步,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

一、問題思考

1.消息存儲在Master上了,如何同步到Slave上了呢?
2.同步復制和異步復制流程是怎么樣的?

二、Broker啟動HA調用鏈

1.HA初始化調用鏈

@1 BrokerStartup#main
start(createBrokerController(args));
@2 BrokerStartup#createBrokerController
boolean initResult = controller.initialize();
@3 BrokerController#initialize
this.messageStore = new DefaultMessageStore
@4 DefaultMessageStore#DefaultMessageStore()
this.haService = new HAService(this);
this.defaultMessageStore = defaultMessageStore;
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig()
.getHaListenPort());
this.groupTransferService = new GroupTransferService();
this.haClient = new HAClient();

2.啟動調用鏈

@1 BrokerStartup#start
controller.start();
@2 BrokerController#start
this.messageStore.start();
@3 DefaultMessageStore#start
@4 this.haService.start();
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
this.haClient.start();

小結:從初始化和啟動調用鏈中可以看到,在Broker啟動時,初始化并啟動了三個線程類,分別為AcceptSocketService, GroupTransferService, HAClient。

問題:這三個線程類在干啥?


三、線程類職責
1.AcceptSocketService職責

如何理解RocketMQ存儲中的主從同步

小結:AcceptSocketService職責初始化TCP通道,監聽新的連接并創建HAConnection。

問題:HAConnection在做什么?


2.HAConnection職責

//構造方法
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
this.haService = haService;
this.socketChannel = socketChannel;
//獲取客戶端請求地址
this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
//將通道調整為非阻塞
this.socketChannel.configureBlocking(false);
//關閉連接前將數據發送完畢
this.socketChannel.socket().setSoLinger(false, -1);
//將Nagle算法關閉,客戶端每發送一次數據無論大小,都會將其發送出去
this.socketChannel.socket().setTcpNoDelay(true);
//設置接受緩存區為64K
this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
//設置發包緩存區為64K
this.socketChannel.socket().setSendBufferSize(1024 * 64);
//寫數據線程類
this.writeSocketService = new WriteSocketService(this.socketChannel);
//讀數據線程類
this.readSocketService = new ReadSocketService(this.socketChannel);
this.haService.getConnectionCount().incrementAndGet();
}
//啟動
public void start() {
//啟動讀數據線程
this.readSocketService.start();
//啟動寫數據線程
this.writeSocketService.start();
}

疑問:HAConnection除了對通道做了一些設置外,啟動了兩個線程服務類,分別為readSocketService和writeSocketService,他們職責是什么呢?

2.1 writeSocketService職責
流程圖

如何理解RocketMQ存儲中的主從同步

小結:writeSocketService主要職責,將數據不斷寫入socketChannel通道;寫入數據的大小為nextTransferFromWhere與最大可讀位置getReadPosition之間數據;每次寫完傳輸指針自增this.nextTransferFromWhere += size;每隔5秒發送心跳包到socketChannel通道。

2.2 readSocketService職責

流程圖

如何理解RocketMQ存儲中的主從同步

小結:readSocketService主要職責解析slave發來的請求位點,并更新push3SlaveMaxOffset為該請求位點;喚醒groupTransferService線程。

3.GroupTransferService職責

如何理解RocketMQ存儲中的主從同步

小結:GroupTransferService職責判斷主從同步是否完成,完成后喚醒消息發送線程。

4.HAClient職責

如何理解RocketMQ存儲中的主從同步

小結:HAClient職責Slave封裝實現類,負責與Master建立連接通道,并從通道中獲取數據存儲;并向Master上報Slave存儲的最大物理偏移量。

五、主從同步示意圖

1.主從同步交互消息格式
1.1 Slave上報物理偏移量reportOffset量格式

00000018516677754880|長度為8位的20位數字

1.2 Master寫入Slave的信息由Header與Body構成

00000018516677754880+size|Header部分由8位物理偏移量+消息體大

消息Body具體內容|Slave請求的位點與Master可讀位置之間的數據

2.主從同步示意圖

如何理解RocketMQ存儲中的主從同步

以上就是如何理解RocketMQ存儲中的主從同步,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女