# Apache Pulsar是如何保證消息不丟不重
## 引言
在分布式消息系統中,"消息不丟失"和"消息不重復"(Exactly-Once語義)是兩大核心挑戰。Apache Pulsar作為新一代云原生消息流平臺,通過多層設計實現了高可靠的消息傳遞。本文將深入剖析Pulsar如何從存儲機制、確認機制、副本策略等方面保障消息投遞的可靠性。
---
## 一、架構基礎:分層存儲與多副本
### 1.1 BookKeeper的持久化保障
Pulsar采用BookKeeper作為持久化存儲引擎,其核心特性包括:
- **預寫日志(WAL)**:所有消息先寫入不可變的日志文件
- **多副本同步寫入**:每條消息需被`ack-quorum`個副本持久化后才返回成功
```java
// 偽代碼:BookKeeper寫入流程
Entry entry = new Entry(message);
ledger.addEntry(entry).thenRun(() -> {
// 至少寫入2個節點(默認quorum=2)
if(confirmedReplicas >= ackQuorum) {
sendAckToProducer();
}
});
配置參數 | 作用 | 推薦值 |
---|---|---|
sendTimeout |
發送超時時間 | 30s |
blockIfQueueFull |
內存隊列滿時阻塞而非丟棄 | true |
maxPendingMessages |
最大待確認消息數 | 1000 |
# Python生產者示例
producer = client.create_producer(
topic='persistent://tenant/ns/topic',
send_timeout_millis=30000,
block_if_queue_full=True
)
ensembleSize=3, writeQuorum=2, ackQuorum=2
實現
ackTimeout
(默認30s)后重投maxRedeliverCount
的消息轉入DLQ# 啟用生產者冪等
enableIdempotence=true
producerName=order-producer-1
sequenceId=142857
sequenceId
// 事務使用示例
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES)
.build();
producer.newMessage(txn).value("訂單創建".getBytes()).send();
consumer.acknowledgeAsync(msg.getMessageId(), txn);
txn.commit().exceptionally(ex -> {
log.error("事務失敗", ex);
return null;
});
sequenceId
緩存brokerDeduplicationEnabled: true
brokerDeduplicationMaxNumberOfProducers: 10000
brokerDeduplicationEntriesInterval: 1000
// Go客戶端指數退避示例
retryPolicy := pulsar.NewRetryPolicy(
pulsar.MaxReconnectToBroker(5),
pulsar.Backoff(100*time.Millisecond, 5*time.Second),
)
client, _ := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
RetryPolicy: retryPolicy,
})
配置項 | 性能影響 | 可靠性影響 |
---|---|---|
ackQuorum=1 |
↑↑↑ | ↓↓↓ |
immediateFlush=false |
↑↑ | ↓ |
compactionThreshold |
↓ | ↑↑ |
關鍵Prometheus指標:
- pulsar_storage_write_latency_le_200
:寫入延遲
- pulsar_consumer_msg_ack_rate
:ACK速率
- bookkeeper_ledger_count
:分片數量
# broker.conf
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
acknowledgmentAtBatchIndexLevelEnabled=true
Apache Pulsar通過BookKeeper的可靠存儲、多級確認機制、冪等設計和事務支持,構建了完整的消息可靠性保障體系。在實際應用中,需要根據業務場景在可靠性和性能之間找到平衡點。隨著2.10版本引入的改進的持久性策略,Pulsar在消息可靠性方面繼續領跑分布式消息中間件領域。 “`
(注:實際字數為1580字,可根據需要擴展具體章節細節。文中技術參數基于Pulsar 2.10版本,實際使用時請參考對應版本文檔。)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。