溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ如何解決分布式事務

發布時間:2021-06-22 14:53:17 來源:億速云 閱讀:212 作者:chen 欄目:大數據

本篇內容主要講解“RocketMQ如何解決分布式事務”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“RocketMQ如何解決分布式事務”吧!

一致性如何保證:

RocketMQ解決分布式事務(可靠消息最終一致性方案)

1、A系統發送一個prepared消息到MQ,如果這個prepared消息發送失敗那么就直接取消操作別執行了。

2、如果這個消息發送成功了、就接著執行本地事務(executeLocalTransaction),如果成功就告訴MQ發送確認消息,如果失敗,就告訴MQ發送回滾消息。

3、如果發送了確認消息、那么B系統會接收到確認消息,然后執行本地事務。

4、上面的第2步, 由于網絡原因發送確認or回滾消息失敗,但是broker有輪詢機制,根據唯一id查詢本地事務狀態,MQ會自動定時輪詢所有prepared消息回調你的接口(checkLocalTransaction),問你,這個消息是不是本地事務處理失敗了,所有沒有發送確認的消息,是繼續重試還是回滾?一版來說這里你就可以查下數據庫看之前本地事務是否執行,如果回滾了,那么這里也回滾吧。這個就是避免可能本地事務執行成功了,而確認消息卻發送失敗了。

PS:此方案是不支持事務發起服務進行回滾的,但是大部分互聯網應用都不會要求事務發起方進行回滾,如果一定要事務發起方進行回滾應該采用2PC、3PC、TCC等強一致性方案來實現分布式事務,比如LCN。

訂單-庫存-分布式事務

這里通過一個實例來講一下RocketMQ實現分布式事務具體編碼。

場景: 下單場景,訂單服務生成訂單,當訂單支付成功之后,修改訂單狀態已支付,并且要通知庫存服務進行庫存的扣減。

數據庫設計:

CREATE TABLE `yzy_order` (
  `id` int(11) NOT NULL,
  `order_id` varchar(100) NOT NULL DEFAULT '' COMMENT '訂單id',
  `buy_num` int(11) DEFAULT NULL COMMENT '購買數量',
  `good_id` int(11) DEFAULT NULL COMMENT '商品ID',
  `user_id` int(11) DEFAULT NULL COMMENT '用戶ID',
  `pay_status` int(11) DEFAULT NULL COMMENT '支付狀態,0:沒有支付,1:已經支付',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci




CREATE TABLE `yzy_repo` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `good_name` varchar(100) NOT NULL DEFAULT '' COMMENT '商品名稱',
  `num` int(11) NOT NULL DEFAULT '0' COMMENT '庫存數量',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='測試,庫存表表'


開始實戰

訂單服務service的主要方法

package com.transaction.order;

import com.alibaba.dubbo.config.annotation.Reference;
import com.transaction.repository.IRepositoryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.List;

@Service
public class OrderService {
    @Autowired
    OrderDao orderDao;

    public final int PAY_DONE = 1;

    /**
     *  檢查訂單是否存在并且狀態是支付完成
    **/
    public boolean checkOrderPaySuccess(String orderId){
        List<YzyOrder> allOrders = orderDao.findAll();
        return  allOrders.stream()
                .anyMatch(order -> order.getOrderId().equals(orderId) && order.getPayStatus() == PAY_DONE);
    }

 /**
     *  更新訂單是為支付完成
    **/
    public void updatePayStatusByOrderId(String orderId){
        orderDao.updatePayStatusByOrderId(orderId, PAY_DONE);
    }

 /**
     *  生成訂單,狀態默認是未支付
    **/

    public void save(String orderId, int num, int goodId,int userId) {

        YzyOrder yzyOrder = new YzyOrder();
        yzyOrder.setOrderId(orderId);
        yzyOrder.setBuyNum(num);
        yzyOrder.setGoodId(goodId);
        yzyOrder.setUserId(userId);

        orderDao.save(yzyOrder);
    }
}

業務流程

1.在訂單表創建一個狀態是未支付的訂單

 在終端或者瀏覽器 執行  curl '127.0.0.1:8081/order/save?num=2&good_id=1&user_id=1001' 

 /**
     * 生成訂單接口
     * @param num
     * @param goodId
     * @param userId
     * @return
     */
    @GetMapping("save")
    public String makeOrder(
            @RequestParam("num") int num,
            @RequestParam("good_id") int goodId,
            @RequestParam("user_id") int userId) {

        orderService.save(UUID.randomUUID().toString(), num, goodId,userId);
        return "success";
    }

2.用戶支付完成,通過MQ通知庫存服務扣減庫存

OrderController:pay 發送訂單支付成功的MQ事務消息,這里注意體會,并不是直接調用OrderService::updatePayStatusByOrderId 然后發送普通的MQ消息。而是先發送事務消息到MQ,然后MQ回調訂單服務的TransactionListener::executeLocalTransaction,在這里完成訂單狀態的更新,保證發送事務消息和更新訂單狀態的一致性.

  @GetMapping("pay")
    public String pay(@RequestParam("order_id") String orderId)
            throws UnsupportedEncodingException, MQClientException, JsonProcessingException {
        transactionProducer.sendOrderPaySucessEvent(orderId);
        return "success";
    }

3.訂單服務端的事務消息監聽器

@Component
public class TransactionProducer implements InitializingBean {

    private TransactionMQProducer producer;

    @Autowired
    private OrderService orderService;

    @Autowired
    private OrderDao orderDao;

    @Override
    public void afterPropertiesSet() throws Exception {
        producer = new TransactionMQProducer("order-pay-group");
        producer.setNamesrvAddr("mq01.stag.kk.srv:9876;mq02.stag.kk.srv:9876");
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);
        producer.setExecutorService(executor);
        //設置發送消息的回調
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 根據消息發送的結果 判斷是否執行本地事務
             *
             * 回調該方法的時候說明 消息已經成功發送到了MQ,可以把訂單狀態更新為 "支付成功"
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 根據本地事務執行成與否判斷 事務消息是否需要commit與 rollback
                ObjectMapper objectMapper = new ObjectMapper();
                LocalTransactionState state = LocalTransactionState.UNKNOW;
                try {
                    OrderRecord record = objectMapper.readValue(msg.getBody(), OrderRecord.class);

                    //MQ已經收到了TransactionProducer send方法發送的事務消息,下面執行本地的事務
                    //本地記錄訂單信息
                    orderService.updatePayStatusByOrderId(record.getOrderId());

                    state = LocalTransactionState.COMMIT_MESSAGE;
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    state = LocalTransactionState.ROLLBACK_MESSAGE;
                } catch (IOException e) {
                    e.printStackTrace();
                    state = LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return state;
            }
            /**
             * RocketMQ 回調 根據本地事務是否執行成功 告訴broker 此消息是否投遞成功
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                ObjectMapper objectMapper = new ObjectMapper();
                LocalTransactionState state = LocalTransactionState.UNKNOW;
                OrderRecord record = null;
                try {
                    record = objectMapper.readValue(msg.getBody(), OrderRecord.class);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                try {
                    //根據是否有transaction_id對應轉賬記錄 來判斷事務是否執行成功
                    boolean isLocalSuccess = orderService.checkOrderPaySuccess(record.getOrderId());

                    if (isLocalSuccess) {
                        state = LocalTransactionState.COMMIT_MESSAGE;
                    } else {
                        state = LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return state;
            }
        });
        producer.start();
    }

    public void sendOrderPaySucessEvent(String orderId) throws JsonProcessingException, UnsupportedEncodingException, MQClientException {
        ObjectMapper objectMapper = new ObjectMapper();
        YzyOrder order = orderDao.findAll().stream()
                .filter(item->item.getOrderId().equals(orderId))
                .collect(Collectors.toList()).get(0);
        if(order == null){
            System.out.println("not found order " + orderId);
        }
        // 構造發送的事務 消息
        OrderRecord record = new OrderRecord();
        record.setUserId(order.getUserId());
        record.setOrderId(orderId);
        record.setBuyNum(order.getBuyNum());
        record.setPayStatus(order.getPayStatus());
        record.setGoodId(order.getGoodId());

        Message message = new Message("Order-Success", "", record.getOrderId(),
                objectMapper.writeValueAsString(record).getBytes(RemotingHelper.DEFAULT_CHARSET));

        TransactionSendResult result = producer.sendMessageInTransaction(message, null);
        System.out.println("發送事務消息 ,orderId = " + record.getOrderId() + " " + result.toString());
    }
}

4.庫存服務扣減庫存

需要注意的問題:

1.  扣減庫存要防止在并發的情況下被扣成負數

2. 先select后update的方式更新庫存要加分布式鎖或者數據庫樂觀鎖,update語句需要是冪等的

   UPDATE t_yue SET money=$new_money WHERE id=$good_id AND money=$old_money;

3. 注意通過msgId或者orderId來進行消費冪等處理

 @Override
    public int reduce(Integer buyNum, Integer goodId) {

        //并發的情況下,為了防止庫存被扣成負數,有三種解決方案
        //1. select for update (必須放到事務中)
        //2. 這段邏輯加上分布式鎖
        //3. 數據庫加上一個version字段,樂觀鎖

        while (true){
            Optional<YzyRepo> repoOption = repositoryDao.findById(goodId);
            if (!repoOption.isPresent()) {
                return 0;
            }

            YzyRepo repo = repoOption.get();

            //避免數據庫庫存扣減小于零
            if (repo.getNum() - buyNum < 0) {
                return -1;
            }
            repo.setNum(repo.getNum() - buyNum);

            int affect = repositoryDao.updateGoodNum(repo.getNum() - buyNum, repo.getNum(), goodId);
            if(affect > 0){
                return affect;
            }
        }
    }

到此,相信大家對“RocketMQ如何解決分布式事務”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女