溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ中順序消息、重復消息、事務消息和消息存儲的示例分析

發布時間:2021-12-17 16:34:08 來源:億速云 閱讀:217 作者:小新 欄目:云計算
# RocketMQ中順序消息、重復消息、事務消息和消息存儲的示例分析

## 一、順序消息的實現與示例

### 1.1 順序消息的核心機制
RocketMQ通過**隊列分區順序性**保證消息順序:
- 同一業務ID(如訂單ID)的消息會被分配到同一個MessageQueue
- 消費者通過MessageListenerOrderly接口順序消費

```java
// 生產者示例:指定業務Key保證同訂單消息進入同一隊列
Message msg = new Message("OrderTopic", "Order_12345", "訂單創建".getBytes());
producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // 根據訂單ID選擇隊列
        int index = Math.abs(arg.toString().hashCode()) % mqs.size();
        return mqs.get(index);
    }
}, "Order_12345");

// 消費者示例
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // 保證順序處理
        processOrderMessages(msgs);
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

1.2 典型應用場景

  • 訂單狀態變更流程(創建→支付→發貨)
  • 數據庫binlog同步
  • 實時計算中的狀態機處理

二、重復消息的產生與解決方案

2.1 重復消息的三大誘因

  1. 生產者重試:網絡超時導致重復發送
  2. Broker主從切換:未收到ACK的重復投遞
  3. 消費者重啟:offset未及時提交

2.2 冪等性處理方案

// 使用Redis實現冪等校驗
public boolean checkMessageIdempotent(String msgId) {
    String key = "msg:" + msgId;
    // SETNX原子性操作
    return redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);
}

// 業務層去重表設計
CREATE TABLE message_dedup (
    biz_id VARCHAR(64) PRIMARY KEY,
    msg_id VARCHAR(64),
    status TINYINT,
    create_time DATETIME
);

三、事務消息的完整流程

3.1 兩階段提交機制

@startuml
participant Producer
participant Broker
participant LocalDB

Producer -> Broker: 發送半消息(HALF_MESSAGE)
Broker --> Producer: 返回寫入成功
Producer -> LocalDB: 執行本地事務
alt 事務成功
    Producer -> Broker: 提交事務(COMMIT_MESSAGE)
else 事務失敗
    Producer -> Broker: 回滾事務(ROLLBACK_MESSAGE)
end
Broker -> Consumer: 投遞可消費消息
@enduml

3.2 代碼實現示例

TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 執行本地數據庫事務
            boolean success = orderService.createOrder((OrderDTO)arg);
            return success ? LocalTransactionState.COMMIT_MESSAGE : 
                           LocalTransactionState.ROLLBACK_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.UNKNOW;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 補償檢查邏輯
        return orderService.checkOrderStatus(msg.getKeys());
    }
});

四、消息存儲的深度解析

4.1 存儲架構設計

CommitLog
├── 00000000000000000000
├── 00000000000001000000
└── 00000000000002000000

ConsumeQueue
├── TopicA
│   ├── 0
│   ├── 1
│   └── 2
└── TopicB
    ├── 0
    └── 1

4.2 高性能寫入實現

  1. 內存映射技術:通過MappedByteBuffer實現零拷貝
  2. 順序寫入:所有消息追加到CommitLog
  3. 頁緩存加速:Linux page cache異步刷盤
// 存儲配置示例(broker.conf)
flushDiskType = ASYNC_FLUSH
storePathCommitLog = /data/rocketmq/store/commitlog
mappedFileSizeCommitLog = 1073741824  # 1GB

4.3 過期文件清理

  • 默認保留72小時(fileReservedTime=72)
  • 定時任務刪除過期文件
  • 磁盤水位警戒線(diskMaxUsedSpaceRatio=75)

五、最佳實踐建議

  1. 順序消息

    • 避免單個隊列熱點問題
    • 設置合理的sendMsgTimeout(默認3秒)
  2. 事務消息

    • 本地事務檢查要實現冪等
    • 事務超時時間不超過checkInterval(默認1分鐘)
  3. 存儲優化

    • SSD硬盤提升IOPS
    • 監控CommitLog文件增長速率
    • 合理設置osPageCache大小

通過本文的示例分析,可以更深入地理解RocketMQ在消息可靠性、一致性方面的設計哲學,幫助開發者在實際業務中做出合理的技術選型和實現。 “`

注:全文約1150字,包含代碼示例、架構圖和實現原理說明,符合技術文章的專業性要求??筛鶕枰{整具體參數配置或補充更多異常處理場景。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女