# SpringMVC消費RabbitMQ隊列的示例分析
## 引言
在現代分布式系統架構中,消息隊列作為解耦生產者和消費者的重要組件,被廣泛應用于異步處理、應用解耦、流量削峰等場景。RabbitMQ作為實現了AMQP協議的開源消息代理,以其可靠性、靈活的路由機制和跨平臺特性成為企業級應用的首選。本文將深入探討如何在SpringMVC框架中集成RabbitMQ消費者,通過完整示例演示從環境搭建到消息處理的完整流程。
## 一、環境準備與依賴配置
### 1.1 RabbitMQ環境搭建
首先需要確保RabbitMQ服務已正確安裝并運行:
```bash
# Docker方式啟動RabbitMQ(推薦)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
訪問http://localhost:15672可進入管理控制臺(默認賬號/密碼:guest/guest)
在Maven項目的pom.xml中添加必要依賴:
<!-- Spring MVC基礎依賴 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.3.18</version>
</dependency>
<!-- RabbitMQ集成依賴 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.4</version>
</dependency>
<!-- JSON處理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
</dependency>
創建RabbitMQ配置類RabbitMQConfig.java:
@Configuration
public class RabbitMQConfig {
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private int port;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
在配置類中繼續添加隊列和交換機的聲明:
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange", true, false);
}
@Bean
public Queue orderQueue() {
return new Queue("order.queue", true);
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with("order.routingKey");
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(5);
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
創建訂單消息消費者OrderMessageConsumer.java:
@Component
public class OrderMessageConsumer {
@RabbitListener(queues = "order.queue")
public void processOrder(Order order) {
System.out.println("Received order: " + order);
// 業務處理邏輯...
}
}
@RabbitListener(queues = "order.queue")
public void processOrderWithAck(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// 業務處理
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, true); // 重新入隊
}
}
@RabbitListener(queues = "batch.order.queue", containerFactory = "batchFactory")
public void processBatchOrders(List<Order> orders) {
orders.forEach(order -> {
// 批量處理邏輯
});
}
對應的批量容器工廠配置:
@Bean
public SimpleRabbitListenerContainerFactory batchFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setBatchListener(true); // 啟用批量模式
factory.setBatchSize(50);
return factory;
}
public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
// 記錄錯誤日志
// 發送告警通知
// 自定義恢復邏輯
}
}
在容器工廠中配置:
factory.setErrorHandler(new CustomErrorHandler());
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order.dead.letter.queue").build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dead.letter.exchange");
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dead.letter.routingKey");
}
@Bean
public Queue orderQueueWithDLQ() {
return QueueBuilder.durable("order.queue.with.dlq")
.withArgument("x-dead-letter-exchange", "dead.letter.exchange")
.withArgument("x-dead-letter-routing-key", "dead.letter.routingKey")
.build();
}
# application.properties
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=20
@Bean
public RabbitListenerEndpointRegistry endpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
@Scheduled(fixedRate = 60000)
public void monitorQueueConsumers() {
endpointRegistry().getListenerContainers().forEach(container -> {
System.out.println("Queue: " + Arrays.toString(container.getQueueNames()));
System.out.println("Active consumers: " + container.getActiveConsumerCount());
});
}
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping
public ResponseEntity<String> createOrder(@RequestBody Order order) {
rabbitTemplate.convertAndSend("order.exchange",
"order.routingKey", order);
return ResponseEntity.ok("Order submitted");
}
}
@Service
public class OrderProcessingService {
@RabbitListener(queues = "order.queue")
public void processOrder(Order order) {
// 1. 訂單驗證
validateOrder(order);
// 2. 庫存扣減
reduceInventory(order);
// 3. 支付處理
processPayment(order);
// 4. 物流通知
notifyLogistics(order);
}
// 各業務方法實現...
}
解決方案: - 實現冪等性處理 - 使用Redis記錄已處理消息ID - 數據庫唯一約束防止重復處理
@RabbitListener(queues = "order.queue")
public void processOrderWithIdempotent(Order order) {
if (redisTemplate.opsForValue().setIfAbsent(
"order:" + order.getId(), "processing", 24, TimeUnit.HOURS)) {
// 實際業務處理
}
}
解決方案: - 增加消費者實例 - 優化消費邏輯 - 設置合理的prefetch count - 啟用惰性隊列
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
.withArgument("x-queue-mode", "lazy")
.build();
}
通過本文的示例,我們完整演示了SpringMVC集成RabbitMQ消費者的實現過程。關鍵實踐要點包括:
隨著業務規模擴大,可考慮引入Spring Cloud Stream等更高級的抽象層,進一步簡化消息驅動的微服務開發。
# 查看隊列狀態
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# 查看消費者
rabbitmqctl list_consumers
”`
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。