溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

java rocketmq中與消息發送緊密相關的幾行代碼以及用法

發布時間:2021-10-13 11:24:27 來源:億速云 閱讀:200 作者:柒染 欄目:編程語言

這期內容當中小編將會給大家帶來有關java rocketmq中與消息發送緊密相關的幾行代碼以及用法,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

前言

與消息發送緊密相關的幾行代碼:

1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

2. producer.start();

3. Message msg = new Message(...)

4. SendResult sendResult = producer.send(msg);

5. producer.shutdown();

那這幾行代碼執行時,背后都做了什么?

一. 首先是DefaultMQProducer.start

@Overridepublic void start() throws MQClientException {this.defaultMQProducerImpl.start();}

調用了默認生成消息的實現類 -- DefaultMQProducerImpl

調用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()會初始化得到MQClientInstance實例對象,MQClientInstance實例對象調用它自己的start方法會 ,啟動一些服務,如拉去消息服務PullMessageService.Start()、啟動負載平衡服務RebalanceService.Start(),比如網絡通信服務MQClientAPIImpl.Start()

另外,還會執行與生產消息相關的信息,如注冊produceGroup、new一個TopicPublishInfo對象并以默認TopicKey為鍵值,構成鍵值對存入DefaultMQProducerImpl的topicPublishInfoTable中。

efaultMQProducerImpl.start()后,獲取的MQClientInstance實例對象會調用sendHeartbeatToAllBroker()方法,不斷向broker發送心跳包,yin'b可以使用下面一幅圖大致描述DefaultMQProducerImpl.start()過程:

上圖中的三個部分中涉及的內容:

1.1 初始化MQClientInstance

一個客戶端只能產生一個MQClientInstance實例對象,產生方式使用了工廠模式與單例模式。MQClientInstance.start()方法啟動一些服務,源碼如下:

public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}

1.2 注冊producer

該過程會將這個當前producer對象注冊到MQClientInstance實例對象的的producerTable中。一個jvm(一個客戶端)中一個producerGroup只能有一個實例,MQClientInstance操作producerTable大概有如下幾個方法:

-- selectProducer  -- updateTopicRouteInfoFromNameServer  -- prepareHeartbeatData  -- isNeedUpdateTopicRouteInfo  -- shutdown

注:

根據不同的clientId,MQClientManager將給出不同的MQClientInstance;

根據不同的group,MQClientInstance將給出不同的MQProducer和MQConsumer

1.3 向路由信息表中添加路由

topicPublishInfoTable定義:

public class DefaultMQProducerImpl implements MQProducerInner {private final Logger log = ClientLogger.getLog();private final Random random = new Random();private final DefaultMQProducer defaultMQProducer;private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();

它是一個以topic為key的Map型數據結構,DefaultMQProducerImpl.start()時會默認創建一個key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。

1.4 發送心跳包

MQClientInstance向broker發送心跳包時,調用sendHeartbeatToAllBroker( ),以及從MQClientInstance實例對象的brokerAddrTable中拿到所有broker地址,向這些broker發送心跳包。

sendHeartbeatToAllBroker會涉及到prepareHeartbeatData()方法,該方法會生成heartbeatData數據,發送心跳包時,heartbeatData作為心跳包的body。與producer相關的部分代碼如下:

// Producerfor (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {MQProducerInner impl = entry.getValue();if (impl != null) {ProducerData producerData = new ProducerData();producerData.setGroupName(entry.getKey());heartbeatData.getProducerDataSet().add(producerData);}

二、. SendResult sendResult = producer.send(msg)

首先會調用DefaultMQProducer.send(msg) ,繼而調用sendDefaultImpl:

public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);}

sendDefaultImpl做了啥?

2.1. 獲取topicPublishInfo

根據msg的topic從topicPublishInfoTable獲取對應的topicPublishInfo,如果沒有則更新路由信息,從nameserver端拉取最新路由信息。從nameserver端拉取最新路由信息大致為:

首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

2.2 選擇消息發送的隊列

普通消息:默認方式下,selectOneMessageQueue從topicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進行發送消息,默認采用長輪詢的方式選擇隊列 。

它的機制如下:正常情況下,順序選擇queue進行發送;如果某一個節點發生了超時,則下次選擇queue時,跳過相同的broker。不同的隊列選擇策略形成了生產消息的幾種模式,如順序消息,事務消息。

順序消息:將一組需要有序消費的消息發往同一個broker的同一個隊列上即可實現順序消息,假設相同訂單號的支付,退款需要放到同一個隊列,那么就可以在send的時候,自己實現MessageQueueSelector,根據參數arg字段來選擇queue。

private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}

事務消息:只有在消息發送成功,并且本地操作執行成功時,才發送提交事務消息,做事務提交,消息發送失敗,直接發送回滾消息,進行回滾,具體如何實現后面會單獨成文分析。

2.3 封裝消息體通信包,發送數據包

首先,根據獲取的MessageQueue中的getBrokerName,調用findBrokerAddressInPublish得到該消息存放對應的broker地址,如果沒有找到則跟新路由信息,重新獲取地址 :

brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)

可知獲取的broker均為master(id=0)

然后, 將與該消息相關信息打包成RemotingCommand數據包,其RequestCode.SEND_MESSAGE

根據獲取的broke地址,將數據包到對應的broker,默認是發送超時時間為3s。

封裝消息請求包的包頭:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);

發送消息包(普通消息默認為同步方式):

SendResult sendResult = null;switch (communicationMode) {   case SYNC:  sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(  brokerAddr,  mq.getBrokerName(),   msg,  requestHeader,   timeout,  communicationMode,  context,  this);break;

處理來自broker端的響應數據包:

private SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;return this.processSendResponse(brokerName, msg, response);}

broker端處理request數據包后會將消息存儲到commitLog.

上述就是小編為大家分享的java rocketmq中與消息發送緊密相關的幾行代碼以及用法了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女