這期內容當中小編將會給大家帶來有關REST微服務中怎么利用消息中間件實現分布式事務,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
我們還是使用之前的實例,一個訂票系統的購票邏輯:
這篇教程的源代碼可以從github上獲取。
使用消息中間件實現分布式事務,也就是使用事件驅動實現。在這種方式下,Order服務不會直接調用User服務,而是往MQ上發一個消息,說明有新訂單需要扣費;User服務會響應這個消息,并處理,處理完成后再發一個消息,說明有新訂單需要轉移票;然后就會有Ticket服務來處理。而每個服務都是在一個事務里面處理讀消息、處理業務、寫消息的事情。大致流程如下:
在這種方式下,訂單的處理是異步的,用戶發起一個訂單的時候,只是生成一個正在處理的訂單,然后通過消息中間件一步步的進行扣費、交票、完成訂單等邏輯。而每一個服務中相應的操作,基本都是:
從一個隊列中讀取消息
操作相應數據庫操作
往下一個隊列中發送消息
也就是說,需要在這個方法中需要操作數據庫和MQ兩個資源,這正好是上一篇文章中介紹Spring內部事務和外部事務時使用的實例中的場景。下面就是大致的代碼:
12345678 | @JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);} |
它監聽MQ的”order:new”隊列,處理訂單,往”order:need_to_pay”發送一個消息。然后用戶服務就會接收這個消息,觸發扣費流程。
在這個地方,我們可以使用JTA事務,來使用兩階段提交來實現兩個資源的共同提交,但是這會影響系統的性能。而且,還需要使用的消息中間件實現了XA的規范,提供兩階段提交的功能。
這里也可以使用本地事務,這時,每個事物都會有一個JMS的Session,并使用事務。如此一來,就存在一個數據庫的事物和一個JMS的事務,兩個事務是相互獨立并依次提交的。這樣,就有可能在極少數情況下出錯,但是也能采取一些錯誤來盡量解決。我們對上面的事務處理展開(偽代碼,只是為了說明處理過程),來看看出錯的情況以及該如何處理:
1234567891011 | jmsTransaction.begin(); // get transactions from jms sessiondbTransaction.begin(); // get transactions from JDBC connectiontry {orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);dbTransaction.commit();jmsTransaction.commit();} catch(Exception e) {dbTransaction.rollback();jmsTransaction.rollback();} |
在上面的方法中,只要發生了錯誤,MQ消息的消費就算失敗,MQ的監聽器就會重新觸發一次這個方法。
這時,如果錯誤發生在:
數據提交時或之前。這時,整個數據庫的操作都會被重置(也可能就根本還沒更新),重試的時候不需要考慮重復提交的問題,因為之前的提交都已經被回滾。
數據庫提交成功,但是JMS提交失敗。這時就需要防止重復提交來避免數據庫的重復操作。
我們可以采用之前說過的token方式,在調用這個方法前,生成一個唯一的token。這里使用Java的UUID生成一個ID作為token。(如果這里的重復調用只是在這個服務內部重新觸發,就不需要考慮分布式系統的全局一致性ID的問題。這需要根據實際情況來判斷用什么樣的UUID生成方式)所以,Controller里面接受購票請求如下:
1234567 | @PostMapping(value = "/")@Transactionalpublic void create(@RequestBody OrderDTO orderDTO) {String uid = UUID.randomUUID().toString();orderDTO.setToken(uid);jmsTemplate.convertAndSend("order:new", orderDTO);} |
然后在Service里面監聽這個隊列,處理購票:
123456789101112131415 | @JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {if (!this.processedUIDs.contains(orderDTO.getToken())) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);orderDTO.setStatus(order.getStatus());orderDTO.setId(order.getId());} else {LOG.info("Duplicate jms message:{}", orderDTO);}jmsTemplate.convertAndSend("order:need_to_pay", orderDTO);processedUIDs.add(orderDTO.getToken());} |
簡單來說,解決辦法就是,如果是重復觸發的,就略過數據庫相關的處理,直接往MQ的目標隊列發送需要的數據。使得整個流程能夠往下走。
剛才說的是在一個服務內出錯的情況,還有一種錯誤情況是,訂單服務和用戶服務已經處理完訂單創建和扣費的操作,然后到了Ticket服務的時候,卻發現沒有票了。雖然我們可以通過合理的設計業務邏輯來避免這種問題,例如,在操作之前先檢查用戶余額,檢查并鎖票,然后進行操作數據的事情。但是,在有些情況下,很難通過業務流程的設計來完全避免這種問題。如果出現了這種的問題,我們也可以通過撤銷的流程來實現,業務流程如下:
在上面的解決方案中,使用JDK的UUID類生成一個ID,實際上這個ID只是在當前的JVM內,才能夠保證是唯一的。其次,在JMS的標準中,沒有規定一個消息的Listener在讀取一個消息失敗后,重新讀取的問題。在微服務環境中,如果一個應用部署了多個實例,那個這個消息有可能會被另一個實例讀到。所以在上面的方案中,使用JVM內的唯一ID放在消息的內容中,它有可能被任意一個實例處理,處理失敗后,又有可能被另一個實例處理。這就會出問題。所以我們需要一個分布式環境下的生成唯一ID的解決辦法。例如,先獲得JVM的唯一ID以后,再加上IP+端口等信息。而且,對已經處理過的ID的緩存,也需要存在分布式環境中。
所以,我們完全可以不使用兩階段提交,就實現微服務架構下的分布式事務。使用這種方式,它的優點是:
實現簡單。結合Spring的事務,幾乎不用寫額外的事務相關的代碼,就能夠實現。我們只需要更好的服務的拆分和設計業務流程。
系統吞吐量高。因為數據庫或MQ不會被長期的鎖住,可以并發的處理更多的事務。
容錯性好。各個服務之間通過MQ來觸發協調,即使在處理一個任務的時候有一個服務停了,消息還會一直保持,直到服務起來開始監聽,然后繼續觸發這個任務。
當然這種方式也有一些缺點,最大的問題就是異步處理的問題。用戶發出一個請求后,處理該業務的服務只是簡單處理,往MQ發送消息開始處理流程,然后就返回了。這時候這個任務還在處理。雖然有時候我們可以通過等的方式,等待最終處理完成的消息,然后在返回給用戶。但是這樣又得考慮響應時間、超時、各種錯誤等情況。
有些人會覺得這種方式使得開發和調試都變得復雜,在我看來,恰恰相反,這使得開發和調試都簡單了。首先,根據微服務架構的設計原則,就是每個服務只負責一個功能模塊;再者,根據面向對象的設計原則,一個方法只做一件事情。如果我們能夠合理的拆分服務,和每一步的處理方法,這正是一個好的設計。在維護的時候,每個方法、每個步驟做什么事情,都很清楚。
說到調試,我的原則是,你應該通過單元測試來發現和解決問題,而不是調試。以上面的購票流程為例,每一個服務,通過MQ觸發一個方法的時候,它的參數應該是什么、狀態是什么都應該是明確的,這個方法執行完成后,會產生什么新的數據,狀態會更新成什么,都應該是明確的。而這些都可以通過單元測試來很好的測試。如果你的復雜流程中的每一個都能通過單元測試進行完善的測試,那么這些方法串聯到一起,不但能夠很好的工作,也能應付各種異常的情況。
上述就是小編為大家分享的REST微服務中怎么利用消息中間件實現分布式事務了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。