這篇文章主要介紹了怎么保證RabbitMQ全鏈路數據100%不丟失的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇怎么保證RabbitMQ全鏈路數據100%不丟失文章都會有所收獲,下面我們一起來看看吧。
正在學RabbitMQ,特此記錄一下,這里就不講RabbitMQ基礎了,直接進入主題。
我們都知道,消息從生產端到消費端消費要經過3個步驟:
生產端發送消息到RabbitMQ;
RabbitMQ發送消息到消費端;
消費端消費這條消息;
這3個步驟中的每一步都有可能導致消息丟失,消息丟失不可怕,可怕的是丟失了我們還不知道,所以要有一些措施來保證系統的可靠性。
這里的可靠并不是一定就100%不丟失了,磁盤損壞,機房爆炸等等都能導致數據丟失,當然這種都是極小概率發生,能做到99.999999%消息不丟失,就是可靠的了。
下面來具體分析一下問題以及解決方案。
生產端可靠性投遞,即生產端要確保將消息正確投遞到RabbitMQ中。生產端投遞的消息丟失的原因有很多,比如消息在網絡傳輸的過程中發生網絡故障消息丟失,或者消息投遞到RabbitMQ時RabbitMQ掛了,那消息也可能丟失,而我們根本不知道發生了什么。針對以上情況,RabbitMQ本身提供了一些機制。
事務消息機制由于會嚴重降低性能,所以一般不采用這種方法,我就不介紹了,而采用另一種輕量級的解決方案——confirm消息確認機制。
什么是confirm消息確認機制?顧名思義,就是生產端投遞的消息一旦投遞到RabbitMQ后,RabbitMQ就會發送一個確認消息給生產端,讓生產端知道我已經收到消息了,否則這條消息就可能已經丟失了,需要生產端重新發送消息了。
通過下面這句代碼來開啟確認模式:
channel.confirmSelect();// 開啟發送方確認模式
然后異步監聽確認和未確認的消息:
channel.addConfirmListener(new ConfirmListener() { //消息正確到達broker @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("已收到消息"); //做一些其他處理 } //RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未確認消息,標識:" + deliveryTag); //做一些其他處理,比如消息重發等 } });
這樣就可以讓生產端感知到消息是否投遞到RabbitMQ中了,當然這樣還不夠,稍后我會說一下極端情況。
那消息持久化呢?我們知道,RabbitMQ收到消息后將這個消息暫時存在了內存中,那這就會有個問題,如果RabbitMQ掛了,那重啟后數據就丟失了,所以相關的數據應該持久化到硬盤中,這樣就算RabbitMQ重啟后也可以到硬盤中取數據恢復。那如何持久化呢?
message消息到達RabbitMQ后先是到exchange交換機中,然后路由給queue隊列,最后發送給消費端。
所有需要給exchange、queue和message都進行持久化:
exchange持久化:
//第三個參數true表示這個exchange持久化 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
queue持久化:
//第二個參數true表示這個queue持久化 channel.queueDeclare(QUEUE_NAME, true, false, false, null);
message持久化:
//第三個參數MessageProperties.PERSISTENT_TEXT_PLAIN表示這條消息持久化 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
這樣,如果RabbitMQ收到消息后掛了,重啟后會自行恢復消息。
到此,RabbitMQ提供的幾種機制都介紹完了,但這樣還不足以保證消息可靠性投遞RabbitMQ中,上面我也提到了會有極端情況,比如RabbitMQ收到消息還沒來得及將消息持久化到硬盤時,RabbitMQ掛了,這樣消息還是丟失了,或者RabbitMQ在發送確認消息給生產端的過程中,由于網絡故障而導致生產端沒有收到確認消息,這樣生產端就不知道RabbitMQ到底有沒有收到消息,就不好做接下來的處理。
所以除了RabbitMQ提供的一些機制外,我們自己也要做一些消息補償機制,以應對一些極端情況。接下來我就介紹其中的一種解決方案——消息入庫。
消息入庫,顧名思義就是將要發送的消息保存到數據庫中。
首先發送消息前先將消息保存到數據庫中,有一個狀態字段status=0,表示生產端將消息發送給了RabbitMQ但還沒收到確認;在生產端收到確認后將status設為1,表示RabbitMQ已收到消息。
這里有可能會出現上面說的兩種情況,所以生產端這邊開一個定時器,定時檢索消息表,將status=0并且超過固定時間后(可能消息剛發出去還沒來得及確認這邊定時器剛好檢索到這條status=0的消息,所以給個時間)還沒收到確認的消息取出重發(第二種情況下這里會造成消息重復,消費者端要做冪等性),可能重發還會失敗,所以可以做一個最大重發次數,超過就做另外的處理。
這樣消息就可以可靠性投遞到RabbitMQ中了,而生產端也可以感知到了。
既然已經可以讓生產端100%可靠性投遞到RabbitMQ了,那接下來就改看看消費端的了,如何讓消費端不丟失消息。
默認情況下,以下3種情況會導致消息丟失:
在RabbitMQ將消息發出后,消費端還沒接收到消息之前,發生網絡故障,消費端與RabbitMQ斷開連接,此時消息會丟失;
在RabbitMQ將消息發出后,消費端還沒接收到消息之前,消費端掛了,此時消息會丟失;
消費端正確接收到消息,但在處理消息的過程中發生異?;蝈礄C了,消息也會丟失。
其實,上述3中情況導致消息丟失歸根結底是因為RabbitMQ的自動ack機制,即默認RabbitMQ在消息發出后就立即將這條消息刪除,而不管消費端是否接收到,是否處理完,導致消費端消息丟失時RabbitMQ自己又沒有這條消息了。
所以就需要將自動ack機制改為手動ack機制。
消費端手動確認消息:
DeliverCallback deliverCallback = (consumerTag, delivery) -> { try { //接收到消息,做處理 //手動確認 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (Exception e) { //出錯處理,這里可以讓消息重回隊列重新發送或直接丟棄消息 } }; //第二個參數autoAck設為false表示關閉自動確認機制,需手動確認 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
這樣,當autoAck參數置為false,對于RabbitMQ服務端而言,隊列中的消息分成了兩個部分:
一部分是等待投遞給消費端的消息;
一部分是已經投遞給消費端,但是還沒有收到消費端確認信號的消息。
如果RabbitMQ一直沒有收到消費端的確認信號,并且消費此消息的消費端已經斷開連接或宕機(RabbitMQ會自己感知到),則RabbitMQ會安排該消息重新進入隊列(放在隊列頭部),等待投遞給下一個消費者,當然也有能還是原來的那個消費端,當然消費端也需要確保冪等性。
關于“怎么保證RabbitMQ全鏈路數據100%不丟失”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“怎么保證RabbitMQ全鏈路數據100%不丟失”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。