diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java index 29a8bc460..6b2d7a55e 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java @@ -239,7 +239,8 @@ public void sendConnAck() { MqttConnectAck.MqttConnectSuccessAckBuilder builder = MqttConnectAck.successBuilder(protocolVersion) .receiveMaximum(getServerRestrictions().getReceiveMaximum()) .cleanSession(clientRestrictions.isCleanSession()) - .maximumQos(MqttQoS.AT_LEAST_ONCE.value()); + .maximumQos(MqttQoS.AT_LEAST_ONCE.value()) + .maximumPacketSize(getServerRestrictions().getMaximumPacketSize()); MqttProperties.StringProperty resInformation = (MqttProperties.StringProperty) connectMessage.variableHeader() .properties().getProperty(MqttProperties.MqttPropertyType.RESPONSE_INFORMATION.value()); if (resInformation != null) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java index 0ee63aa43..8f251597d 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java @@ -49,6 +49,8 @@ public final static class MqttConnectSuccessAckBuilder { private String responseInformation; + private int maximumPacketSize; + public MqttConnectSuccessAckBuilder(int protocolVersion) { this.protocolVersion = protocolVersion; } @@ -73,6 +75,11 @@ public MqttConnectSuccessAckBuilder responseInformation(String responseInformati return this; } + public MqttConnectSuccessAckBuilder maximumPacketSize(int maximumPacketSize) { + this.maximumPacketSize = maximumPacketSize; + return this; + } + public MqttAck build() { MqttMessageBuilders.ConnAckBuilder commonBuilder = MqttMessageBuilders.connAck() .sessionPresent(!cleanSession); @@ -88,8 +95,17 @@ public MqttAck build() { MqttProperties.IntegerProperty maximumQosProperty = new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.MAXIMUM_QOS.value(), maximumQos); + // Set Subscription Identifiers Available = 0 now. + MqttProperties.IntegerProperty subscriptionIdentifiersAvailableProperty = + new MqttProperties.IntegerProperty( + MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER_AVAILABLE.value(), 0); + MqttProperties.IntegerProperty maximumPacketSizeProperty = + new MqttProperties.IntegerProperty( + MqttProperties.MqttPropertyType.MAXIMUM_PACKET_SIZE.value(), maximumPacketSize); properties.add(receiveMaximumProperty); properties.add(maximumQosProperty); + properties.add(subscriptionIdentifiersAvailableProperty); + properties.add(maximumPacketSizeProperty); if (StringUtils.isNotEmpty(responseInformation)) { MqttProperties.StringProperty responseInformationProperty = new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.RESPONSE_INFORMATION.value(), diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java index d77cb1ecd..86cbf3351 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java @@ -115,6 +115,7 @@ public void doProcessConnect(MqttAdapterMessage adapter, String userRole, Client final MqttConnectMessage msg = (MqttConnectMessage) adapter.getMqttMessage(); final ServerRestrictions serverRestrictions = ServerRestrictions.builder() .receiveMaximum(proxyConfig.getReceiveMaximum()) + .maximumPacketSize(proxyConfig.getMqttMessageMaxLength()) .build(); connection = Connection.builder() .protocolVersion(msg.variableHeader().version()) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ServerRestrictions.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ServerRestrictions.java index 0e64bea54..0540e7c3d 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ServerRestrictions.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ServerRestrictions.java @@ -24,4 +24,7 @@ public class ServerRestrictions { @Getter private int receiveMaximum; + + @Getter + private int maximumPacketSize; } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java index cd743f725..fa04b73ff 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java @@ -127,6 +127,7 @@ public void doProcessConnect(MqttAdapterMessage adapterMsg, String userRole, final MqttConnectMessage msg = (MqttConnectMessage) adapterMsg.getMqttMessage(); ServerRestrictions serverRestrictions = ServerRestrictions.builder() .receiveMaximum(configuration.getReceiveMaximum()) + .maximumPacketSize(configuration.getMqttMessageMaxLength()) .build(); connection = Connection.builder() .protocolVersion(msg.variableHeader().version())