這篇文章主要講解了“RabbitMQ的高級特性是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“RabbitMQ的高級特性是什么”吧!
方式1:消息落庫,對消息狀態進行打標 1)發送方將業務數據進行入庫到BIZ_DB,將消息入庫到MSG_DB,status置為0,只要有一步操作失敗,就不進行下一步操作,如果該步驟失敗, 進行快速失敗的機制 2)發送方給MQ發送消息 3)MQ接收到消息后給發送方確認 4)發送方監聽到該確認消息后將消息status置為1,這是正常成功的邏輯 5)發送方有一個分布式的定時任務,掃描表中超時的status為0的消息,超時時間比如說可以設置為2min 6)對于status為0的消息,進行重新發送 7)設置最大重試次數,如Retry_count > 3時將該消息status置為2 注意: 1)業務數據和消息可以落到一個庫里.對于落到兩個庫里,對于小規模的設計應用,可以開啟事務,保證數據源都是一致的.對于大規模、高并 發情況下,沒聽說有大的互聯網公司有開啟事務,采用補償機制. 2)不好的地方:步驟1需要對數據庫持久化2次 3)分布式定時任務,保證同一時間點只有一個任務去抓取DB 第一個可靠性投遞,在高并發場景下是否適合?第一種方式需要入庫兩次,高并發情況下對數據庫壓力大.
方式2:消息的延遲投遞,做二次確認,回調檢查(接收海量的數據) 方案2目的是為了減少數據庫持久化操作次數,如訂單場景. 關鍵點不是100%成功,關鍵點是性能,扛得住高并發。不能100%成功, 人工或定時任務補償,對于核心鏈路,減少數據庫操作,UpStream上游服務生產端,DownStream下游服務消費端,Callback回調服務 1)業務數據落庫,僅做一次入庫,一次性生成兩條消息,然后發送第一條MQ消息到消息中間件. 注意:一定是業務數據落庫以后,再發消息.互聯網大廠不加事務,事務嚴重影響性能 2)第二條MQ消息延遲一段時間再發,延遲投遞,如2min之后再發 3)消費者進行監聽并消費消息 4)消費完成之后消費端在發送確認消息到MQ,該確認消息是消費端自己生成的 5)Callback服務監聽該確認消息,Callback知道下游成功處理了,Callback對該消息進行入庫 6)Callback監聽延遲投遞的消息,檢查MSG DB數據庫,發現該消息已經成功處理 7)如果之前出現異常,MSG DB中沒有該消息,Callback會RPC調用UpStream重新發送MQ消息 注意: 1)一定是業務數據落庫以后,再發送消息 2)不加事務,事務會造成嚴重的性能瓶頸 3)Callback只是一個補償服務,它不是核心鏈路上的
冪等性:一個冪等操作的特點是其任意執行多次執行所產生的影響均與一次執行的影響相同 我們可以借鑒數據庫的樂觀鎖機制,如執行一個更新庫存的SQL語句: 使用版本號來控制,比如elaticsearch update t_reps set count = count - 1, version = version + 1 where version = 1
消費端的冪等性保障 在海量訂單產生的業務高峰期,如何避免消息的重復消費問題?消費端實現冪等性,就意味著,我們的消息永遠不會消費多次, 即使我們收到了多條一樣的消息 業界主流的冪等性操作: 1)唯一ID + 指紋碼機制,利用數據庫主鍵去重 唯一ID + 指紋碼機制,指紋碼可以是根據業務生成的字段,如時間戳或者銀行返回的字段等,利用數據庫主鍵去重 select count(1) from t_order where id = 唯一ID + 指紋碼,如果為1說明之前已經消費過了 好處:實現簡單 壞處:高并發下有數據庫寫入的性能瓶頸 解決方案:跟進ID進行分庫分表進行算法路由 2)利用Redis的原子特性實現 使用Redis進行冪等,需要考慮的問題,使用redis的setnx命令 1、我們是否需要進行數據落庫,如果落庫的話,關鍵解決的問題是數據庫和緩存如何做到一致性? 2、如果不進行落庫,那么都存儲到緩存中,如何設置定時同步的策略?
理解Confirm消息確認機制: 消息的確認,是指生產者投遞消息之后,如果Broker收到消息,則會給我們生產者一個應答. 生產者接收應答,用來確定這條消息是否正常 的發送到Broker,這種方式也是消息的可靠投遞的核心保障! 如何實現Confirm確認消息: 1) 在channel上開啟確認模式,channel.confirmSelect() 2) 在channel上添加監聽,addConfirmListener,監聽成功或失敗的返回結果,根據具體的結果對消息進行重新發送、或記錄日志等后續處理.
Return Listener用于處理一些不可路由的消息. 我們的消息生產者,通過指定一個Exchange和Routing Key,把消息送達到某一個隊列中去,然后我們的消費者監聽隊列,進行消費 處理操作! 但是在某些情況下,如果我們在發送消息的時候,當前的Exchange不存在或者指定的路由key路由不到,這個時候我們需要監聽這種 不可達的消息,就要使用Return Listener 在基礎API中有一個屬性Mandatory,如果為true,則監聽器會接收到路由不可達的消息,然后進行后續處理,如果為false,那么 broker端自動刪除該消息.
public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
什么是消費端限流? 假設RabbitMQ服務器有上萬條未處理的消息,巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數據! RabbitMQ消費端有兩種消費消息后簽收模式:自動簽收、手動簽收(推薦) RabbitMQ提供了一種QoS服務質量保證功能,即在非自動確認消息的前提下(設置非自動簽收),如果一定數目的消息(通過基于consume 或者channel設置Qos的值)未被確認前,不進行消費新的消息. 消費端限流實現該方法channel.basicQos(0, 1, false); void BasicQos(uint prefetchSize,ushort prefetchCount,bool global) prefetchSize:設置監聽消息的大小,一般設置為0,不限制 prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送過多于N個消息,即一旦有N個消息還沒有ack,則該consumer將block掉,直到 有消息ackglobal:true\false,是否將上面設置應用于channel,簡單來說,就是上面限制是channel級別還是consumer級別
//1 限流方式第一件事就是autoAck設置為false channel.basicQos(0, 1, false); channel.basicConsume(queueName, false, new MyConsumer(channel)); public class MyConsumer extends DefaultConsumer { private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); //核心代碼,ack,false不支持批量簽收 channel.basicAck(envelope.getDeliveryTag(), false); } }
消費端的手工ACK和NACK 消費端重回隊列是為了對沒有處理成功的消息,把消息重新投遞給Broker!一般我們在實際應用中,都會關閉重回隊列,也就是設置 為false.
TTL是Time To Live的縮寫,也就是生存時間 RabbitMQ支持消息的過期時間,在消息發送的時候可以指定 RabbitMQ支持隊列的過期時間,從消息入隊列開始計算,只要超過了隊列的超時時間配置,那么消息就自動的清除
私信隊列:DLX,Dead Letter Exchange 利用DLX,當消息在一個隊列中變成死信(dead message)之后,它能被重新publish到另外一個Exchange,這個Exchange就是死信隊列. 消息變成死信的幾種情況 1) 消息被拒絕 2) 消息TTL過期 3) 隊列達到最大長度 DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性. 當這個隊列 中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange上去,進而被路由到另一個隊列,可以監聽這個隊列中消息做相應的 處理.
感謝各位的閱讀,以上就是“RabbitMQ的高級特性是什么”的內容了,經過本文的學習后,相信大家對RabbitMQ的高級特性是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。