From 71bfa9f8d2fedaac80601af7fca518cfcf5b7acb Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 21 Jun 2022 18:41:18 +0800 Subject: [PATCH 1/3] Support MQTT-5 connect ack with Max Qos. --- .../pulsar/handlers/mqtt/Connection.java | 2 ++ .../mqtt/messages/ack/MqttConnectAck.java | 24 +++++++++++++++++-- ...AbstractCommonProtocolMethodProcessor.java | 2 +- .../pulsar/handlers/mqtt/utils/MqttUtils.java | 15 ++++++++++-- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java index b0d03b222..8dc3fc669 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java @@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageBuilders; +import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.timeout.IdleStateHandler; import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterMessage; import io.streamnative.pulsar.handlers.mqtt.exception.restrictions.InvalidSessionExpireIntervalException; @@ -237,6 +238,7 @@ public void sendConnAck() { MqttAck connAck = MqttConnectAck.successBuilder(protocolVersion) .receiveMaximum(getServerRestrictions().getReceiveMaximum()) .cleanSession(clientRestrictions.isCleanSession()) + .maximumQos(MqttQoS.AT_LEAST_ONCE.value()) .build(); sendAck(connAck).thenAccept(__ -> assignState(CONNECT_ACK, ESTABLISHED)); if (log.isDebugEnabled()) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java index 9eb5c1f0a..24749691c 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java @@ -44,6 +44,8 @@ public final static class MqttConnectSuccessAckBuilder { private boolean cleanSession; private int receiveMaximum; + private int maximumQos; + public MqttConnectSuccessAckBuilder(int protocolVersion) { this.protocolVersion = protocolVersion; } @@ -58,6 +60,11 @@ public MqttConnectSuccessAckBuilder receiveMaximum(int receiveMaximum) { return this; } + public MqttConnectSuccessAckBuilder maximumQos(int maximumQos) { + this.maximumQos = maximumQos; + return this; + } + public MqttAck build() { MqttMessageBuilders.ConnAckBuilder commonBuilder = MqttMessageBuilders.connAck() .sessionPresent(!cleanSession); @@ -67,10 +74,14 @@ public MqttAck build() { .build()); } MqttProperties properties = new MqttProperties(); - MqttProperties.IntegerProperty property = + MqttProperties.IntegerProperty receiveMaximumProperty = new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value(), receiveMaximum); - properties.add(property); + MqttProperties.IntegerProperty maximumQosProperty = + new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.MAXIMUM_QOS.value(), + maximumQos); + properties.add(receiveMaximumProperty); + properties.add(maximumQosProperty); return MqttAck.createSupportedAck( commonBuilder.returnCode(Mqtt5ConnReasonCode.SUCCESS.toConnectionReasonCode()) .properties(properties) @@ -107,6 +118,12 @@ public MqttMessage authFail(int protocolVersion) { return build().getMqttMessage(); } + public MqttMessage qosNotSupport(int protocolVersion) { + this.protocolVersion = protocolVersion; + this.errorReason = ErrorReason.QOS_NOT_SUPPORT; + return build().getMqttMessage(); + } + public MqttMessage willQosNotSupport(int protocolVersion) { this.protocolVersion = protocolVersion; this.errorReason = ErrorReason.WILL_QOS_NOT_SUPPORT; @@ -150,6 +167,9 @@ enum ErrorReason { UNSUPPORTED_VERSION(Mqtt3ConnReasonCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, Mqtt5ConnReasonCode.UNSUPPORTED_PROTOCOL_VERSION ), + QOS_NOT_SUPPORT(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, + Mqtt5ConnReasonCode.QOS_NOT_SUPPORTED), + WILL_QOS_NOT_SUPPORT(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, Mqtt5ConnReasonCode.QOS_NOT_SUPPORTED), SERVER_UNAVAILABLE(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java index 94886a295..21d711c85 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java @@ -79,7 +79,7 @@ public void processConnect(MqttAdapterMessage adapter) { } return; } - if (!MqttUtils.isQosSupported(msg)) { + if (!MqttUtils.isWillQosSupported(msg)) { MqttMessage mqttMessage = MqttConnectAck.errorBuilder().willQosNotSupport(protocolVersion); adapter.setMqttMessage(mqttMessage); channel.writeAndFlush(adapter); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java index 6b61381d0..c33529516 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java @@ -43,8 +43,19 @@ public static boolean isNotMqtt3(int version) { return !isMqtt3(version); } public static boolean isQosSupported(MqttConnectMessage msg) { - int willQos = msg.variableHeader().willQos(); - MqttQoS mqttQoS = MqttQoS.valueOf(willQos); + return isQosSupported(msg.fixedHeader().qosLevel()); + } + + public static boolean isWillQosSupported(MqttConnectMessage msg) { + boolean willFlag = msg.variableHeader().isWillFlag(); + if (willFlag) { + return isQosSupported(MqttQoS.valueOf(msg.variableHeader().willQos())); + } else { + return true; + } + } + + private static boolean isQosSupported(MqttQoS mqttQoS) { return mqttQoS == MqttQoS.AT_LEAST_ONCE || mqttQoS == MqttQoS.AT_MOST_ONCE; } From 25ee0f9f7bc37428bf82070fc1312c8cb74a0d48 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 21 Jun 2022 19:03:36 +0800 Subject: [PATCH 2/3] updates. --- .../pulsar/handlers/mqtt/Connection.java | 12 +++++++++--- .../handlers/mqtt/messages/ack/MqttConnectAck.java | 14 ++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java index 8dc3fc669..29a8bc460 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java @@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageBuilders; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.timeout.IdleStateHandler; import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterMessage; @@ -235,11 +236,16 @@ public void sendConnAck() { sendAckThenClose(connAck); return; } - MqttAck connAck = MqttConnectAck.successBuilder(protocolVersion) + MqttConnectAck.MqttConnectSuccessAckBuilder builder = MqttConnectAck.successBuilder(protocolVersion) .receiveMaximum(getServerRestrictions().getReceiveMaximum()) .cleanSession(clientRestrictions.isCleanSession()) - .maximumQos(MqttQoS.AT_LEAST_ONCE.value()) - .build(); + .maximumQos(MqttQoS.AT_LEAST_ONCE.value()); + MqttProperties.StringProperty resInformation = (MqttProperties.StringProperty) connectMessage.variableHeader() + .properties().getProperty(MqttProperties.MqttPropertyType.RESPONSE_INFORMATION.value()); + if (resInformation != null) { + builder.responseInformation(resInformation.value()); + } + MqttAck connAck = builder.build(); sendAck(connAck).thenAccept(__ -> assignState(CONNECT_ACK, ESTABLISHED)); if (log.isDebugEnabled()) { log.debug("The CONNECT message has been processed. CId={}", clientId); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java index 24749691c..342a85bd4 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java @@ -21,6 +21,7 @@ import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt3.Mqtt3ConnReasonCode; import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5ConnReasonCode; import io.streamnative.pulsar.handlers.mqtt.utils.MqttUtils; +import org.apache.commons.lang3.StringUtils; /** * Enhance mqtt connect ack message builder. @@ -46,6 +47,8 @@ public final static class MqttConnectSuccessAckBuilder { private int maximumQos; + private String responseInformation; + public MqttConnectSuccessAckBuilder(int protocolVersion) { this.protocolVersion = protocolVersion; } @@ -65,6 +68,11 @@ public MqttConnectSuccessAckBuilder maximumQos(int maximumQos) { return this; } + public MqttConnectSuccessAckBuilder responseInformation(String responseInformation) { + this.responseInformation = responseInformation; + return this; + } + public MqttAck build() { MqttMessageBuilders.ConnAckBuilder commonBuilder = MqttMessageBuilders.connAck() .sessionPresent(!cleanSession); @@ -82,6 +90,12 @@ public MqttAck build() { maximumQos); properties.add(receiveMaximumProperty); properties.add(maximumQosProperty); + if (StringUtils.isEmpty(responseInformation)) { + MqttProperties.StringProperty responseInformationProperty = + new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.RESPONSE_INFORMATION.value(), + responseInformation); + properties.add(responseInformationProperty); + } return MqttAck.createSupportedAck( commonBuilder.returnCode(Mqtt5ConnReasonCode.SUCCESS.toConnectionReasonCode()) .properties(properties) From df47dd3780ef2aa16a75b5b5a6262cd279246818 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 21 Jun 2022 20:33:30 +0800 Subject: [PATCH 3/3] fix test. --- .../pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java | 2 +- .../mqtt/support/MQTTBrokerProtocolMethodProcessor.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java index 342a85bd4..0ee63aa43 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java @@ -90,7 +90,7 @@ public MqttAck build() { maximumQos); properties.add(receiveMaximumProperty); properties.add(maximumQosProperty); - if (StringUtils.isEmpty(responseInformation)) { + if (StringUtils.isNotEmpty(responseInformation)) { MqttProperties.StringProperty responseInformationProperty = new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.RESPONSE_INFORMATION.value(), responseInformation); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java index 8e6680863..cd743f725 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java @@ -136,6 +136,7 @@ public void doProcessConnect(MqttAdapterMessage adapterMsg, String userRole, .clientRestrictions(clientRestrictions) .serverRestrictions(serverRestrictions) .channel(channel) + .connectMessage(msg) .connectionManager(connectionManager) .fromProxy(adapterMsg.fromProxy()) .processor(this)