From de5e48a88e3e4d8ab90f260ecefbb53805c9b014 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Sun, 5 Jun 2022 21:25:01 +0800 Subject: [PATCH 1/4] Support Request problem information --- .../mqtt/messages/MqttPropertyUtils.java | 30 +++----- .../mqtt/messages/ack/MqttPubAck.java | 4 +- .../MQTTProxyProtocolMethodProcessor.java | 26 ++++--- .../mqtt/restrictions/ClientRestrictions.java | 3 + .../MQTTBrokerProtocolMethodProcessor.java | 69 ++++++++++--------- .../mqtt/support/Qos1PublishHandler.java | 22 +++--- ...T5AuthorizationReasonCodeOnAllAckTest.java | 33 +++++++++ 7 files changed, 115 insertions(+), 72 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java index 122081bd6..b5f74b1d3 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java @@ -32,6 +32,7 @@ */ @Slf4j public class MqttPropertyUtils { + private static int TRUE = 1; /** * Get session expire interval. @@ -48,24 +49,9 @@ public static Optional getExpireInterval(MqttProperties properties) { return Optional.ofNullable(property.value()); } - /** - * Get receive maximum. - * @param properties - mqtt properties - * @return Integer - expire interval value - */ @SuppressWarnings("unchecked") - private static Optional getReceiveMaximum(MqttProperties properties) { - MqttProperties.MqttProperty property = properties - .getProperty(MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value()); - if (property == null) { - return Optional.empty(); - } - return Optional.ofNullable(property.value()); - } - - public static Optional getMaximumPacketSize(MqttProperties properties) { - MqttProperties.MqttProperty property = properties - .getProperty(MqttProperties.MqttPropertyType.MAXIMUM_PACKET_SIZE.value()); + public static Optional getIntProperty(MqttProperties properties, MqttProperties.MqttPropertyType type) { + MqttProperties.MqttProperty property = properties.getProperty(type.value()); if (property == null) { return Optional.empty(); } @@ -81,15 +67,21 @@ public static void parsePropertiesToStuffRestriction( getExpireInterval(properties) .ifPresent(clientRestrictionsBuilder::sessionExpireInterval); // parse receive maximum - Optional receiveMaximum = getReceiveMaximum(properties); + Optional receiveMaximum = getIntProperty(properties, MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM); if (receiveMaximum.isPresent() && receiveMaximum.get() == 0) { throw new InvalidReceiveMaximumException("Not Allow Receive maximum property value zero"); } else { receiveMaximum.ifPresent(clientRestrictionsBuilder::receiveMaximum); } // parse maximum packet size - Optional maximumPacketSize = getMaximumPacketSize(properties); + Optional maximumPacketSize = getIntProperty(properties, MqttProperties + .MqttPropertyType.MAXIMUM_PACKET_SIZE); maximumPacketSize.ifPresent(clientRestrictionsBuilder::maximumPacketSize); + // parse request problem information + Optional requestProblemInformation = + getIntProperty(properties, MqttProperties.MqttPropertyType.REQUEST_PROBLEM_INFORMATION); + // the empty option means allowing reason string or user property + clientRestrictionsBuilder.allowReasonStrOrUserProperty(!requestProblemInformation.isPresent()); } /** diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttPubAck.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttPubAck.java index 14939d6c3..bcd8ce7c8 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttPubAck.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttPubAck.java @@ -109,7 +109,9 @@ private MqttProperties getProperties() { return MqttProperties.NO_PROPERTIES; } MqttProperties properties = new MqttProperties(); - MqttPropertyUtils.setReasonString(properties, reasonString); + if (reasonString != null) { + MqttPropertyUtils.setReasonString(properties, reasonString); + } return properties; } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java index 523f2114e..2ab24a5a1 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java @@ -156,13 +156,15 @@ public void processPublish(MqttAdapterMessage adapter) { Throwable cause = ex.getCause(); log.error("[Proxy Publish] Failed to publish for topic : {}, CId : {}", msg.variableHeader().topicName(), connection.getClientId(), cause); - MqttAck pubAck = MqttPubAck.errorBuilder(connection.getProtocolVersion()) + MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(connection.getProtocolVersion()) .packetId(packetId) - .reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR) - .reasonString(String.format("Failed to publish for topic, because of look up error %s", - cause.getMessage())) - .build(); - connection.sendAckThenClose(pubAck) + .reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + pubAckBuilder.reasonString( + String.format("Failed to publish for topic, because of look up error %s", + cause.getMessage())); + } + connection.sendAckThenClose(pubAckBuilder.build()) .thenAccept(__ -> connection.decrementServerReceivePubMessage()); } }); @@ -273,12 +275,14 @@ public void processSubscribe(final MqttAdapterMessage adapter) { .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); log.error("[Proxy Subscribe] Failed to process subscribe for {}", clientId, realCause); - MqttAck subAck = MqttSubAck.errorBuilder(connection.getProtocolVersion()) + MqttSubAck.MqttSubErrorAckBuilder subAckBuilder = + MqttSubAck.errorBuilder(connection.getProtocolVersion()) .packetId(packetId) - .errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR) - .reasonString("[ MOP ERROR ]" + realCause.getMessage()) - .build(); - connection.sendAckThenClose(subAck); + .errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + subAckBuilder.reasonString("[ MOP ERROR ]" + realCause.getMessage()); + } + connection.sendAckThenClose(subAckBuilder.build()); subscribeAckTracker.remove(packetId); return null; }); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java index 8c836d16d..c3f4fe982 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java @@ -35,6 +35,9 @@ public class ClientRestrictions { private boolean cleanSession; private Integer maximumPacketSize; + @Getter + private boolean allowReasonStrOrUserProperty; + public int getSessionExpireInterval() { return Optional.ofNullable(sessionExpireInterval) .orElse(SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime()); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java index adc5a2385..8432cddb0 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java @@ -194,13 +194,14 @@ private CompletableFuture doUnauthorized(MqttAdapterMessage adapter) { log.error("[Publish] not authorized to topic={}, userRole={}, CId= {}", msg.variableHeader().topicName(), connection.getUserRole(), connection.getClientId()); - MqttAck pubAck = MqttPubAck + MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck .errorBuilder(connection.getProtocolVersion()) .packetId(msg.variableHeader().packetId()) - .reasonCode(Mqtt5PubReasonCode.NOT_AUTHORIZED) - .reasonString("Not Authorized!") - .build(); - connection.sendAckThenClose(pubAck); + .reasonCode(Mqtt5PubReasonCode.NOT_AUTHORIZED); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + pubAckBuilder.reasonString("Not Authorized!"); + } + connection.sendAckThenClose(pubAckBuilder.build()); return CompletableFuture.completedFuture(null); } @@ -233,13 +234,14 @@ private void checkServerReceivePubMessageAndIncrementCounterIfNeeded(MqttAdapter if (connection.getServerReceivePubMessage() >= connection.getClientRestrictions().getReceiveMaximum()) { log.warn("Client publish exceed server receive maximum , the receive maximum is {}", connection.getServerRestrictions().getReceiveMaximum()); - MqttAck pubAck = MqttPubAck.errorBuilder(connection.getProtocolVersion()) + MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(connection.getProtocolVersion()) .reasonCode(Mqtt5PubReasonCode.QUOTA_EXCEEDED) - .packetId(msg.variableHeader().packetId()) - .reasonString(String.format("Publish exceed server receive maximum %s.", - connection.getServerRestrictions().getReceiveMaximum())) - .build(); - connection.sendAckThenClose(pubAck); + .packetId(msg.variableHeader().packetId()); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + pubAckBuilder.reasonString(String.format("Publish exceed server receive maximum %s.", + connection.getServerRestrictions().getReceiveMaximum())); + } + connection.sendAckThenClose(pubAckBuilder.build()); } else { connection.incrementServerReceivePubMessage(); } @@ -360,12 +362,13 @@ private CompletableFuture doSubscribe(MqttSubscribeMessage msg) { final List subTopics = topicSubscriptions(msg); boolean duplicated = mqttSubscriptionManager.addSubscriptions(connection.getClientId(), subTopics); if (duplicated) { - MqttAck subAck = MqttSubAck.errorBuilder(connection.getProtocolVersion()) + MqttSubAck.MqttSubErrorAckBuilder subAckBuilder = MqttSubAck.errorBuilder(connection.getProtocolVersion()) .packetId(messageID) - .errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR) - .reasonString("Duplicated subscribe") - .build(); - connection.sendAckThenClose(subAck); + .errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + subAckBuilder.reasonString("Duplicated subscribe"); + } + connection.sendAckThenClose(subAckBuilder.build()); return CompletableFuture.completedFuture(null); } List> futureList = new ArrayList<>(subTopics.size()); @@ -416,20 +419,22 @@ private CompletableFuture doSubscribe(MqttSubscribeMessage msg) { subTopics.stream().map(MqttTopicSubscription::topicName) .collect(Collectors.joining(",")), pulsarService.getConfig().isAllowAutoTopicCreation()); - MqttAck subAck = MqttSubAck.errorBuilder(connection.getProtocolVersion()) + MqttSubAck.MqttSubErrorAckBuilder subAckBuilder = MqttSubAck.errorBuilder(connection.getProtocolVersion()) .packetId(messageID) - .errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR) - .reasonString("Topic not found") - .build(); - connection.sendAckThenClose(subAck); + .errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + subAckBuilder.reasonString("Topic not found"); + } + connection.sendAckThenClose(subAckBuilder.build()); } else { log.error("[Subscribe] [{}] Failed to process MQTT subscribe.", connection.getClientId(), ex); - MqttAck subAck = MqttSubAck.errorBuilder(connection.getProtocolVersion()) + MqttSubAck.MqttSubErrorAckBuilder subAckBuilder = MqttSubAck.errorBuilder(connection.getProtocolVersion()) .packetId(messageID) - .errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR) - .reasonString("[ MOP ERROR ]" + realCause.getMessage()) - .build(); - connection.sendAckThenClose(subAck); + .errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + subAckBuilder.reasonString("[ MOP ERROR ]" + realCause.getMessage()); + } + connection.sendAckThenClose(subAckBuilder.build()); } return null; }); @@ -524,12 +529,14 @@ public void processUnSubscribe(MqttAdapterMessage adapter) { .build(); connection.sendAck(unsubAck); } else { - MqttAck unsubAck = MqttUnsubAck.errorBuilder(connection.getProtocolVersion()) + MqttUnsubAck.MqttUnsubErrorAckBuilder unsubAckBuilder + = MqttUnsubAck.errorBuilder(connection.getProtocolVersion()) .packetId(packetId) - .reasonCode(Mqtt5UnsubReasonCode.UNSPECIFIED_ERROR) - .reasonString(cause.getMessage()) - .build(); - connection.sendAckThenClose(unsubAck); + .reasonCode(Mqtt5UnsubReasonCode.UNSPECIFIED_ERROR); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + unsubAckBuilder.reasonString(cause.getMessage()); + } + connection.sendAckThenClose(unsubAckBuilder.build()); } return null; }); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java index 89f5396e6..6340c1d75 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java @@ -84,20 +84,22 @@ public CompletableFuture publish(MqttAdapterMessage adapter) { } else if (realCause instanceof BrokerServiceException.TopicNotFoundException) { log.warn("Topic [{}] Not found, the configuration [isAllowAutoTopicCreation={}]", topic, pulsarService.getConfig().isAllowAutoTopicCreation()); - MqttAck pubAck = MqttPubAck.errorBuilder(protocolVersion) + MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion) .packetId(packetId) - .reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR) - .reasonString("Topic not found") - .build(); - connection.sendAckThenClose(pubAck); + .reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + pubAckBuilder.reasonString("Topic not found"); + } + connection.sendAckThenClose(pubAckBuilder.build()); } else { log.error("[{}] Publish msg {} fail.", topic, msg, ex); - MqttAck pubAck = MqttPubAck.errorBuilder(protocolVersion) + MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion) .packetId(packetId) - .reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR) - .reasonString(realCause.getMessage()) - .build(); - connection.sendAckThenClose(pubAck); + .reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + pubAckBuilder.reasonString(realCause.getMessage()); + } + connection.sendAckThenClose(pubAckBuilder.build()); } return null; }); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5AuthorizationReasonCodeOnAllAckTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5AuthorizationReasonCodeOnAllAckTest.java index ca58744f1..e4f9af1bc 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5AuthorizationReasonCodeOnAllAckTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5AuthorizationReasonCodeOnAllAckTest.java @@ -19,6 +19,7 @@ import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectRestrictions; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; @@ -120,9 +121,41 @@ public void testPublishNotAuthorized() { .qos(MqttQos.AT_LEAST_ONCE) .send(); client.subscribeWith().topicFilter("a").qos(MqttQos.AT_LEAST_ONCE).send(); + Assert.fail(); } catch (Mqtt5PubAckException ex) { Mqtt5PubAckReasonCode reasonCode = ex.getMqttMessage().getReasonCode(); Assert.assertEquals(reasonCode, Mqtt5PubAckReasonCode.NOT_AUTHORIZED); + Assert.assertTrue(ex.getMqttMessage().getReasonString().isPresent()); + } + Awaitility.await() + .untilAsserted(() -> Assert.assertFalse(client.getState().isConnected())); + } + + @Test(timeOut = TIMEOUT) + public void testPublishWithRequestProblemInformation() { + Mqtt5BlockingClient client = MQTT5ClientUtils.createMqtt5Client(getMqttBrokerPortList().get(0)); + client.connectWith() + .simpleAuth() + .username("user1") + .password("pass1".getBytes(StandardCharsets.UTF_8)) + .applySimpleAuth() + .restrictions(Mqtt5ConnectRestrictions.builder() + .requestProblemInformation(false) + .build()) + .send(); + String message = "Hello MQTT Pulsar"; + try { + client.publishWith() + .topic("a") + .payload(message.getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + client.subscribeWith().topicFilter("a").qos(MqttQos.AT_LEAST_ONCE).send(); + Assert.fail(); + } catch (Mqtt5PubAckException ex) { + Mqtt5PubAckReasonCode reasonCode = ex.getMqttMessage().getReasonCode(); + Assert.assertEquals(reasonCode, Mqtt5PubAckReasonCode.NOT_AUTHORIZED); + Assert.assertFalse(ex.getMqttMessage().getReasonString().isPresent()); } Awaitility.await() .untilAsserted(() -> Assert.assertFalse(client.getState().isConnected())); From cf4bc01829859a400d243d5aff1d4f4894bac7e2 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Sun, 5 Jun 2022 21:28:41 +0800 Subject: [PATCH 2/4] Remove useless code --- .../pulsar/handlers/mqtt/messages/MqttPropertyUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java index b5f74b1d3..ce6397717 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java @@ -32,7 +32,6 @@ */ @Slf4j public class MqttPropertyUtils { - private static int TRUE = 1; /** * Get session expire interval. From 55bcfcf57683e42a18417179f89f50ac5d8cdbe9 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Sun, 5 Jun 2022 21:32:27 +0800 Subject: [PATCH 3/4] Fix checkstyle --- .../mqtt/proxy/MQTTProxyProtocolMethodProcessor.java | 3 ++- .../support/MQTTBrokerProtocolMethodProcessor.java | 10 ++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java index 2ab24a5a1..d77cb1ecd 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java @@ -156,7 +156,8 @@ public void processPublish(MqttAdapterMessage adapter) { Throwable cause = ex.getCause(); log.error("[Proxy Publish] Failed to publish for topic : {}, CId : {}", msg.variableHeader().topicName(), connection.getClientId(), cause); - MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(connection.getProtocolVersion()) + MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = + MqttPubAck.errorBuilder(connection.getProtocolVersion()) .packetId(packetId) .reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR); if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java index 8fcb3dfd7..8e6680863 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java @@ -428,7 +428,8 @@ private CompletableFuture doSubscribe(MqttSubscribeMessage msg) { subTopics.stream().map(MqttTopicSubscription::topicName) .collect(Collectors.joining(",")), pulsarService.getConfig().isAllowAutoTopicCreation()); - MqttSubAck.MqttSubErrorAckBuilder subAckBuilder = MqttSubAck.errorBuilder(connection.getProtocolVersion()) + MqttSubAck.MqttSubErrorAckBuilder subAckBuilder = + MqttSubAck.errorBuilder(connection.getProtocolVersion()) .packetId(messageID) .errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR); if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { @@ -437,7 +438,8 @@ private CompletableFuture doSubscribe(MqttSubscribeMessage msg) { connection.sendAckThenClose(subAckBuilder.build()); } else { log.error("[Subscribe] [{}] Failed to process MQTT subscribe.", connection.getClientId(), ex); - MqttSubAck.MqttSubErrorAckBuilder subAckBuilder = MqttSubAck.errorBuilder(connection.getProtocolVersion()) + MqttSubAck.MqttSubErrorAckBuilder subAckBuilder = + MqttSubAck.errorBuilder(connection.getProtocolVersion()) .packetId(messageID) .errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR); if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { @@ -538,8 +540,8 @@ public void processUnSubscribe(MqttAdapterMessage adapter) { .build(); connection.sendAck(unsubAck); } else { - MqttUnsubAck.MqttUnsubErrorAckBuilder unsubAckBuilder - = MqttUnsubAck.errorBuilder(connection.getProtocolVersion()) + MqttUnsubAck.MqttUnsubErrorAckBuilder unsubAckBuilder = + MqttUnsubAck.errorBuilder(connection.getProtocolVersion()) .packetId(packetId) .reasonCode(Mqtt5UnsubReasonCode.UNSPECIFIED_ERROR); if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { From 54e5714d90ae5df3eabc035f15bdcde20944dd01 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Sun, 5 Jun 2022 21:43:11 +0800 Subject: [PATCH 4/4] Add proxy side test --- ...horizationProxyReasonCodeOnAllAckTest.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5AuthorizationProxyReasonCodeOnAllAckTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5AuthorizationProxyReasonCodeOnAllAckTest.java index 723ed5ff4..3782614c5 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5AuthorizationProxyReasonCodeOnAllAckTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5AuthorizationProxyReasonCodeOnAllAckTest.java @@ -19,6 +19,7 @@ import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectRestrictions; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; @@ -105,6 +106,7 @@ public void testSubScribeNotAuthorized() { .send(); try { client.subscribeWith().topicFilter("a").qos(MqttQos.AT_LEAST_ONCE).send(); + Assert.fail(); } catch (Mqtt5SubAckException ex) { for (Mqtt5SubAckReasonCode reasonCode : ex.getMqttMessage().getReasonCodes()) { Assert.assertEquals(reasonCode, Mqtt5SubAckReasonCode.NOT_AUTHORIZED); @@ -131,9 +133,41 @@ public void testPublishNotAuthorized() { .qos(MqttQos.AT_LEAST_ONCE) .send(); client.subscribeWith().topicFilter("a").qos(MqttQos.AT_LEAST_ONCE).send(); + Assert.fail(); } catch (Mqtt5PubAckException ex) { Mqtt5PubAckReasonCode reasonCode = ex.getMqttMessage().getReasonCode(); Assert.assertEquals(reasonCode, Mqtt5PubAckReasonCode.NOT_AUTHORIZED); + Assert.assertTrue(ex.getMqttMessage().getReasonString().isPresent()); + } + Awaitility.await() + .untilAsserted(() -> Assert.assertFalse(client.getState().isConnected())); + } + + @Test(timeOut = TIMEOUT) + public void testPublishWithRequestProblemInformation() { + Mqtt5BlockingClient client = MQTT5ClientUtils.createMqtt5ProxyClient(getMqttBrokerPortList().get(0)); + client.connectWith() + .simpleAuth() + .username("user1") + .password("pass1".getBytes(StandardCharsets.UTF_8)) + .applySimpleAuth() + .restrictions(Mqtt5ConnectRestrictions.builder() + .requestProblemInformation(false) + .build()) + .send(); + String message = "Hello MQTT Pulsar"; + try { + client.publishWith() + .topic("a") + .payload(message.getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + client.subscribeWith().topicFilter("a").qos(MqttQos.AT_LEAST_ONCE).send(); + Assert.fail(); + } catch (Mqtt5PubAckException ex) { + Mqtt5PubAckReasonCode reasonCode = ex.getMqttMessage().getReasonCode(); + Assert.assertEquals(reasonCode, Mqtt5PubAckReasonCode.NOT_AUTHORIZED); + Assert.assertFalse(ex.getMqttMessage().getReasonString().isPresent()); } Awaitility.await() .untilAsserted(() -> Assert.assertFalse(client.getState().isConnected()));