diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java index b0d03b222..29a8bc460 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java @@ -24,6 +24,8 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageBuilders; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.timeout.IdleStateHandler; import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterMessage; import io.streamnative.pulsar.handlers.mqtt.exception.restrictions.InvalidSessionExpireIntervalException; @@ -234,10 +236,16 @@ public void sendConnAck() { sendAckThenClose(connAck); return; } - MqttAck connAck = MqttConnectAck.successBuilder(protocolVersion) + MqttConnectAck.MqttConnectSuccessAckBuilder builder = MqttConnectAck.successBuilder(protocolVersion) .receiveMaximum(getServerRestrictions().getReceiveMaximum()) .cleanSession(clientRestrictions.isCleanSession()) - .build(); + .maximumQos(MqttQoS.AT_LEAST_ONCE.value()); + MqttProperties.StringProperty resInformation = (MqttProperties.StringProperty) connectMessage.variableHeader() + .properties().getProperty(MqttProperties.MqttPropertyType.RESPONSE_INFORMATION.value()); + if (resInformation != null) { + builder.responseInformation(resInformation.value()); + } + MqttAck connAck = builder.build(); sendAck(connAck).thenAccept(__ -> assignState(CONNECT_ACK, ESTABLISHED)); if (log.isDebugEnabled()) { log.debug("The CONNECT message has been processed. CId={}", clientId); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java index 9eb5c1f0a..0ee63aa43 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java @@ -21,6 +21,7 @@ import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt3.Mqtt3ConnReasonCode; import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5ConnReasonCode; import io.streamnative.pulsar.handlers.mqtt.utils.MqttUtils; +import org.apache.commons.lang3.StringUtils; /** * Enhance mqtt connect ack message builder. @@ -44,6 +45,10 @@ public final static class MqttConnectSuccessAckBuilder { private boolean cleanSession; private int receiveMaximum; + private int maximumQos; + + private String responseInformation; + public MqttConnectSuccessAckBuilder(int protocolVersion) { this.protocolVersion = protocolVersion; } @@ -58,6 +63,16 @@ public MqttConnectSuccessAckBuilder receiveMaximum(int receiveMaximum) { return this; } + public MqttConnectSuccessAckBuilder maximumQos(int maximumQos) { + this.maximumQos = maximumQos; + return this; + } + + public MqttConnectSuccessAckBuilder responseInformation(String responseInformation) { + this.responseInformation = responseInformation; + return this; + } + public MqttAck build() { MqttMessageBuilders.ConnAckBuilder commonBuilder = MqttMessageBuilders.connAck() .sessionPresent(!cleanSession); @@ -67,10 +82,20 @@ public MqttAck build() { .build()); } MqttProperties properties = new MqttProperties(); - MqttProperties.IntegerProperty property = + MqttProperties.IntegerProperty receiveMaximumProperty = new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value(), receiveMaximum); - properties.add(property); + MqttProperties.IntegerProperty maximumQosProperty = + new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.MAXIMUM_QOS.value(), + maximumQos); + properties.add(receiveMaximumProperty); + properties.add(maximumQosProperty); + if (StringUtils.isNotEmpty(responseInformation)) { + MqttProperties.StringProperty responseInformationProperty = + new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.RESPONSE_INFORMATION.value(), + responseInformation); + properties.add(responseInformationProperty); + } return MqttAck.createSupportedAck( commonBuilder.returnCode(Mqtt5ConnReasonCode.SUCCESS.toConnectionReasonCode()) .properties(properties) @@ -107,6 +132,12 @@ public MqttMessage authFail(int protocolVersion) { return build().getMqttMessage(); } + public MqttMessage qosNotSupport(int protocolVersion) { + this.protocolVersion = protocolVersion; + this.errorReason = ErrorReason.QOS_NOT_SUPPORT; + return build().getMqttMessage(); + } + public MqttMessage willQosNotSupport(int protocolVersion) { this.protocolVersion = protocolVersion; this.errorReason = ErrorReason.WILL_QOS_NOT_SUPPORT; @@ -150,6 +181,9 @@ enum ErrorReason { UNSUPPORTED_VERSION(Mqtt3ConnReasonCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, Mqtt5ConnReasonCode.UNSUPPORTED_PROTOCOL_VERSION ), + QOS_NOT_SUPPORT(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, + Mqtt5ConnReasonCode.QOS_NOT_SUPPORTED), + WILL_QOS_NOT_SUPPORT(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, Mqtt5ConnReasonCode.QOS_NOT_SUPPORTED), SERVER_UNAVAILABLE(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java index 94886a295..21d711c85 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java @@ -79,7 +79,7 @@ public void processConnect(MqttAdapterMessage adapter) { } return; } - if (!MqttUtils.isQosSupported(msg)) { + if (!MqttUtils.isWillQosSupported(msg)) { MqttMessage mqttMessage = MqttConnectAck.errorBuilder().willQosNotSupport(protocolVersion); adapter.setMqttMessage(mqttMessage); channel.writeAndFlush(adapter); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java index 8e6680863..cd743f725 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java @@ -136,6 +136,7 @@ public void doProcessConnect(MqttAdapterMessage adapterMsg, String userRole, .clientRestrictions(clientRestrictions) .serverRestrictions(serverRestrictions) .channel(channel) + .connectMessage(msg) .connectionManager(connectionManager) .fromProxy(adapterMsg.fromProxy()) .processor(this) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java index 6b61381d0..c33529516 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java @@ -43,8 +43,19 @@ public static boolean isNotMqtt3(int version) { return !isMqtt3(version); } public static boolean isQosSupported(MqttConnectMessage msg) { - int willQos = msg.variableHeader().willQos(); - MqttQoS mqttQoS = MqttQoS.valueOf(willQos); + return isQosSupported(msg.fixedHeader().qosLevel()); + } + + public static boolean isWillQosSupported(MqttConnectMessage msg) { + boolean willFlag = msg.variableHeader().isWillFlag(); + if (willFlag) { + return isQosSupported(MqttQoS.valueOf(msg.variableHeader().willQos())); + } else { + return true; + } + } + + private static boolean isQosSupported(MqttQoS mqttQoS) { return mqttQoS == MqttQoS.AT_LEAST_ONCE || mqttQoS == MqttQoS.AT_MOST_ONCE; }