diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java index 5e00243e8..0ffb5a8ba 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java @@ -14,14 +14,19 @@ package io.streamnative.pulsar.handlers.mqtt; import static io.streamnative.pulsar.handlers.mqtt.utils.PulsarMessageConverter.toPulsarMsg; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.streamnative.pulsar.handlers.mqtt.exception.MQTTNoMatchingSubscriberException; +import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasExceedsLimitException; +import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasNotFoundException; +import io.streamnative.pulsar.handlers.mqtt.messages.MqttPropertyUtils; import io.streamnative.pulsar.handlers.mqtt.support.RetainedMessageHandler; import io.streamnative.pulsar.handlers.mqtt.utils.MessagePublishContext; import io.streamnative.pulsar.handlers.mqtt.utils.PulsarTopicUtils; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Topic; @@ -43,14 +48,15 @@ protected AbstractQosPublishHandler(MQTTService mqttService) { this.configuration = mqttService.getServerConfiguration(); } - protected CompletableFuture> getTopicReference(MqttPublishMessage msg) { - return PulsarTopicUtils.getTopicReference(pulsarService, msg.variableHeader().topicName(), + protected CompletableFuture> getTopicReference(String mqttTopicName) { + return PulsarTopicUtils.getTopicReference(pulsarService, mqttTopicName, configuration.getDefaultTenant(), configuration.getDefaultNamespace(), true , configuration.getDefaultTopicDomain()); } - protected CompletableFuture writeToPulsarTopic(MqttPublishMessage msg) { - return writeToPulsarTopic(msg, false); + protected CompletableFuture writeToPulsarTopic(TopicAliasManager topicAliasManager, + MqttPublishMessage msg) { + return writeToPulsarTopic(topicAliasManager, msg, false); } /** @@ -59,19 +65,42 @@ protected CompletableFuture writeToPulsarTopic(MqttPublishMessage * @param checkSubscription Check if the subscription exists, throw #{MQTTNoMatchingSubscriberException} * if the subscription does not exist; */ - protected CompletableFuture writeToPulsarTopic(MqttPublishMessage msg, - boolean checkSubscription) { - return getTopicReference(msg).thenCompose(topicOp -> topicOp.map(topic -> { - MessageImpl message = toPulsarMsg(topic, msg); + protected CompletableFuture writeToPulsarTopic(TopicAliasManager topicAliasManager, + MqttPublishMessage msg, boolean checkSubscription) { + Optional topicAlias = MqttPropertyUtils.getProperty(msg.variableHeader().properties(), + MqttProperties.MqttPropertyType.TOPIC_ALIAS); + String mqttTopicName; + if (topicAlias.isPresent()) { + int alias = topicAlias.get(); + String tpName = msg.variableHeader().topicName(); + if (StringUtils.isNoneBlank(tpName)) { + // update alias + boolean updateSuccess = topicAliasManager.updateTopicAlias(tpName, alias); + if (!updateSuccess) { + throw new MQTTTopicAliasExceedsLimitException(alias, + topicAliasManager.getTopicMaximumAlias()); + } + } + Optional realName = topicAliasManager.getTopicByAlias(alias); + if (!realName.isPresent()) { + throw new MQTTTopicAliasNotFoundException(alias); + } + mqttTopicName = realName.get(); + } else { + mqttTopicName = msg.variableHeader().topicName(); + } + return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> { + MessageImpl message = toPulsarMsg(topic, msg.variableHeader().properties(), + msg.payload().nioBuffer()); CompletableFuture ret = MessagePublishContext.publishMessages(message, topic); message.recycle(); return ret.thenApply(position -> { if (checkSubscription && topic.getSubscriptions().isEmpty()) { - throw new MQTTNoMatchingSubscriberException(msg.variableHeader().topicName()); + throw new MQTTNoMatchingSubscriberException(mqttTopicName); } return position; }); }).orElseGet(() -> FutureUtil.failedFuture( - new BrokerServiceException.TopicNotFoundException(msg.variableHeader().topicName())))); + new BrokerServiceException.TopicNotFoundException(mqttTopicName)))); } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java index 7edde08c1..eabd01e23 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java @@ -75,6 +75,10 @@ public class Connection { private volatile int serverCurrentReceiveCounter = 0; @Getter private final ProtocolMethodProcessor processor; + + @Getter + private final TopicAliasManager topicAliasManager; + @Getter private final boolean fromProxy; private volatile ConnectionState connectionState = DISCONNECTED; @@ -103,6 +107,7 @@ public class Connection { this.processor = builder.processor; this.fromProxy = builder.fromProxy; this.manager.addConnection(this); + this.topicAliasManager = new TopicAliasManager(clientRestrictions.getTopicAliasMaximum()); } private void addIdleStateHandler() { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/TopicAliasManager.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/TopicAliasManager.java new file mode 100644 index 000000000..01f970749 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/TopicAliasManager.java @@ -0,0 +1,97 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + + +public class TopicAliasManager { + + public static final int NO_ALIAS = -1; + private final Map clientSideAlias; + private final Map brokerSideAlias; + + private final PacketIdGenerator idGenerator; + @Getter + private final int topicMaximumAlias; + + public TopicAliasManager(int topicMaximumAlias) { + this.topicMaximumAlias = topicMaximumAlias; + this.idGenerator = PacketIdGenerator.newNonZeroGenerator(); + this.clientSideAlias = new ConcurrentHashMap<>(topicMaximumAlias); + this.brokerSideAlias = new ConcurrentHashMap<>(topicMaximumAlias); + } + + public Optional getTopicByAlias(int alias) { + return Optional.ofNullable(clientSideAlias.get(alias)); + } + + public boolean updateTopicAlias(String topic, int alias) { + return clientSideAlias.compute(alias, (k, v) -> { + if (v == null && clientSideAlias.size() >= topicMaximumAlias) { + return null; + } + return topic; + }) != null; + } + + public Optional getOrCreateAlias(String topic) { + TopicAliasInfo topicAliasInfo = brokerSideAlias.computeIfAbsent(topic, (k) -> { + if (brokerSideAlias.size() >= topicMaximumAlias) { + return null; + } + return TopicAliasInfo.builder() + .alias(idGenerator.nextPacketId()) + .syncToClient(false) + .build(); + }); + if (topicAliasInfo == null) { + // Exceeds topic alias maximum + return Optional.empty(); + } + return Optional.of(topicAliasInfo.getAlias()); + } + + public boolean isSyncAliasToClient(String topicName) { + TopicAliasInfo topicAliasInfo = brokerSideAlias.get(topicName); + if (topicAliasInfo == null) { + return false; + } + return topicAliasInfo.isSyncToClient(); + } + + public void syncAlias(String topic) { + brokerSideAlias.computeIfPresent(topic, (key, info) -> { + info.setSyncToClient(true); + return info; + }); + } + + @Getter + @ToString + @EqualsAndHashCode + @Builder + static class TopicAliasInfo { + private int alias; + @Setter + private boolean syncToClient; + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasExceedsLimitException.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasExceedsLimitException.java new file mode 100644 index 000000000..f678126e1 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasExceedsLimitException.java @@ -0,0 +1,29 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.exception; + +import lombok.Getter; + +public class MQTTTopicAliasExceedsLimitException extends RuntimeException { + @Getter + private final int alias; + @Getter + private final int topicAliasMaximum; + + public MQTTTopicAliasExceedsLimitException(int alias, int topicAliasMaximum) { + super(String.format("The topic alias %s is exceed topic alias maximum %s.", alias, topicAliasMaximum)); + this.alias = alias; + this.topicAliasMaximum = topicAliasMaximum; + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasNotFoundException.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasNotFoundException.java new file mode 100644 index 000000000..b103458a5 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTTopicAliasNotFoundException.java @@ -0,0 +1,26 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.exception; + +import lombok.Getter; + +public class MQTTTopicAliasNotFoundException extends RuntimeException { + @Getter + private final int alias; + + public MQTTTopicAliasNotFoundException(int alias) { + super(String.format("The topic alias %s is not found.", alias)); + this.alias = alias; + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java index 4915f706e..ca3085d21 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java @@ -49,8 +49,8 @@ public static Optional getExpireInterval(MqttProperties properties) { } @SuppressWarnings("unchecked") - public static Optional getIntProperty(MqttProperties properties, MqttProperties.MqttPropertyType type) { - MqttProperties.MqttProperty property = properties.getProperty(type.value()); + public static Optional getProperty(MqttProperties properties, MqttProperties.MqttPropertyType type) { + MqttProperties.MqttProperty property = properties.getProperty(type.value()); if (property == null) { return Optional.empty(); } @@ -66,23 +66,23 @@ public static void parsePropertiesToStuffRestriction( getExpireInterval(properties) .ifPresent(clientRestrictionsBuilder::sessionExpireInterval); // parse receive maximum - Optional receiveMaximum = getIntProperty(properties, MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM); + Optional receiveMaximum = getProperty(properties, MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM); if (receiveMaximum.isPresent() && receiveMaximum.get() == 0) { throw new InvalidReceiveMaximumException("Not Allow Receive maximum property value zero"); } else { receiveMaximum.ifPresent(clientRestrictionsBuilder::receiveMaximum); } // parse maximum packet size - Optional maximumPacketSize = getIntProperty(properties, MqttProperties + Optional maximumPacketSize = getProperty(properties, MqttProperties .MqttPropertyType.MAXIMUM_PACKET_SIZE); maximumPacketSize.ifPresent(clientRestrictionsBuilder::maximumPacketSize); // parse request problem information Optional requestProblemInformation = - getIntProperty(properties, MqttProperties.MqttPropertyType.REQUEST_PROBLEM_INFORMATION); + getProperty(properties, MqttProperties.MqttPropertyType.REQUEST_PROBLEM_INFORMATION); // the empty option means allowing reason string or user property clientRestrictionsBuilder.allowReasonStrOrUserProperty(!requestProblemInformation.isPresent()); Optional topicAliasMaximum = - getIntProperty(properties, MqttProperties.MqttPropertyType.TOPIC_ALIAS_MAXIMUM); + getProperty(properties, MqttProperties.MqttPropertyType.TOPIC_ALIAS_MAXIMUM); topicAliasMaximum.ifPresent(clientRestrictionsBuilder::topicAliasMaximum); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java index bb5656f77..ecd81d2e8 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos0PublishHandler.java @@ -37,6 +37,6 @@ public CompletableFuture publish(Connection connection, MqttAdapterMessage if (MqttUtils.isRetainedMessage(msg)) { return retainedMessageHandler.addRetainedMessage(msg); } - return writeToPulsarTopic(msg).thenAccept(__ -> {}); + return writeToPulsarTopic(connection.getTopicAliasManager(), msg).thenAccept(__ -> {}); } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java index fd0bee0a8..8a21a761c 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java @@ -19,6 +19,8 @@ import io.streamnative.pulsar.handlers.mqtt.MQTTService; import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterMessage; import io.streamnative.pulsar.handlers.mqtt.exception.MQTTNoMatchingSubscriberException; +import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasExceedsLimitException; +import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasNotFoundException; import io.streamnative.pulsar.handlers.mqtt.messages.ack.MqttAck; import io.streamnative.pulsar.handlers.mqtt.messages.ack.MqttPubAck; import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5PubReasonCode; @@ -48,7 +50,8 @@ public CompletableFuture publish(Connection connection, MqttAdapterMessage if (MqttUtils.isRetainedMessage(msg)) { ret = retainedMessageHandler.addRetainedMessage(msg); } else { - ret = writeToPulsarTopic(msg, !MqttUtils.isMqtt3(protocolVersion)).thenApply(__ -> null); + ret = writeToPulsarTopic(connection.getTopicAliasManager() + , msg, !MqttUtils.isMqtt3(protocolVersion)).thenApply(__ -> null); } // we need to check if subscription exist when protocol version is mqtt 5.x return ret @@ -87,6 +90,28 @@ public CompletableFuture publish(Connection connection, MqttAdapterMessage pubAckBuilder.reasonString("Topic not found"); } connection.sendAckThenClose(pubAckBuilder.build()); + } else if (realCause instanceof MQTTTopicAliasNotFoundException) { + log.error("[{}] Publish message fail {}, because the topic alias {} not found.", topic, msg, + ((MQTTTopicAliasNotFoundException) realCause).getAlias(), ex); + MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion) + .packetId(packetId) + .reasonCode(Mqtt5PubReasonCode.TOPIC_NAME_INVALID); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + pubAckBuilder.reasonString(ex.getMessage()); + } + connection.sendAckThenClose(pubAckBuilder.build()); + } else if (realCause instanceof MQTTTopicAliasExceedsLimitException) { + log.error("[{}] Publish message fail {}," + + " because the topic alias {} is exceed topic alias maximum {}.", topic, msg, + ((MQTTTopicAliasExceedsLimitException) realCause).getAlias(), + ((MQTTTopicAliasExceedsLimitException) realCause).getTopicAliasMaximum(), ex); + MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion) + .packetId(packetId) + .reasonCode(Mqtt5PubReasonCode.QUOTA_EXCEEDED); + if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) { + pubAckBuilder.reasonString(ex.getMessage()); + } + connection.sendAckThenClose(pubAckBuilder.build()); } else { log.error("[{}] Publish msg {} fail.", topic, msg, ex); MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java index 22d26fbd4..7b4f9b89b 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java @@ -26,6 +26,7 @@ import io.streamnative.pulsar.handlers.mqtt.PacketIdGenerator; import io.streamnative.pulsar.handlers.mqtt.support.MessageBuilder; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; @@ -69,10 +70,9 @@ protected MessageMetadata initialValue() throws Exception { }; // Convert MQTT message to Pulsar message. - public static MessageImpl toPulsarMsg(Topic topic, MqttPublishMessage mqttMsg) { + public static MessageImpl toPulsarMsg(Topic topic, MqttProperties properties, ByteBuffer payload) { MessageMetadata metadata = LOCAL_MESSAGE_METADATA.get(); metadata.clear(); - MqttProperties properties = mqttMsg.variableHeader().properties(); if (properties != null) { properties.listAll().forEach(prop -> { if (MqttProperties.MqttPropertyType.USER_PROPERTY.value() == prop.propertyId()) { @@ -104,7 +104,7 @@ public static MessageImpl toPulsarMsg(Topic topic, MqttPublishMessage mq } }); } - return MessageImpl.create(metadata, mqttMsg.payload().nioBuffer(), SCHEMA, topic.getName()); + return MessageImpl.create(metadata, payload, SCHEMA, topic.getName()); } private static String getPropertiesPrefix(int propertyId) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java index 97cbfe637..7ed23471c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java @@ -22,12 +22,15 @@ import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectRestrictions; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -218,4 +221,51 @@ public void testLastWillMessage() throws Exception { client2.unsubscribeWith().topicFilter(topic).send(); client2.disconnect(); } + + @Test + public void testTopicAlias() throws InterruptedException { + Mqtt5BlockingClient client = Mqtt5Client.builder() + .identifier(UUID.randomUUID().toString()) + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + Mqtt5ConnAck connAck = client.connectWith() + .restrictions() + .topicAliasMaximum(10) + .sendTopicAliasMaximum(10) + .applyRestrictions() + .send(); + Assert.assertEquals(connAck.getRestrictions().getTopicAliasMaximum(), 10); + final String topicName = "a/b/c"; + client.subscribeWith() + .topicFilter(topicName) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + List contents = new ArrayList<>(); + client.toAsync().publishes(MqttGlobalPublishFilter.ALL, (msg) -> { + contents.add(new String(msg.getPayloadAsBytes())); + }); + client.publishWith() + .topic(topicName) + .qos(MqttQos.AT_LEAST_ONCE) + .payload("hihihi1".getBytes(StandardCharsets.UTF_8)) + .send(); + client.publishWith() + .topic(topicName) + .qos(MqttQos.AT_LEAST_ONCE) + .payload("hihihi2".getBytes(StandardCharsets.UTF_8)) + .send(); + client.publishWith() + .topic(topicName) + .qos(MqttQos.AT_LEAST_ONCE) + .payload("hihihi3".getBytes(StandardCharsets.UTF_8)) + .send(); + Awaitility.await() + .untilAsserted(() -> { + Assert.assertEquals(contents.size(), 3); + Assert.assertTrue(contents.contains("hihihi1")); + Assert.assertTrue(contents.contains("hihihi2")); + Assert.assertTrue(contents.contains("hihihi3")); + }); + } }