在CentOS上處理RabbitMQ消息延遲,推薦使用延遲插件或TTL+死信隊列方案,以下是具體步驟:
rabbitmq-delayed-message-exchange插件(如RabbitMQ 3.9.x對應插件版本3.9.0)。.ez格式)復制到RabbitMQ安裝目錄的plugins目錄下。rabbitmq-plugins enable rabbitmq_delayed_message_exchange
systemctl restart rabbitmq-server
代碼示例(Spring Boot):
// 聲明延遲交換機(類型為x-delayed-message)
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 底層路由模式
return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);
}
// 聲明延遲隊列并綁定交換機
@Bean
public Queue delayedQueue() {
return new Queue("delayed_queue", true);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed_routing_key");
}
發送延遲消息:
rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routing_key", "消息內容",
message -> {
message.getMessageProperties().setDelay(5000); // 設置延遲5秒(單位:毫秒)
return message;
});
消費者監聽:
@RabbitListener(queues = "delayed_queue")
public void handleMessage(String message) {
System.out.println("處理延遲消息: " + message);
}
創建普通隊列,設置消息TTL(存活時間)和死信交換機:
@Bean
public Queue ttlQueue() {
return QueueBuilder.durable("ttl_queue")
.withArgument("x-message-ttl", 10000) // 10秒TTL
.withArgument("x-dead-letter-exchange", "dlx_exchange") // 死信交換機
.withArgument("x-dead-letter-routing-key", "delayed_queue") // 死信路由鍵
.build();
}
// 聲明死信隊列
@Bean
public Queue delayedQueue() {
return new Queue("delayed_queue", true);
}
發送消息:直接發送到普通隊列,消息到期后自動轉入死信隊列被消費。
x-max-length限制隊列長度。tail -f /var/log/rabbitmq/rabbit@hostname.log
rabbitmqctl list_queues查看隊列消息堆積情況,確認延遲是否符合預期。RabbitMQ Management UI(端口15672)實時監控隊列狀態和消息流動。以上方案可有效解決CentOS下RabbitMQ的消息延遲問題,優先選擇插件方案以獲得更高的靈活性和性能。