本篇內容主要講解“RocketMQ如何解決分布式事務”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“RocketMQ如何解決分布式事務”吧!
RocketMQ解決分布式事務(可靠消息最終一致性方案)
1、A系統發送一個prepared消息到MQ,如果這個prepared消息發送失敗那么就直接取消操作別執行了。
2、如果這個消息發送成功了、就接著執行本地事務(executeLocalTransaction),如果成功就告訴MQ發送確認消息,如果失敗,就告訴MQ發送回滾消息。
3、如果發送了確認消息、那么B系統會接收到確認消息,然后執行本地事務。
4、上面的第2步, 由于網絡原因發送確認or回滾消息失敗,但是broker有輪詢機制,根據唯一id查詢本地事務狀態,MQ會自動定時輪詢所有prepared消息回調你的接口(checkLocalTransaction),問你,這個消息是不是本地事務處理失敗了,所有沒有發送確認的消息,是繼續重試還是回滾?一版來說這里你就可以查下數據庫看之前本地事務是否執行,如果回滾了,那么這里也回滾吧。這個就是避免可能本地事務執行成功了,而確認消息卻發送失敗了。
PS:此方案是不支持事務發起服務進行回滾的,但是大部分互聯網應用都不會要求事務發起方進行回滾,如果一定要事務發起方進行回滾應該采用2PC、3PC、TCC等強一致性方案來實現分布式事務,比如LCN。
這里通過一個實例來講一下RocketMQ實現分布式事務具體編碼。
場景: 下單場景,訂單服務生成訂單,當該訂單支付成功之后,修改訂單狀態已支付,并且要通知庫存服務進行庫存的扣減。
CREATE TABLE `yzy_order` ( `id` int(11) NOT NULL, `order_id` varchar(100) NOT NULL DEFAULT '' COMMENT '訂單id', `buy_num` int(11) DEFAULT NULL COMMENT '購買數量', `good_id` int(11) DEFAULT NULL COMMENT '商品ID', `user_id` int(11) DEFAULT NULL COMMENT '用戶ID', `pay_status` int(11) DEFAULT NULL COMMENT '支付狀態,0:沒有支付,1:已經支付', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci CREATE TABLE `yzy_repo` ( `id` int(11) NOT NULL AUTO_INCREMENT, `good_name` varchar(100) NOT NULL DEFAULT '' COMMENT '商品名稱', `num` int(11) NOT NULL DEFAULT '0' COMMENT '庫存數量', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='測試,庫存表表'
package com.transaction.order; import com.alibaba.dubbo.config.annotation.Reference; import com.transaction.repository.IRepositoryService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.List; @Service public class OrderService { @Autowired OrderDao orderDao; public final int PAY_DONE = 1; /** * 檢查訂單是否存在并且狀態是支付完成 **/ public boolean checkOrderPaySuccess(String orderId){ List<YzyOrder> allOrders = orderDao.findAll(); return allOrders.stream() .anyMatch(order -> order.getOrderId().equals(orderId) && order.getPayStatus() == PAY_DONE); } /** * 更新訂單是為支付完成 **/ public void updatePayStatusByOrderId(String orderId){ orderDao.updatePayStatusByOrderId(orderId, PAY_DONE); } /** * 生成訂單,狀態默認是未支付 **/ public void save(String orderId, int num, int goodId,int userId) { YzyOrder yzyOrder = new YzyOrder(); yzyOrder.setOrderId(orderId); yzyOrder.setBuyNum(num); yzyOrder.setGoodId(goodId); yzyOrder.setUserId(userId); orderDao.save(yzyOrder); } }
在終端或者瀏覽器 執行 curl '127.0.0.1:8081/order/save?num=2&good_id=1&user_id=1001'
/** * 生成訂單接口 * @param num * @param goodId * @param userId * @return */ @GetMapping("save") public String makeOrder( @RequestParam("num") int num, @RequestParam("good_id") int goodId, @RequestParam("user_id") int userId) { orderService.save(UUID.randomUUID().toString(), num, goodId,userId); return "success"; }
OrderController:pay 發送訂單支付成功的MQ事務消息,這里注意體會,并不是直接調用OrderService::updatePayStatusByOrderId 然后發送普通的MQ消息。而是先發送事務消息到MQ,然后MQ回調訂單服務的TransactionListener::executeLocalTransaction,在這里完成訂單狀態的更新,保證發送事務消息和更新訂單狀態的一致性.
@GetMapping("pay") public String pay(@RequestParam("order_id") String orderId) throws UnsupportedEncodingException, MQClientException, JsonProcessingException { transactionProducer.sendOrderPaySucessEvent(orderId); return "success"; }
@Component public class TransactionProducer implements InitializingBean { private TransactionMQProducer producer; @Autowired private OrderService orderService; @Autowired private OrderDao orderDao; @Override public void afterPropertiesSet() throws Exception { producer = new TransactionMQProducer("order-pay-group"); producer.setNamesrvAddr("mq01.stag.kk.srv:9876;mq02.stag.kk.srv:9876"); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build(); ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory); producer.setExecutorService(executor); //設置發送消息的回調 producer.setTransactionListener(new TransactionListener() { /** * 根據消息發送的結果 判斷是否執行本地事務 * * 回調該方法的時候說明 消息已經成功發送到了MQ,可以把訂單狀態更新為 "支付成功" */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 根據本地事務執行成與否判斷 事務消息是否需要commit與 rollback ObjectMapper objectMapper = new ObjectMapper(); LocalTransactionState state = LocalTransactionState.UNKNOW; try { OrderRecord record = objectMapper.readValue(msg.getBody(), OrderRecord.class); //MQ已經收到了TransactionProducer send方法發送的事務消息,下面執行本地的事務 //本地記錄訂單信息 orderService.updatePayStatusByOrderId(record.getOrderId()); state = LocalTransactionState.COMMIT_MESSAGE; } catch (UnsupportedEncodingException e) { e.printStackTrace(); state = LocalTransactionState.ROLLBACK_MESSAGE; } catch (IOException e) { e.printStackTrace(); state = LocalTransactionState.ROLLBACK_MESSAGE; } return state; } /** * RocketMQ 回調 根據本地事務是否執行成功 告訴broker 此消息是否投遞成功 * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { ObjectMapper objectMapper = new ObjectMapper(); LocalTransactionState state = LocalTransactionState.UNKNOW; OrderRecord record = null; try { record = objectMapper.readValue(msg.getBody(), OrderRecord.class); } catch (IOException e) { e.printStackTrace(); } try { //根據是否有transaction_id對應轉賬記錄 來判斷事務是否執行成功 boolean isLocalSuccess = orderService.checkOrderPaySuccess(record.getOrderId()); if (isLocalSuccess) { state = LocalTransactionState.COMMIT_MESSAGE; } else { state = LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { e.printStackTrace(); } return state; } }); producer.start(); } public void sendOrderPaySucessEvent(String orderId) throws JsonProcessingException, UnsupportedEncodingException, MQClientException { ObjectMapper objectMapper = new ObjectMapper(); YzyOrder order = orderDao.findAll().stream() .filter(item->item.getOrderId().equals(orderId)) .collect(Collectors.toList()).get(0); if(order == null){ System.out.println("not found order " + orderId); } // 構造發送的事務 消息 OrderRecord record = new OrderRecord(); record.setUserId(order.getUserId()); record.setOrderId(orderId); record.setBuyNum(order.getBuyNum()); record.setPayStatus(order.getPayStatus()); record.setGoodId(order.getGoodId()); Message message = new Message("Order-Success", "", record.getOrderId(), objectMapper.writeValueAsString(record).getBytes(RemotingHelper.DEFAULT_CHARSET)); TransactionSendResult result = producer.sendMessageInTransaction(message, null); System.out.println("發送事務消息 ,orderId = " + record.getOrderId() + " " + result.toString()); } }
需要注意的問題:
1. 扣減庫存要防止在并發的情況下被扣成負數
2. 先select后update的方式更新庫存要加分布式鎖或者數據庫樂觀鎖,update語句需要是冪等的
UPDATE t_yue SET money=$new_money WHERE id=$good_id AND money=$old_money;
3. 注意通過msgId或者orderId來進行消費冪等處理
@Override public int reduce(Integer buyNum, Integer goodId) { //并發的情況下,為了防止庫存被扣成負數,有三種解決方案 //1. select for update (必須放到事務中) //2. 這段邏輯加上分布式鎖 //3. 數據庫加上一個version字段,樂觀鎖 while (true){ Optional<YzyRepo> repoOption = repositoryDao.findById(goodId); if (!repoOption.isPresent()) { return 0; } YzyRepo repo = repoOption.get(); //避免數據庫庫存扣減小于零 if (repo.getNum() - buyNum < 0) { return -1; } repo.setNum(repo.getNum() - buyNum); int affect = repositoryDao.updateGoodNum(repo.getNum() - buyNum, repo.getNum(), goodId); if(affect > 0){ return affect; } } }
到此,相信大家對“RocketMQ如何解決分布式事務”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。