這篇文章主要介紹“MQ中怎么保證消息不被重復消費”,在日常操作中,相信很多人在MQ中怎么保證消息不被重復消費問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”MQ中怎么保證消息不被重復消費”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
一. 重復消息
為什么會出現消息重復?消息重復的原因有兩個:1.生產時消息重復,2.消費時消息重復。
1.1 生產時消息重復
由于生產者發送消息給MQ,在MQ確認的時候出現了網絡波動,生產者沒有收到確認,實際上MQ已經接收到了消息。這時候生產者就會重新發送一遍這條消息。
生產者中如果消息未被確認,或確認失敗,我們可以使用定時任務+(redis/db)來進行消息重試。
@Component
@Slf4J
public
class SendMessage {
@Autowired
private MessageService messageService;
@Autowired
private RabbitTemplate rabbitTemplate;
// 最大投遞次數
private static final int MAX_TRY_COUNT =
3;
/**
* 每30s拉取投遞失敗的消息, 重新投遞
*/
@Scheduled(cron =
"0/30 * * * * ?")
public void resend() {
log.info("開始執行定時任務(重新投遞消息)");
List<MsgLog> msgLogs = messageService.selectTimeoutMsg();
msgLogs.forEach(msgLog -> {
String msgId = msgLog.getMsgId();
if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
log.info("超過最大重試次數, 消息投遞失敗, msgId: {}", msgId);
}
else {
messageService.updateTryCount(msgId, msgLog.getNextTryTime());// 投遞次數+1
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 重新投遞
log.info("第 " + (msgLog.getTryCount() +
1) +
" 次重新投遞消息");
}
});
log.info("定時任務執行結束(重新投遞消息)");
}
}
1.2 消費時消息重復
消費者消費成功后,再給MQ確認的時候出現了網絡波動,MQ沒有接收到確認,為了保證消息被消費,MQ就會繼續給消費者投遞之前的消息。這時候消費者就接收到了兩條一樣的消息。
修改消費者,模擬異常
@RabbitListener(queuesToDeclare =
@Queue(value =
"javatrip", durable =
"true"))
public void receive(String message,
@Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("重試"+System.currentTimeMillis());
System.out.println(message);
int i =
1 /
0;
}
配置yml重試策略
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費者進行重試
max-attempts:
5 # 最大重試次數
initial-interval:
3000 # 重試時間間隔
由于重復消息是由于網絡原因造成的,因此不可避免重復消息。但是我們需要保證消息的冪等性。
二. 如何保證消息冪等性
讓每個消息攜帶一個全局的唯一ID,即可保證消息的冪等性,具體消費過程為:
消費者獲取到消息后先根據id去查詢redis/db是否存在該消息
如果不存在,則正常消費,消費完畢后寫入redis/db
如果存在,則證明消息被消費過,直接丟棄。
生產者
@PostMapping("/send")
public void sendMessage(){
JSONObject jsonObject =
new JSONObject();
jsonObject.put("message","Java旅途");
String json = jsonObject.toJSONString();
Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();
amqpTemplate.convertAndSend("javatrip",message);
}
消費者
@Component
@RabbitListener(queuesToDeclare =
@Queue(value =
"javatrip", durable =
"true"))
public class Consumer {
@RabbitHandler
public void receiveMessage(Message message) throws Exception {
Jedis jedis =
new Jedis("localhost",
6379);
String messageId = message.getMessageProperties().getMessageId();
String msg =
new String(message.getBody(),"UTF-8");
System.out.println("接收到的消息為:"+msg+"==消息id為:"+messageId);
String messageIdRedis = jedis.get("messageId");
if(messageId == messageIdRedis){
return;
}
JSONObject jsonObject = JSONObject.parseObject(msg);
String email = jsonObject.getString("message");
jedis.set("messageId",messageId);
}
}
如果需要存入db的話,可以直接將這個ID設為消息的主鍵,下次如果獲取到重復消息進行消費時,由于數據庫主鍵的唯一性,則會直接拋出異常。
到此,關于“MQ中怎么保證消息不被重復消費”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。