溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么用SpringBoot+RabbitMQ實現消息可靠傳輸

發布時間:2022-05-25 10:08:31 來源:億速云 閱讀:391 作者:iii 欄目:開發技術

這篇文章主要介紹了怎么用SpringBoot+RabbitMQ實現消息可靠傳輸的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇怎么用SpringBoot+RabbitMQ實現消息可靠傳輸文章都會有所收獲,下面我們一起來看看吧。

    環境配置

    SpringBoot 整合 RabbitMQ 實現消息的發送。

    1.添加 maven 依賴

           <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    2.添加 application.yml 配置文件

    spring:
      rabbitmq:
        host: 192.168.3.19
        port: 5672
        username: admin
        password: xxxx

    3.配置交換機、隊列以及綁定

        @Bean
        public DirectExchange myExchange() {
            DirectExchange directExchange = new DirectExchange("myExchange");
            return directExchange;
        }
    
        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue");
            return queue;
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
        }

    4.生產發送消息

        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/send")
        public String send(String message) {
            rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
            System.out.println("【發送消息】" + message)
            return "【send message】" + message;
        }

    5.消費者接收消息

        @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        public void process(String msg, Channel channel, Message message) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date();
            String time = sdf.format(date);
            System.out.println("【接收信息】" + msg + " 當前時間" + time);

    6.調用生產端發送消息 hello,控制臺輸出:

    【發送消息】hello
    【接收信息】hello 當前時間2022-05-12 10:21:14

    說明消息已經被成功接收。

    消息丟失分析

    怎么用SpringBoot+RabbitMQ實現消息可靠傳輸

    一條消息的從生產到消費,消息丟失可能發生在以下幾個階段:

    • 生產端丟失: 生產者無法傳輸到 RabbitMQ

    • 存儲端丟失: RabbitMQ 存儲自身掛了

    • 消費端丟失:存儲由于網絡問題,無法發送到消費端,或者消費掛了,無法發送正常消費

    RabbitMQ 從生產端、儲存端、消費端都對可靠性傳輸做很好的支持。

    生產階段

    生產階段通過請求確認機制,來確保消息的可靠傳輸。當發送消息到 RabbitMQ 服務器 之后,RabbitMQ 收到消息之后,給發送返回一個請求確認,表示RabbitMQ 服務器已成功的接收到了消息。

    配置application.yml

    spring:
      rabbitmq:
        # 消息確認機制 生產者 -> 交換機
        publisher-confirms: true
        # 消息返回機制  交換機 -> 隊列
        publisher-returns: true

    配置

    @Configuration
    @Slf4j
    public class RabbitConfig {
    
        @Autowired
        private ConnectionFactory connectionFactory;
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("【correlationData】:" + correlationData);
                    log.info("【ack】" + ack);
                    log.info("【cause】" + cause);
                    if (ack) {
                        log.info("【發送成功】");
                    } else {
                        log.info("【發送失敗】correlationData:" + correlationData + " cause:" + cause);
                    }
                }
            });
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.warn("【消息發送失敗】");
                    log.info("【message】" + message);
                    log.info("【replyCode】" + replyCode);
                }
            });
    
            return rabbitTemplate;
        }
    }

    消息從 生產者 到 交換機, 有confirmCallback 確認模式。發送消息成功后消息會調用方法confirm(CorrelationData correlationData, boolean ack, String cause),根據 ack 判斷消息是否發送成功。

    消息從 交換機 到 隊列,有returnCallback 退回模式。

    發送消息 product message 控制臺輸出如下:

    【發送消息】product message
    【接收信息】product message 當前時間2022-05-12 11:27:56
    【correlationData】:null
    【ack】true
    【cause】null
    【發送成功】

    生產端模擬消息丟失

    這里有兩個方案:

    • 發送消息后立馬關閉 broke,后者把網絡關閉,但是broker關閉之后控制臺一直就會報錯,發送消息也報500錯誤。

    • 發送不存在的交換機:

    // myExchange 修改成 myExchangexxxxx
    rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);

    結果:

    【correlationData】:null
    【ack】false
    【cause】channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
    【發送失敗】

    當發送失敗可以對消息進行重試

    交換機正確,發送不存在的隊列:

    交換機接收到消息,返回成功通知,控制臺輸出:

    【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
    【ack】true
    【cause】null
    【發送成功】

    交換機沒有找到隊列,返回失敗信息:

    【消息發送失敗】
    【message】product message
    【replyCode】312

    RabbitMQ

    開啟隊列持久化,創建的隊列和交換機默認配置是持久化的。首先把隊列和交換機設置正確,修改消費監聽的隊列,使得消息存放在隊列里。

    修改隊列的持久化,修改成非持久化:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",false);
            return queue;
        }

    發送消息之后,消息存放在隊列中,然后重啟 RabbitMQ,消息不存在了。
    設置隊列持久化:

        @Bean
        public Queue myQueue() {
            Queue queue = new Queue("myQueue",true);
            return queue;
        }

    重啟之后,隊列的消息還存在。

    消費端

    消費端默認開始 ack 自動確認模式,當隊列消息被消費者接收,不管有沒有被消費端消息,都自動刪除隊列中的消息。所以為了確保消費端能成功消費消息,將自動模式改成手動確認模式:

    修改application.yml 文件

    spring:
      rabbitmq:
        # 手動消息確認
        listener:
          simple:
            acknowledge-mode: manual

    消費接收消息之后需要手動確認:

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        public void process(String msg, Channel channel, Message message) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date();
            String time = sdf.format(date);
            System.out.println("【接收信息】" + msg + " 當前時間" + time);
            System.out.println(message.getMessageProperties().getDeliveryTag());
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }

    如果不添加:

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    發送兩條消息

    消息被接收后,沒有確認,重新放到隊列中:

    怎么用SpringBoot+RabbitMQ實現消息可靠傳輸

    重啟項目,之后,隊列的消息會發送到消費者,但是沒有 ack 確認,還是繼續會放回隊列中。

    加上 channel.basicAck 之后,再重啟項目

    怎么用SpringBoot+RabbitMQ實現消息可靠傳輸

    隊列消息就被刪除了

    basicAck 方法最后一個參數 multiple 表示是刪除之前的隊列。

    multiple 設置成 true,把后面的隊列都清理掉了

    怎么用SpringBoot+RabbitMQ實現消息可靠傳輸

    關于“怎么用SpringBoot+RabbitMQ實現消息可靠傳輸”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“怎么用SpringBoot+RabbitMQ實現消息可靠傳輸”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女