# RocketMQ中順序消息、重復消息、事務消息和消息存儲的示例分析
## 一、順序消息的實現與示例
### 1.1 順序消息的核心機制
RocketMQ通過**隊列分區順序性**保證消息順序:
- 同一業務ID(如訂單ID)的消息會被分配到同一個MessageQueue
- 消費者通過MessageListenerOrderly接口順序消費
```java
// 生產者示例:指定業務Key保證同訂單消息進入同一隊列
Message msg = new Message("OrderTopic", "Order_12345", "訂單創建".getBytes());
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根據訂單ID選擇隊列
int index = Math.abs(arg.toString().hashCode()) % mqs.size();
return mqs.get(index);
}
}, "Order_12345");
// 消費者示例
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 保證順序處理
processOrderMessages(msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 使用Redis實現冪等校驗
public boolean checkMessageIdempotent(String msgId) {
String key = "msg:" + msgId;
// SETNX原子性操作
return redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);
}
// 業務層去重表設計
CREATE TABLE message_dedup (
biz_id VARCHAR(64) PRIMARY KEY,
msg_id VARCHAR(64),
status TINYINT,
create_time DATETIME
);
@startuml
participant Producer
participant Broker
participant LocalDB
Producer -> Broker: 發送半消息(HALF_MESSAGE)
Broker --> Producer: 返回寫入成功
Producer -> LocalDB: 執行本地事務
alt 事務成功
Producer -> Broker: 提交事務(COMMIT_MESSAGE)
else 事務失敗
Producer -> Broker: 回滾事務(ROLLBACK_MESSAGE)
end
Broker -> Consumer: 投遞可消費消息
@enduml
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 執行本地數據庫事務
boolean success = orderService.createOrder((OrderDTO)arg);
return success ? LocalTransactionState.COMMIT_MESSAGE :
LocalTransactionState.ROLLBACK_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 補償檢查邏輯
return orderService.checkOrderStatus(msg.getKeys());
}
});
CommitLog
├── 00000000000000000000
├── 00000000000001000000
└── 00000000000002000000
ConsumeQueue
├── TopicA
│ ├── 0
│ ├── 1
│ └── 2
└── TopicB
├── 0
└── 1
// 存儲配置示例(broker.conf)
flushDiskType = ASYNC_FLUSH
storePathCommitLog = /data/rocketmq/store/commitlog
mappedFileSizeCommitLog = 1073741824 # 1GB
順序消息:
事務消息:
存儲優化:
通過本文的示例分析,可以更深入地理解RocketMQ在消息可靠性、一致性方面的設計哲學,幫助開發者在實際業務中做出合理的技術選型和實現。 “`
注:全文約1150字,包含代碼示例、架構圖和實現原理說明,符合技術文章的專業性要求??筛鶕枰{整具體參數配置或補充更多異常處理場景。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。