這篇文章將為大家詳細講解有關MQTT大消息失敗原因排查的過程,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
小組內使用 MQTT 協議搭建了一個聊天服務器,前天在測大消息(超過5000漢字)時,連接直接變得不可用,后續發送的消息全部都收不到回復。
服務器環境:
Netty :4.1.32.Final
使用的是 Netty 包中自帶的 MqttDecoder
客戶端: Android
由于所有的消息都打印了日志,因此先搜了一下服務器日志,發現日志中并沒有發送的消息內容。
難道是客戶端在超長消息時沒有發送?使用 tcpdump 抓了包,發現客戶端正常發送,并且所有的包服務端都已經 ack,但是后續服務端沒有發回響應,猜測是服務端在大消息的情況下處理失敗了。
tcpdump 使用 -nn 打印出ip和端口,-X 打印網絡包的內容,也可以使用-w 選項保存到文件里,然后使用 tcpdump 或 wireshark 來分析
于是查了一下 MQTT 支持的最大 payload,MQTT 官方文檔 中說明是 256M,這個大小肯定不會超過。
在服務端抓了下包,確認消息已經收到,但是無確認消息返回
開啟線上debug,發現收到了一個 PUBLISH 類型的消息,但是消息的 class 不為 MqttPublishMessage, 且 payload 中無數據,但在 Message 中有一個報錯消息 too large message: 56234 bytes
Google 一下,有網友遇到了同樣的問題, 雖然這個問題里 MQTT 是 C 語言的。
查看 MqttDecoder, 發現 decoder 有最長 payload 限制(以下為部分代碼),啟動代碼里調用的是默認構造函數,因此默認最長數據為 8092 字節。
public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
public MqttDecoder() {
this(DEFAULT_MAX_BYTES_IN_MESSAGE);
}
public MqttDecoder(int maxBytesInMessage) {
super(DecoderState.READ_FIXED_HEADER);
this.maxBytesInMessage = maxBytesInMessage;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
switch (state()) {
case READ_FIXED_HEADER: try {
mqttFixedHeader = decodeFixedHeader(buffer);
bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
checkpoint(DecoderState.READ_VARIABLE_HEADER);
// fall through
} catch (Exception cause) {
out.add(invalidMessage(cause));
return;
}
case READ_VARIABLE_HEADER: try {
final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
if (bytesRemainingInVariablePart > maxBytesInMessage) {
throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
}
bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
checkpoint(DecoderState.READ_PAYLOAD);
// fall through
} catch (Exception cause) {
out.add(invalidMessage(cause));
return;
}
case READ_PAYLOAD: try {
final Result<?> decodedPayload =
decodePayload(
buffer,
mqttFixedHeader.messageType(),
bytesRemainingInVariablePart,
variableHeader);
bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
if (bytesRemainingInVariablePart != 0) {
throw new DecoderException(
"non-zero remaining payload bytes: " +
bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
}
checkpoint(DecoderState.READ_FIXED_HEADER);
MqttMessage message = MqttMessageFactory.newMessage(
mqttFixedHeader, variableHeader, decodedPayload.value);
mqttFixedHeader = null;
variableHeader = null;
out.add(message);
break;
} catch (Exception cause) {
out.add(invalidMessage(cause));
return;
}
case BAD_MESSAGE:
// Keep discarding until disconnection.
buffer.skipBytes(actualReadableBytes());
break;
default:
// Shouldn't reach here.
throw new Error();
}
}
private MqttMessage invalidMessage(Throwable cause) {
checkpoint(DecoderState.BAD_MESSAGE);
return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
}
}長消息的原因找到了,還剩一個問題,為什么后續的消息包括 ping 消息就再也發不出去了?經過查看代碼,這與 MqttDecoder 的父類 ReplayingDecoder 有關系,查看源碼有詳盡的類說明, 在讀取可變長度頭部時,如果payload 超過了最大限制,那么直接拋出異常。摘出代碼如下:
case READ_VARIABLE_HEADER: try {
final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
if (bytesRemainingInVariablePart > maxBytesInMessage) {
throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
}
bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
checkpoint(DecoderState.READ_PAYLOAD);
// fall through
} catch (Exception cause) {
out.add(invalidMessage(cause));
return;
}在異常處理中,調用了 invalidMessage 方法,這個方法將 狀態設為 DecoderState.BAD_MESSAGE, 在這個狀態下,所有的字節都直接被丟棄。
case BAD_MESSAGE: // Keep discarding until disconnection. buffer.skipBytes(actualReadableBytes()); break;
也就是說此后的消息都不會進入到業務處理邏輯,這條長連接廢掉了。
客戶端對長消息做字數限制和拆分,保證單條消息不超過最大限制
服務端增大最大載荷長度,MqttDecoder 提供了構造函數(不建議使用,這樣會增大服務器處理時間和內存負擔)
關于MQTT大消息失敗原因排查的過程就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。