本篇文章為大家展示了怎么使用RocketMQ事務消息解決分布式事務,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
rocketmq模塊
rocketmq-broker:接受生產者發來的消息并存儲(通過調用rocketmq-store),消費者從這里取得消息。
rocketmq-client:提供發送、接受消息的客戶端API。
rocketmq-namesrv:NameServer,類似于Zookeeper,這里保存著消息的TopicName,隊列等運行時的元信息。(有點NameNode的味道)
rocketmq-common:通用的一些類,方法,數據結構等
rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定義二進制協議
rocketmq-store:消息、索引存儲等
rocketmq-filtersrv:消息過濾器Server,需要注意的是,要實現這種過濾,需要上傳代碼到MQ!【一般而言,我們利用Tag足以滿足大部分的過濾需求,如果更靈活更復雜的過濾需求,可以考慮filtersrv組件】
rocketmq-tools:命令行工具
分布式消息隊列RocketMQ—事務消息—解決分布式事務
說到分布式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分布處于2個不同的DB,或者說2個不同的子系統里面,A要扣錢,B要加錢,如何保證原子性?
一般的思路都是通過消息中間件來實現“最終一致性”:A系統扣錢,然后發條消息給中間件,B系統接收此消息,進行加錢。
但這里面有個問題:A是先update DB,后發送消息呢? 還是先發送消息,后update DB?
假設先update DB成功,發送消息網絡失敗,重發又失敗,怎么辦?
假設先發送消息成功,update DB失敗。消息已經發出去了,又不能撤回,怎么辦?
所以,這里下個結論: 只要發送消息和update DB這2個操作不是原子的,無論誰先誰后,都是有問題的。
那這個問題怎么解決呢?
有人可能想到了,我可以把“發送消息”這個網絡調用和update DB放在同1個事務里面,如果發送消息失敗,update DB自動回滾。這樣不就保證2個操作的原子性了嗎?
這個方案看似正確,其實是錯誤的,原因有2:
(1)網絡的2將軍問題:發送消息失敗,發送方并不知道是消息中間件真的沒有收到消息呢?還是消息已經收到了,只是返回response的時候失敗了?
如果是已經收到消息了,而發送端認為沒有收到,執行update db的回滾操作。則會導致A賬號的錢沒有扣,B賬號的錢卻加了。
(2)把網絡調用放在DB事務里面,可能會因為網絡的延時,導致DB長事務。嚴重的,會block整個DB。這個風險很大。
基于以上分析,我們知道,這個方案其實是錯誤的!
假設消息中間件沒有提供“事務消息”功能,比如你用的是Kafka。那如何解決這個問題呢?
解決方案如下:
(1)Producer端準備1張消息表,把update DB和insert message這2個操作,放在一個DB事務里面。
(2)準備一個后臺程序,源源不斷的把消息表中的message傳送給消息中間件。失敗了,不斷重試重傳。允許消息重復,但消息不會丟,順序也不會打亂。
(3)Consumer端準備一個判重表。處理過的消息,記在判重表里面。實現業務的冪等。但這里又涉及一個原子性問題:如果保證消息消費 + insert message到判重表這2個操作的原子性?
消費成功,但insert判重表失敗,怎么辦?關于這個,在Kafka的源碼分析系列,第1篇, exactly once問題的時候,有過討論。
通過上面3步,我們基本就解決了這里update db和發送網絡消息這2個操作的原子性問題。
但這個方案的一個缺點就是:需要設計DB消息表,同時還需要一個后臺任務,不斷掃描本地消息。導致消息的處理和業務邏輯耦合額外增加業務方的負擔。
為了能解決該問題,同時又不和業務耦合,RocketMQ提出了“事務消息”的概念。
具體來說,就是把消息的發送分成了2個階段:Prepare階段和確認階段。
具體來說,上面的2個步驟,被分解成3個步驟:
(1) 發送Prepared消息
(2) update DB
(3) 根據update DB結果成功或失敗,Confirm或者取消Prepared消息。
可能有人會問了,前2步執行成功了,最后1步失敗了怎么辦?這里就涉及到了RocketMQ的關鍵點:RocketMQ會定期(默認是1分鐘)掃描所有的Prepared消息,詢問發送方,到底是要確認這條消息發出去?還是取消此條消息?
具體代碼實現如下:
也就是定義了一個checkListener,RocketMQ會回調此Listener,從而實現上面所說的方案。
// 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務 TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); // 構造事務消息的生產者 TransactionMQProducer producer = new TransactionMQProducer("groupName"); // 設置事務決斷處理類 producer.setTransactionCheckListener(transactionCheckListener); // 本地事務的處理邏輯,相當于示例中檢查Bob賬戶并扣錢的邏輯 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); producer.start() // 構造MSG,省略構造參數 Message msg = new Message(......); // 發送消息 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); producer.shutdown();
然后執行本地事務,具體代碼如下
public TransactionSendResult sendMessageInTransaction(.....) { // 邏輯代碼,非實際代碼 // 1.發送消息 sendResult = this.send(msg); // sendResult.getSendStatus() == SEND_OK // 2.如果消息發送成功,處理與消息關聯的本地事務單元 LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); // 3.結束事務 this.endTransaction(sendResult, localTransactionState, localException); }
上面所說的消息中間件上注冊的listener,超時以后會回調producer的接口以確定事務執行情況
總結:對比方案2和方案1,RocketMQ最大的改變,其實就是把“掃描消息表”這個事情,不讓業務方做,而是消息中間件幫著做了。
至于消息表,其實還是沒有省掉。因為消息中間件要詢問發送方,事物是否執行成功,還是需要一個“變相的本地消息表”,記錄事物執行狀態。
可能有人又要說了,無論方案1,還是方案2,發送端把消息成功放入了隊列,但消費端消費失敗怎么辦?
消費失敗了,重試,還一直失敗怎么辦?是不是要自動回滾整個流程?
答案是人工介入。從工程實踐角度講,這種整個流程自動回滾的代價是非常巨大的,不但實現復雜,還會引入新的問題。比如自動回滾失敗,又怎么處理?
對應這種極低概率的case,采取人工處理,會比實現一個高復雜的自動化回滾系統,更加可靠,也更加簡單。
上述內容就是怎么使用RocketMQ事務消息解決分布式事務,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。