Troubleshooting MQTT Protocol Issues with Large Messages on Netty 4.1.32 in Android Chat Servers

Background

Our team runs an in-house chat server built on the MQTT protocol. Two days ago, while testing large messages (more than 5000 Chinese characters), the connection became unusable: after the large message was sent, no subsequent message received any response.

Server environment: Netty 4.1.32.Final, using the MqttDecoder that ships with Netty

Client: Android

Investigation Process

  1. Every message is logged on the server, so we searched the server logs first. The content of the large message did not appear anywhere in them.
  2. Was the client failing to send the message because it was too long? We captured packets with tcpdump and saw that the client sent the data normally and that the server acknowledged every packet at the TCP level, yet the server never sent a response. This led us to suspect that the server was failing to process large messages.
    1. tcpdump tips: the -nn option prints IP addresses and ports numerically, and -X prints the contents of each packet. The -w option saves the capture to a file, which can then be analyzed with tcpdump or Wireshark.
  3. Next, we checked the maximum payload size that MQTT supports. The official MQTT documentation gives it as 256 MB, far more than our message.
  4. We captured packets again on the server side and confirmed that the message arrived but that no acknowledgment at the MQTT level was sent back.
  5. Debugging the live server, we saw that a message of type PUBLISH was received, but its class was not MqttPublishMessage and its payload was empty. The message did, however, carry an error: too large message: 56234 bytes
  6. A Google search turned up someone who had hit the same problem, although that report concerned an MQTT implementation written in C.
  7. We then read the MqttDecoder source and found that the decoder enforces a maximum message size (part of the code is shown below). Our startup code called the default constructor, so the default limit of 8092 bytes applied.

Java code excerpt:

public final class MqttDecoder extends ReplayingDecoder<DecoderState> {

    private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;

    public MqttDecoder() {
        this(DEFAULT_MAX_BYTES_IN_MESSAGE);
    }

    public MqttDecoder(int maxBytesInMessage) {
        super(DecoderState.READ_FIXED_HEADER);
        this.maxBytesInMessage = maxBytesInMessage;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        switch (state()) {
            case READ_FIXED_HEADER: try {
                mqttFixedHeader = decodeFixedHeader(buffer);
                bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
                checkpoint(DecoderState.READ_VARIABLE_HEADER);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_VARIABLE_HEADER: try {
                final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
                variableHeader = decodedVariableHeader.value;
                if (bytesRemainingInVariablePart > maxBytesInMessage) {
                    throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
                }
                bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
                checkpoint(DecoderState.READ_PAYLOAD);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_PAYLOAD: try {
                final Result<?> decodedPayload =
                        decodePayload(
                                buffer,
                                mqttFixedHeader.messageType(),
                                bytesRemainingInVariablePart,
                                variableHeader);
                bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
                if (bytesRemainingInVariablePart != 0) {
                    throw new DecoderException(
                            "non-zero remaining payload bytes: " +
                                    bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
                }
                checkpoint(DecoderState.READ_FIXED_HEADER);
                MqttMessage message = MqttMessageFactory.newMessage(
                        mqttFixedHeader, variableHeader, decodedPayload.value);
                mqttFixedHeader = null;
                variableHeader = null;
                out.add(message);
                break;
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case BAD_MESSAGE:
                // Keep discarding until disconnection.
                buffer.skipBytes(actualReadableBytes());
                break;

            default:
                // Shouldn't reach here.
                throw new Error();
        }
    }

    private MqttMessage invalidMessage(Throwable cause) {
        checkpoint(DecoderState.BAD_MESSAGE);
        return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
    }
}
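
To put the two limits from the investigation side by side, here is a minimal sketch in plain Java (no Netty dependency). It assumes the chat text is encoded as UTF-8, where a common Chinese character takes 3 bytes, so 5000 characters already come to roughly 15000 bytes of text alone. It also shows how MQTT encodes the Remaining Length field in at most 4 bytes, which is where the 256 MB protocol ceiling comes from. The class name MqttSizeCheck and its methods are hypothetical helpers written for this illustration, not part of Netty or of the original server.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative helper (hypothetical, not part of Netty). */
final class MqttSizeCheck {

    /** Default limit applied by MqttDecoder's no-argument constructor in the excerpt. */
    static final int NETTY_DEFAULT_MAX_BYTES = 8092;

    /** Largest value the 4-byte MQTT Remaining Length field can hold (256 MB minus 1 byte). */
    static final int MQTT_MAX_REMAINING_LENGTH = 268_435_455;

    /** Number of bytes the text occupies once encoded as UTF-8. */
    static int utf8Length(String text) {
        return text.getBytes(StandardCharsets.UTF_8).length;
    }

    /**
     * Encodes an MQTT Remaining Length: 7 bits of value per byte,
     * with the high bit set whenever another byte follows.
     */
    static byte[] encodeRemainingLength(int length) {
        if (length < 0 || length > MQTT_MAX_REMAINING_LENGTH) {
            throw new IllegalArgumentException("length out of range: " + length);
        }
        byte[] buf = new byte[4];
        int n = 0;
        do {
            int digit = length % 128;
            length /= 128;
            if (length > 0) {
                digit |= 0x80; // continuation bit: more bytes follow
            }
            buf[n++] = (byte) digit;
        } while (length > 0);
        return Arrays.copyOf(buf, n);
    }
}
```

For example, MqttSizeCheck.utf8Length("\u4e2d".repeat(5000)) returns 15000 (String.repeat needs Java 11 or later), which is already well above the 8092-byte default, while the 56234 bytes reported in the error fit comfortably within MQTT's own limit. The protocol was never the constraint; the decoder's default was.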
  8. With the cause of the large-message failure identified, one question remained: why could no subsequent message, not even a ping, get through afterwards? Reading the code showed that this is tied to ReplayingDecoder, the parent class of MqttDecoder, which is documented in detail in its source. While the variable header is being read, if the message exceeds the maximum size, an exception is thrown immediately. Code excerpt as follows:

Java code excerpt:

case READ_VARIABLE_HEADER: try {
    final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
    variableHeader = decodedVariableHeader.value;
    if (bytesRemainingInVariablePart > maxBytesInMessage) {
        throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
    }
    bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
    checkpoint(DecoderState.READ_PAYLOAD);
    // fall through
} catch (Exception cause) {
    out.add(invalidMessage(cause));
    return;
}

The catch block calls the invalidMessage method, which sets the decoder state to DecoderState.BAD_MESSAGE. In that state, every byte that arrives is simply discarded.

Java code excerpt:

case BAD_MESSAGE:
    // Keep discarding until disconnection.
    buffer.skipBytes(actualReadableBytes());
    break;

As a result, no subsequent message on that connection, pings included, ever reaches the business logic, and the long-lived connection stays useless until it is closed.
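
The behavior can be reproduced with a toy model in plain Java. This is a deliberately simplified sketch, not Netty code: the class ToyDecoder is hypothetical, each call to decode stands for one complete incoming message rather than a stream of bytes, and the string results only stand in for what the business logic would see.

```java
/** Toy model of the decoder's state handling (hypothetical, not Netty code). */
final class ToyDecoder {

    enum State { READ_MESSAGE, BAD_MESSAGE }

    private final int maxBytesInMessage;
    private State state = State.READ_MESSAGE;

    ToyDecoder(int maxBytesInMessage) {
        this.maxBytesInMessage = maxBytesInMessage;
    }

    /**
     * Feeds one complete message to the decoder.
     * Returns what the business logic would receive, or null if the bytes were discarded.
     */
    String decode(byte[] message) {
        switch (state) {
            case READ_MESSAGE:
                if (message.length > maxBytesInMessage) {
                    // Mirrors invalidMessage(): switch to BAD_MESSAGE and emit an error message.
                    state = State.BAD_MESSAGE;
                    return "INVALID: too large message: " + message.length + " bytes";
                }
                return "OK: " + message.length + " bytes";
            case BAD_MESSAGE:
            default:
                // Mirrors "Keep discarding until disconnection": nothing is emitted.
                return null;
        }
    }

    State state() {
        return state;
    }
}
```

With new ToyDecoder(8092), a 100-byte message is decoded normally, a 56234-byte message yields a single invalid message, and every later call returns null, however small the input. Nothing in the model ever leaves BAD_MESSAGE, which matches the excerpt: only a new connection, with a fresh decoder instance, starts again from a clean state.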

Solution

  1. Enforce a length limit on the client side and split long messages, so that no single message exceeds the maximum size.
  2. Raise the maximum message size on the server side. MqttDecoder provides the constructor MqttDecoder(int maxBytesInMessage) for this purpose. (We do not recommend this approach, because it increases server processing time and memory pressure.)
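
For the client-side option, one possible way to split a long text is sketched below in plain Java. It assumes the payload is UTF-8 text and cuts only on character boundaries, so no multi-byte character is torn apart. The class MessageSplitter and its method are hypothetical names for this illustration. Note that the decoder compares the whole Remaining Length (variable header plus payload) against its limit, so the byte budget passed in should leave headroom for the topic name and any message envelope the application adds.

```java
import java.util.ArrayList;
import java.util.List;

/** Splits text into chunks that each fit a UTF-8 byte budget (hypothetical helper). */
final class MessageSplitter {

    static List<String> splitByUtf8Bytes(String text, int maxBytes) {
        if (maxBytes < 4) {
            // A single code point can need up to 4 bytes in UTF-8.
            throw new IllegalArgumentException("maxBytes must be at least 4");
        }
        List<String> chunks = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int currentBytes = 0;
        for (int i = 0; i < text.length(); ) {
            int codePoint = text.codePointAt(i);
            int size = utf8Size(codePoint);
            if (currentBytes + size > maxBytes) {
                // The next character would overflow the budget: close the current chunk.
                chunks.add(current.toString());
                current.setLength(0);
                currentBytes = 0;
            }
            current.appendCodePoint(codePoint);
            currentBytes += size;
            i += Character.charCount(codePoint);
        }
        if (current.length() > 0) {
            chunks.add(current.toString());
        }
        return chunks;
    }

    /** Bytes needed to encode one code point in UTF-8. */
    private static int utf8Size(int codePoint) {
        if (codePoint < 0x80) return 1;
        if (codePoint < 0x800) return 2;
        if (codePoint < 0x10000) return 3;
        return 4;
    }
}
```

Splitting 5000 Chinese characters with a budget of 6000 bytes, for instance, gives three chunks of 2000, 2000, and 1000 characters. The receiving side still has to reassemble the chunks in order, which this sketch leaves out.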

This article is part of the Tencent Cloud Self-Media Sync Exposure Plan and is shared from the author's personal site/blog.

https://cloud.tencent.com/developer/article/1473438