From 37dfc0870d86c8d32f956f8517160c5e5e8d3a70 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Sun, 26 Jun 2022 23:10:24 +0800 Subject: [PATCH 1/3] [WIP] Support topic alias. --- .../mqtt/AbstractQosPublishHandler.java | 21 +++++++++++-------- .../pulsar/handlers/mqtt/Connection.java | 14 +++++++++++++ .../mqtt/support/Qos0PublishHandler.java | 2 +- .../mqtt/support/Qos1PublishHandler.java | 2 +- .../mqtt/utils/PulsarMessageConverter.java | 6 +++--- 5 files changed, 31 insertions(+), 14 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java index 5e00243e8..af43263fa 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java @@ -43,14 +43,14 @@ protected AbstractQosPublishHandler(MQTTService mqttService) { this.configuration = mqttService.getServerConfiguration(); } - protected CompletableFuture> getTopicReference(MqttPublishMessage msg) { - return PulsarTopicUtils.getTopicReference(pulsarService, msg.variableHeader().topicName(), + protected CompletableFuture> getTopicReference(String mqttTopicName) { + return PulsarTopicUtils.getTopicReference(pulsarService, mqttTopicName, configuration.getDefaultTenant(), configuration.getDefaultNamespace(), true , configuration.getDefaultTopicDomain()); } - protected CompletableFuture writeToPulsarTopic(MqttPublishMessage msg) { - return writeToPulsarTopic(msg, false); + protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg) { + return writeToPulsarTopic(connection, msg, false); } /** @@ -59,19 +59,22 @@ protected CompletableFuture writeToPulsarTopic(MqttPublishMessage * @param checkSubscription Check if the subscription exists, throw #{MQTTNoMatchingSubscriberException} * if the subscription does not exist; */ - protected CompletableFuture writeToPulsarTopic(MqttPublishMessage msg, + protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg, boolean checkSubscription) { - return getTopicReference(msg).thenCompose(topicOp -> topicOp.map(topic -> { - MessageImpl message = toPulsarMsg(topic, msg); + String maybeTopicAlias = msg.variableHeader().topicName(); + String mqttTopicName = connection.findTopicIfAlias(maybeTopicAlias); + return getTopicReference(maybeTopicAlias).thenCompose(topicOp -> topicOp.map(topic -> { + MessageImpl message = toPulsarMsg(topic, msg.variableHeader().properties(), + msg.payload().nioBuffer()); CompletableFuture ret = MessagePublishContext.publishMessages(message, topic); message.recycle(); return ret.thenApply(position -> { if (checkSubscription && topic.getSubscriptions().isEmpty()) { - throw new MQTTNoMatchingSubscriberException(msg.variableHeader().topicName()); + throw new MQTTNoMatchingSubscriberException(mqttTopicName); } return position; }); }).orElseGet(() -> FutureUtil.failedFuture( - new BrokerServiceException.TopicNotFoundException(msg.variableHeader().topicName())))); + new BrokerServiceException.TopicNotFoundException(mqttTopicName)))); } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java index 7edde08c1..5e6ae6d0d 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java @@ -37,8 +37,11 @@ import io.streamnative.pulsar.handlers.mqtt.utils.FutureUtils; import io.streamnative.pulsar.handlers.mqtt.utils.MqttUtils; import io.streamnative.pulsar.handlers.mqtt.utils.WillMessage; + +import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import lombok.Getter; @@ -75,6 +78,8 @@ public class Connection { private volatile int serverCurrentReceiveCounter = 0; @Getter private final ProtocolMethodProcessor processor; + + private final Map topicAlias; @Getter private final boolean fromProxy; private volatile ConnectionState connectionState = DISCONNECTED; @@ -103,6 +108,7 @@ public class Connection { this.processor = builder.processor; this.fromProxy = builder.fromProxy; this.manager.addConnection(this); + this.topicAlias = new ConcurrentHashMap<>(); } private void addIdleStateHandler() { @@ -338,6 +344,14 @@ public Connection build() { } } + public void addTopicAlias(String topic, String alias) { + topicAlias.put(topic, alias); + } + + public String findTopicIfAlias(String maybeAlias) { + return topicAlias.getOrDefault(maybeAlias, maybeAlias); + } + @Override public String toString() { return "Connection{" + "clientId=" + clientId + ", channel=" + channel diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java index bb5656f77..c99468561 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java @@ -37,6 +37,6 @@ public CompletableFuture publish(Connection connection, MqttAdapterMessage if (MqttUtils.isRetainedMessage(msg)) { return retainedMessageHandler.addRetainedMessage(msg); } - return writeToPulsarTopic(msg).thenAccept(__ -> {}); + return writeToPulsarTopic(connection, msg).thenAccept(__ -> {}); } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java index fd0bee0a8..8e261cada 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java @@ -48,7 +48,7 @@ public CompletableFuture publish(Connection connection, MqttAdapterMessage if (MqttUtils.isRetainedMessage(msg)) { ret = retainedMessageHandler.addRetainedMessage(msg); } else { - ret = writeToPulsarTopic(msg, !MqttUtils.isMqtt3(protocolVersion)).thenApply(__ -> null); + ret = writeToPulsarTopic(connection, msg, !MqttUtils.isMqtt3(protocolVersion)).thenApply(__ -> null); } // we need to check if subscription exist when protocol version is mqtt 5.x return ret diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java index 22d26fbd4..7b4f9b89b 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java @@ -26,6 +26,7 @@ import io.streamnative.pulsar.handlers.mqtt.PacketIdGenerator; import io.streamnative.pulsar.handlers.mqtt.support.MessageBuilder; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; @@ -69,10 +70,9 @@ protected MessageMetadata initialValue() throws Exception { }; // Convert MQTT message to Pulsar message. - public static MessageImpl toPulsarMsg(Topic topic, MqttPublishMessage mqttMsg) { + public static MessageImpl toPulsarMsg(Topic topic, MqttProperties properties, ByteBuffer payload) { MessageMetadata metadata = LOCAL_MESSAGE_METADATA.get(); metadata.clear(); - MqttProperties properties = mqttMsg.variableHeader().properties(); if (properties != null) { properties.listAll().forEach(prop -> { if (MqttProperties.MqttPropertyType.USER_PROPERTY.value() == prop.propertyId()) { @@ -104,7 +104,7 @@ public static MessageImpl toPulsarMsg(Topic topic, MqttPublishMessage mq } }); } - return MessageImpl.create(metadata, mqttMsg.payload().nioBuffer(), SCHEMA, topic.getName()); + return MessageImpl.create(metadata, payload, SCHEMA, topic.getName()); } private static String getPropertiesPrefix(int propertyId) { From c6cbccc345b1271a6a8417e5f6064473a85a342f Mon Sep 17 00:00:00 2001 From: mattison chao Date: Mon, 27 Jun 2022 23:17:41 +0800 Subject: [PATCH 2/3] Support publish topic alias --- .../mqtt/AbstractQosPublishHandler.java | 35 +++++-- .../pulsar/handlers/mqtt/Connection.java | 17 +--- .../handlers/mqtt/TopicAliasManager.java | 96 +++++++++++++++++++ .../MQTTTopicAliasNotFoundException.java | 26 +++++ .../mqtt/messages/MqttPropertyUtils.java | 12 +-- .../mqtt/support/Qos0PublishHandler.java | 2 +- .../mqtt/support/Qos1PublishHandler.java | 3 +- .../hivemq/base/MQTT5IntegrationTest.java | 50 ++++++++++ 8 files changed, 213 insertions(+), 28 deletions(-) create mode 100644 mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/TopicAliasManager.java create mode 100644 mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasNotFoundException.java diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java index af43263fa..7433b4bcd 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java @@ -14,14 +14,18 @@ package io.streamnative.pulsar.handlers.mqtt; import static io.streamnative.pulsar.handlers.mqtt.utils.PulsarMessageConverter.toPulsarMsg; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.streamnative.pulsar.handlers.mqtt.exception.MQTTNoMatchingSubscriberException; +import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasNotFoundException; +import io.streamnative.pulsar.handlers.mqtt.messages.MqttPropertyUtils; import io.streamnative.pulsar.handlers.mqtt.support.RetainedMessageHandler; import io.streamnative.pulsar.handlers.mqtt.utils.MessagePublishContext; import io.streamnative.pulsar.handlers.mqtt.utils.PulsarTopicUtils; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Topic; @@ -49,8 +53,9 @@ protected CompletableFuture> getTopicReference(String mqttTopicN , configuration.getDefaultTopicDomain()); } - protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg) { - return writeToPulsarTopic(connection, msg, false); + protected CompletableFuture writeToPulsarTopic(TopicAliasManager topicAliasManager, + MqttPublishMessage msg) { + return writeToPulsarTopic(topicAliasManager, msg, false); } /** @@ -59,11 +64,27 @@ protected CompletableFuture writeToPulsarTopic(Connection connecti * @param checkSubscription Check if the subscription exists, throw #{MQTTNoMatchingSubscriberException} * if the subscription does not exist; */ - protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg, - boolean checkSubscription) { - String maybeTopicAlias = msg.variableHeader().topicName(); - String mqttTopicName = connection.findTopicIfAlias(maybeTopicAlias); - return getTopicReference(maybeTopicAlias).thenCompose(topicOp -> topicOp.map(topic -> { + protected CompletableFuture writeToPulsarTopic(TopicAliasManager topicAliasManager, + MqttPublishMessage msg, boolean checkSubscription) { + Optional topicAlias = MqttPropertyUtils.getProperty(msg.variableHeader().properties(), + MqttProperties.MqttPropertyType.TOPIC_ALIAS); + String mqttTopicName; + if (topicAlias.isPresent()) { + int alias = topicAlias.get(); + String tpName = msg.variableHeader().topicName(); + if (StringUtils.isNoneBlank(tpName)) { + // update alias + topicAliasManager.updateTopicAlias(tpName, alias); + } + Optional realName = topicAliasManager.getTopicByAlias(alias); + if (!realName.isPresent()) { + throw new MQTTTopicAliasNotFoundException(alias); + } + mqttTopicName = realName.get(); + } else { + mqttTopicName = msg.variableHeader().topicName(); + } + return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> { MessageImpl message = toPulsarMsg(topic, msg.variableHeader().properties(), msg.payload().nioBuffer()); CompletableFuture ret = MessagePublishContext.publishMessages(message, topic); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java index 5e6ae6d0d..eabd01e23 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java @@ -37,11 +37,8 @@ import io.streamnative.pulsar.handlers.mqtt.utils.FutureUtils; import io.streamnative.pulsar.handlers.mqtt.utils.MqttUtils; import io.streamnative.pulsar.handlers.mqtt.utils.WillMessage; - -import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import lombok.Getter; @@ -79,7 +76,9 @@ public class Connection { @Getter private final ProtocolMethodProcessor processor; - private final Map topicAlias; + @Getter + private final TopicAliasManager topicAliasManager; + @Getter private final boolean fromProxy; private volatile ConnectionState connectionState = DISCONNECTED; @@ -108,7 +107,7 @@ public class Connection { this.processor = builder.processor; this.fromProxy = builder.fromProxy; this.manager.addConnection(this); - this.topicAlias = new ConcurrentHashMap<>(); + this.topicAliasManager = new TopicAliasManager(clientRestrictions.getTopicAliasMaximum()); } private void addIdleStateHandler() { @@ -344,14 +343,6 @@ public Connection build() { } } - public void addTopicAlias(String topic, String alias) { - topicAlias.put(topic, alias); - } - - public String findTopicIfAlias(String maybeAlias) { - return topicAlias.getOrDefault(maybeAlias, maybeAlias); - } - @Override public String toString() { return "Connection{" + "clientId=" + clientId + ", channel=" + channel diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/TopicAliasManager.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/TopicAliasManager.java new file mode 100644 index 000000000..639aa83a7 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/TopicAliasManager.java @@ -0,0 +1,96 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + + +public class TopicAliasManager { + + public static final int NO_ALIAS = -1; + private final Map clientSideAlias; + private final Map brokerSideAlias; + + private final PacketIdGenerator idGenerator; + private final int topicMaximumAlias; + + public TopicAliasManager(int topicMaximumAlias) { + this.topicMaximumAlias = topicMaximumAlias; + this.idGenerator = PacketIdGenerator.newNonZeroGenerator(); + this.clientSideAlias = new ConcurrentHashMap<>(topicMaximumAlias); + this.brokerSideAlias = new ConcurrentHashMap<>(topicMaximumAlias); + } + + public Optional getTopicByAlias(int alias) { + return Optional.ofNullable(clientSideAlias.get(alias)); + } + + public boolean updateTopicAlias(String topic, int alias) { + return clientSideAlias.compute(alias, (k, v) -> { + if (v == null && clientSideAlias.size() >= topicMaximumAlias) { + return null; + } + return topic; + }) != null; + } + + public Optional getOrCreateAlias(String topic) { + TopicAliasInfo topicAliasInfo = brokerSideAlias.computeIfAbsent(topic, (k) -> { + if (brokerSideAlias.size() >= topicMaximumAlias) { + return null; + } + return TopicAliasInfo.builder() + .alias(idGenerator.nextPacketId()) + .syncToClient(false) + .build(); + }); + if (topicAliasInfo == null) { + // Exceeds topic alias maximum + return Optional.empty(); + } + return Optional.of(topicAliasInfo.getAlias()); + } + + public boolean isSyncAliasToClient(String topicName) { + TopicAliasInfo topicAliasInfo = brokerSideAlias.get(topicName); + if (topicAliasInfo == null) { + return false; + } + return topicAliasInfo.isSyncToClient(); + } + + public void syncAlias(String topic) { + brokerSideAlias.computeIfPresent(topic, (key, info) -> { + info.setSyncToClient(true); + return info; + }); + } + + @Getter + @ToString + @EqualsAndHashCode + @Builder + static class TopicAliasInfo { + private int alias; + @Setter + private boolean syncToClient; + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasNotFoundException.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasNotFoundException.java new file mode 100644 index 000000000..f9db6265f --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasNotFoundException.java @@ -0,0 +1,26 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.exception; + +import lombok.Getter; + +public class MQTTTopicAliasNotFoundException extends RuntimeException{ + @Getter + private final int alias; + + public MQTTTopicAliasNotFoundException(int alias) { + super(String.format("The topic alias %s is not found.", alias)); + this.alias = alias; + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java index 4915f706e..ca3085d21 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java @@ -49,8 +49,8 @@ public static Optional getExpireInterval(MqttProperties properties) { } @SuppressWarnings("unchecked") - public static Optional getIntProperty(MqttProperties properties, MqttProperties.MqttPropertyType type) { - MqttProperties.MqttProperty property = properties.getProperty(type.value()); + public static Optional getProperty(MqttProperties properties, MqttProperties.MqttPropertyType type) { + MqttProperties.MqttProperty property = properties.getProperty(type.value()); if (property == null) { return Optional.empty(); } @@ -66,23 +66,23 @@ public static void parsePropertiesToStuffRestriction( getExpireInterval(properties) .ifPresent(clientRestrictionsBuilder::sessionExpireInterval); // parse receive maximum - Optional receiveMaximum = getIntProperty(properties, MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM); + Optional receiveMaximum = getProperty(properties, MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM); if (receiveMaximum.isPresent() && receiveMaximum.get() == 0) { throw new InvalidReceiveMaximumException("Not Allow Receive maximum property value zero"); } else { receiveMaximum.ifPresent(clientRestrictionsBuilder::receiveMaximum); } // parse maximum packet size - Optional maximumPacketSize = getIntProperty(properties, MqttProperties + Optional maximumPacketSize = getProperty(properties, MqttProperties .MqttPropertyType.MAXIMUM_PACKET_SIZE); maximumPacketSize.ifPresent(clientRestrictionsBuilder::maximumPacketSize); // parse request problem information Optional requestProblemInformation = - getIntProperty(properties, MqttProperties.MqttPropertyType.REQUEST_PROBLEM_INFORMATION); + getProperty(properties, MqttProperties.MqttPropertyType.REQUEST_PROBLEM_INFORMATION); // the empty option means allowing reason string or user property clientRestrictionsBuilder.allowReasonStrOrUserProperty(!requestProblemInformation.isPresent()); Optional topicAliasMaximum = - getIntProperty(properties, MqttProperties.MqttPropertyType.TOPIC_ALIAS_MAXIMUM); + getProperty(properties, MqttProperties.MqttPropertyType.TOPIC_ALIAS_MAXIMUM); topicAliasMaximum.ifPresent(clientRestrictionsBuilder::topicAliasMaximum); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java index c99468561..ecd81d2e8 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java @@ -37,6 +37,6 @@ public CompletableFuture publish(Connection connection, MqttAdapterMessage if (MqttUtils.isRetainedMessage(msg)) { return retainedMessageHandler.addRetainedMessage(msg); } - return writeToPulsarTopic(connection, msg).thenAccept(__ -> {}); + return writeToPulsarTopic(connection.getTopicAliasManager(), msg).thenAccept(__ -> {}); } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java index 8e261cada..fdf20da0b 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java @@ -48,7 +48,8 @@ public CompletableFuture publish(Connection connection, MqttAdapterMessage if (MqttUtils.isRetainedMessage(msg)) { ret = retainedMessageHandler.addRetainedMessage(msg); } else { - ret = writeToPulsarTopic(connection, msg, !MqttUtils.isMqtt3(protocolVersion)).thenApply(__ -> null); + ret = writeToPulsarTopic(connection.getTopicAliasManager() + , msg, !MqttUtils.isMqtt3(protocolVersion)).thenApply(__ -> null); } // we need to check if subscription exist when protocol version is mqtt 5.x return ret diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java index 97cbfe637..7ed23471c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java @@ -22,12 +22,15 @@ import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectRestrictions; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -218,4 +221,51 @@ public void testLastWillMessage() throws Exception { client2.unsubscribeWith().topicFilter(topic).send(); client2.disconnect(); } + + @Test + public void testTopicAlias() throws InterruptedException { + Mqtt5BlockingClient client = Mqtt5Client.builder() + .identifier(UUID.randomUUID().toString()) + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + Mqtt5ConnAck connAck = client.connectWith() + .restrictions() + .topicAliasMaximum(10) + .sendTopicAliasMaximum(10) + .applyRestrictions() + .send(); + Assert.assertEquals(connAck.getRestrictions().getTopicAliasMaximum(), 10); + final String topicName = "a/b/c"; + client.subscribeWith() + .topicFilter(topicName) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + List contents = new ArrayList<>(); + client.toAsync().publishes(MqttGlobalPublishFilter.ALL, (msg) -> { + contents.add(new String(msg.getPayloadAsBytes())); + }); + client.publishWith() + .topic(topicName) + .qos(MqttQos.AT_LEAST_ONCE) + .payload("hihihi1".getBytes(StandardCharsets.UTF_8)) + .send(); + client.publishWith() + .topic(topicName) + .qos(MqttQos.AT_LEAST_ONCE) + .payload("hihihi2".getBytes(StandardCharsets.UTF_8)) + .send(); + client.publishWith() + .topic(topicName) + .qos(MqttQos.AT_LEAST_ONCE) + .payload("hihihi3".getBytes(StandardCharsets.UTF_8)) + .send(); + Awaitility.await() + .untilAsserted(() -> { + Assert.assertEquals(contents.size(), 3); + Assert.assertTrue(contents.contains("hihihi1")); + Assert.assertTrue(contents.contains("hihihi2")); + Assert.assertTrue(contents.contains("hihihi3")); + }); + } } From 59ec793a868c95b2288fd9a171d371f24af6b86b Mon Sep 17 00:00:00 2001 From: mattison chao Date: Tue, 28 Jun 2022 08:28:33 +0800 Subject: [PATCH 3/3] Add exception handler --- .../mqtt/AbstractQosPublishHandler.java | 7 ++++- .../handlers/mqtt/TopicAliasManager.java | 1 + .../MQTTTopicAliasExceedsLimitException.java | 29 +++++++++++++++++++ .../MQTTTopicAliasNotFoundException.java | 2 +- .../mqtt/support/Qos1PublishHandler.java | 24 +++++++++++++++ 5 files changed, 61 insertions(+), 2 deletions(-) create mode 100644 mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasExceedsLimitException.java diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java index 7433b4bcd..0ffb5a8ba 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java @@ -17,6 +17,7 @@ import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.streamnative.pulsar.handlers.mqtt.exception.MQTTNoMatchingSubscriberException; +import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasExceedsLimitException; import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasNotFoundException; import io.streamnative.pulsar.handlers.mqtt.messages.MqttPropertyUtils; import io.streamnative.pulsar.handlers.mqtt.support.RetainedMessageHandler; @@ -74,7 +75,11 @@ protected CompletableFuture writeToPulsarTopic(TopicAliasManager t String tpName = msg.variableHeader().topicName(); if (StringUtils.isNoneBlank(tpName)) { // update alias - topicAliasManager.updateTopicAlias(tpName, alias); + boolean updateSuccess = topicAliasManager.updateTopicAlias(tpName, alias); + if (!updateSuccess) { + throw new MQTTTopicAliasExceedsLimitException(alias, + topicAliasManager.getTopicMaximumAlias()); + } } Optional realName = topicAliasManager.getTopicByAlias(alias); if (!realName.isPresent()) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/TopicAliasManager.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/TopicAliasManager.java index 639aa83a7..01f970749 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/TopicAliasManager.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/TopicAliasManager.java @@ -30,6 +30,7 @@ public class TopicAliasManager { private final Map brokerSideAlias; private final PacketIdGenerator idGenerator; + @Getter private final int topicMaximumAlias; public TopicAliasManager(int topicMaximumAlias) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasExceedsLimitException.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasExceedsLimitException.java new file mode 100644 index 000000000..f678126e1 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasExceedsLimitException.java @@ -0,0 +1,29 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.exception; + +import lombok.Getter; + +public class MQTTTopicAliasExceedsLimitException extends RuntimeException { + @Getter + private final int alias; + @Getter + private final int topicAliasMaximum; + + public MQTTTopicAliasExceedsLimitException(int alias, int topicAliasMaximum) { + super(String.format("The topic alias %s is exceed topic alias maximum %s.", alias, topicAliasMaximum)); + this.alias = alias; + this.topicAliasMaximum = topicAliasMaximum; + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasNotFoundException.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasNotFoundException.java index f9db6265f..b103458a5 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasNotFoundException.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasNotFoundException.java @@ -15,7 +15,7 @@ import lombok.Getter; -public class MQTTTopicAliasNotFoundException extends RuntimeException{ +public class MQTTTopicAliasNotFoundException extends RuntimeException { @Getter private final int alias; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java index fdf20da0b..8a21a761c 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java @@ -19,6 +19,8 @@ import io.streamnative.pulsar.handlers.mqtt.MQTTService; import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterMessage; import io.streamnative.pulsar.handlers.mqtt.exception.MQTTNoMatchingSubscriberException; +import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasExceedsLimitException; +import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasNotFoundException; import io.streamnative.pulsar.handlers.mqtt.messages.ack.MqttAck; import io.streamnative.pulsar.handlers.mqtt.messages.ack.MqttPubAck; import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5PubReasonCode; @@ -88,6 +90,28 @@ public CompletableFuture publish(Connection connection, MqttAdapterMessage pubAckBuilder.reasonString("Topic not found"); } connection.sendAckThenClose(pubAckBuilder.build()); + } else if (realCause instanceof MQTTTopicAliasNotFoundException) { + log.error("[{}] Publish message fail {}, because the topic alias {} not found.", topic, msg, + ((MQTTTopicAliasNotFoundException) realCause).getAlias(), ex); + MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion) + .packetId(packetId) + .reasonCode(Mqtt5PubReasonCode.TOPIC_NAME_INVALID); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + pubAckBuilder.reasonString(ex.getMessage()); + } + connection.sendAckThenClose(pubAckBuilder.build()); + } else if (realCause instanceof MQTTTopicAliasExceedsLimitException) { + log.error("[{}] Publish message fail {}," + + " because the topic alias {} is exceed topic alias maximum {}.", topic, msg, + ((MQTTTopicAliasExceedsLimitException) realCause).getAlias(), + ((MQTTTopicAliasExceedsLimitException) realCause).getTopicAliasMaximum(), ex); + MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion) + .packetId(packetId) + .reasonCode(Mqtt5PubReasonCode.QUOTA_EXCEEDED); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + pubAckBuilder.reasonString(ex.getMessage()); + } + connection.sendAckThenClose(pubAckBuilder.build()); } else { log.error("[{}] Publish msg {} fail.", topic, msg, ex); MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion)