溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Linux下怎么部署分布式消息系統RocketMQ

發布時間:2022-01-28 19:19:29 來源:億速云 閱讀:251 作者:iii 欄目:開發技術

本篇內容主要講解“Linux下怎么部署分布式消息系統RocketMQ”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Linux下怎么部署分布式消息系統RocketMQ”吧!

Linux下怎么部署分布式消息系統RocketMQ

一、本篇所需文件下載

鏈接:https://pan.baidu.com/s/17iUB1lBOjv4CBAEQFvn65A 提取碼:v0sn

Linux下怎么部署分布式消息系統RocketMQ

一、Linux環境搭建

1、安裝 jdk環境

RocketMQ java編寫,需要jdk環境

下載jdk 1.7.0_80 上傳到linux ,必須64位,32位RocketMQ不支持

tar -zxvf  jdk-7u80-linux-x64.tar.gz        //解壓

修改環境變量 vim /etc/profile

export JAVA_HOME=/usr/local/jdk1.7.0_80export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jarexport PATH=$JAVA_HOME/bin:$PATH

刷新配置

source /etc/profile

或jdk1.8下載安裝教程:https://blog.csdn.net/qq_41463655/article/details/99173682

2、安裝RocketMQ

2.1、上傳alibaba-rocketmq-3.2.6.tar.gz 上傳到linux解壓安裝

tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local                //解壓到 /usr/localmv /usr/local/alibaba-rocketmq /usr/local/alibaba-rocketmq-3.2.6      //重命名
ln -s /usr/local/alibaba-rocketmq-3.2.6 rocketmq                      //安裝

安裝好了

  Linux下怎么部署分布式消息系統RocketMQ

2.2、創建存儲路徑

cd  /usr/local/rocketmq
mkdir store
mkdir store/commitlog
mkdir store/consumequeue
mkdir store/index

2.3、日志配置

cd  /usr/local/rocketmq
mkdir logs        cd conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

2.4、配置 broker-a.properties / broker-b.properties /usr/local/rocketmq/conf/2m-noslave/ 目錄下

2.4.1、broker-a.properties

#所屬集群名字brokerClusterName=rocketmq-cluster#broker名字,注意此處不同的配置文件填寫的不一樣brokerName=broker-a|broker-b#0 表示 Master,>0 表示 SlavebrokerId=0#nameServer地址,分號分割namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數defaultTopicQueueNums=4#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉autoCreateTopicEnable=true#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉autoCreateSubscriptionGroup=true#Broker 對外服務的監聽端口listenPort=10911#刪除文件時間點,默認凌晨 4點deleteWhen=04#文件保留時間,默認 48 小時fileReservedTime=120#commitLog每個文件的大小默認1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每個文件默認存30W條,根據業務情況調整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#檢測物理文件磁盤空間diskMaxUsedSpaceRatio=88#存儲路徑storePathRootDir=/usr/local/rocketmq/store#commitLog 存儲路徑storePathCommitLog=/usr/local/rocketmq/store/commitlog#消費隊列存儲路徑存儲路徑storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue#消息索引存儲路徑storePathIndex=/usr/local/rocketmq/store/index#checkpoint 文件存儲路徑storeCheckpoint=/usr/local/rocketmq/store/checkpoint#abort 文件存儲路徑abortFile=/usr/local/rocketmq/store/abort#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 異步復制Master#- SYNC_MASTER 同步雙寫Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盤方式#- ASYNC_FLUSH 異步刷盤#- SYNC_FLUSH 同步刷盤flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#發消息線程池數量#sendMessageThreadPoolNums=128#拉消息線程池數量#pullMessageThreadPoolNums=128

2.4.2、broker-b.properties

#所屬集群名字brokerClusterName=rocketmq-cluster#broker名字,注意此處不同的配置文件填寫的不一樣brokerName=broker-a|broker-b#0 表示 Master,>0 表示 SlavebrokerId=0#nameServer地址,分號分割namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數defaultTopicQueueNums=4#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉autoCreateTopicEnable=true#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉autoCreateSubscriptionGroup=true#Broker 對外服務的監聽端口listenPort=10911#刪除文件時間點,默認凌晨 4點deleteWhen=04#文件保留時間,默認 48 小時fileReservedTime=120#commitLog每個文件的大小默認1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每個文件默認存30W條,根據業務情況調整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#檢測物理文件磁盤空間diskMaxUsedSpaceRatio=88#存儲路徑storePathRootDir=/usr/local/rocketmq/store#commitLog 存儲路徑storePathCommitLog=/usr/local/rocketmq/store/commitlog#消費隊列存儲路徑存儲路徑storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue#消息索引存儲路徑storePathIndex=/usr/local/rocketmq/store/index#checkpoint 文件存儲路徑storeCheckpoint=/usr/local/rocketmq/store/checkpoint#abort 文件存儲路徑abortFile=/usr/local/rocketmq/store/abort#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 異步復制Master#- SYNC_MASTER 同步雙寫Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盤方式#- ASYNC_FLUSH 異步刷盤#- SYNC_FLUSH 同步刷盤flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#發消息線程池數量#sendMessageThreadPoolNums=128#拉消息線程池數量#pullMessageThreadPoolNums=128

兩個配置文件需修改處

brokerName=broker-a|broker-b      集群a服務器配置修改為   brokerName=broker-abrokerName=broker-a|broker-b      集群b服務器配置修改為   brokerName=broker-b

2.5、修改啟動參數 /rocketm/bin下 (jvm)

runbroker.sh 的JAVA_OPT runserver.sh 的JAVA_OPT

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"修改為
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"

2.6、啟動 NameServer 安裝目錄 /usr/local/ /rocketmq/bin 目錄下

nohup sh mqnamesrv &

2.7、啟動 BrokerServer /rocketmq/bin 目錄下

nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
netstat -ntlp

查看啟動狀態

jps

結果如下啟動成功 Linux下怎么部署分布式消息系統RocketMQ

3.修改linux 服務器host

本機ip,配置域名

192.168.177.128 rocketmq-nameserver1
192.168.177.128 rocketmq-master1
192.168.111.129 rocketmq-nameserver2
192.168.111.129 rocketmq-master2

圖片

  Linux下怎么部署分布式消息系統RocketMQ

4.安裝后臺管理平臺

解壓安裝 tomcat 7.0到 /usr/local/

tar -zxvf apache-tomcat-7.0.65.tar.gz -C /usr/local

rocketmq-web-console.war 復制到apache-tomcat-7.0.65 的webapps 目錄下 Linux下怎么部署分布式消息系統RocketMQ 啟動tomcat 自動解壓,然后修改config /rocketmq-web-console/WEB-INF/classes 的 config.properties 配置 修改ip Linux下怎么部署分布式消息系統RocketMQ

單服務器
rocketmq.namesrv.addr=192.168.177.128:9876

多服務器  
rocketmq.namesrv.addr=192.168.177.128:9876;192.168.177.129:9876

關閉tomcat / 重啟tomcat

關閉防火墻

systemctl disable firewalld   或  chkconfig iptables off

訪問 —-》 ip:8080/rocketmq-web-console 出現下方界面就ok了 Linux下怎么部署分布式消息系統RocketMQ

java 操作

1、生產者
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {

   public static void main(String[] args) throws MQClientException {
       DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
       producer.setNamesrvAddr("192.168.177.128:9876;192.268.177.129:9876");
       producer.setInstanceName("producer");
       producer.start();
       try {
           for (int i = 0; i "test-topic",
                       "TagA",
                       ("test-topic-"+i).getBytes()
               );
               SendResult sendResult = producer.send(msg);
               System.out.println(sendResult.toString());
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
       producer.shutdown();
   }

}
2、消費者
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
   public static void main(String[] args) throws MQClientException {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

       consumer.setNamesrvAddr("192.168.177.128:9876;192.268.177.129:9876");
       consumer.setInstanceName("consumer");
       consumer.subscribe("test-topic", "TagA");

       consumer.registerMessageListener(new MessageListenerConcurrently() {
           @Override
           public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
               for (MessageExt msg : msgs) {
                   System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
               }
               //返回成功消費狀態
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
       });
       consumer.start();
       System.out.println("Consumer Started.");
   }
}

會出現冪等問題,使用全局id,或者時間戳,業務的唯一id 進行判斷,使用redis等日志記錄判斷是否存在,存在表示已經成功消費

到此,相信大家對“Linux下怎么部署分布式消息系統RocketMQ”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女