diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MessageBuilder.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MessageBuilder.java index 6a4b08afa..856430e13 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MessageBuilder.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MessageBuilder.java @@ -50,6 +50,7 @@ public static class PublishBuilder { private MqttQoS qos; private ByteBuf payload; private int messageId; + private MqttProperties properties; public PublishBuilder topicName(String topic) { this.topic = topic; @@ -76,9 +77,14 @@ public PublishBuilder messageId(int messageId) { return this; } + public PublishBuilder properties(MqttProperties properties) { + this.properties = properties; + return this; + } + public MqttPublishMessage build() { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained, 0); - MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(topic, messageId); + MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(topic, messageId, properties); return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, payload); } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java index 2b2d372df..fc7be5337 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; @@ -31,6 +32,7 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.commons.codec.binary.Hex; /** @@ -96,7 +98,10 @@ public static WillMessage createWillMessage(MqttConnectMessage msg) { final String willTopic = msg.payload().willTopic(); final boolean retained = msg.variableHeader().isWillRetain(); final MqttQoS qos = MqttQoS.valueOf(msg.variableHeader().willQos()); - return new WillMessage(willTopic, willMessage, qos, retained); + final List userProperty = msg.payload().willProperties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()) + .stream().map(up -> (MqttProperties.StringPair) up.value()).collect(Collectors.toList()); + return new WillMessage(willTopic, willMessage, qos, retained, userProperty); } public static RetainedMessage createRetainedMessage(MqttPublishMessage msg) { @@ -122,13 +127,18 @@ public static MqttPublishMessage createRetainedMessage(RetainedMessage msg) { } public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage) { - return MessageBuilder.publish() + MessageBuilder.PublishBuilder builder = MessageBuilder.publish() .topicName(willMessage.getTopic()) .payload(Unpooled.copiedBuffer(willMessage.getWillMessage())) .qos(willMessage.getQos()) .retained(willMessage.isRetained()) - .messageId(-1) - .build(); + .messageId(-1); + if (willMessage.userProperty != null) { + MqttProperties properties = new MqttProperties(); + properties.add(new MqttProperties.UserProperties(willMessage.userProperty)); + builder.properties(properties); + } + return builder.build(); } public static MqttMessage createMqttDisconnectMessage() { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java index 2c1ca612a..ef4eeb571 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java @@ -13,7 +13,9 @@ */ package io.streamnative.pulsar.handlers.mqtt.utils; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; +import java.util.List; import lombok.Getter; import lombok.Setter; @@ -28,14 +30,20 @@ public class WillMessage { byte[] willMessage; MqttQoS qos; boolean retained; + List userProperty; public WillMessage() { } - public WillMessage(final String topic, final byte[] willMessage, final MqttQoS qos, final boolean retained) { + public WillMessage(final String topic, + final byte[] willMessage, + final MqttQoS qos, + final boolean retained, + final List userProperty) { this.topic = topic; this.willMessage = willMessage; this.qos = qos; this.retained = retained; + this.userProperty = userProperty; } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java index 51867028e..97cbfe637 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5IntegrationTest.java @@ -19,6 +19,8 @@ import com.hivemq.client.mqtt.datatypes.MqttTopic; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectRestrictions; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; @@ -174,4 +176,46 @@ public void testMaximumPacketSize() throws Exception { client.unsubscribeWith().topicFilter(topic).send(); client.disconnect(); } + + @Test + public void testLastWillMessage() throws Exception { + final String topic = "testLastWillMessage"; + final String identifier = "test-Last-Will-Message"; + Mqtt5BlockingClient client = Mqtt5Client.builder() + .identifier(identifier) + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + Mqtt5UserProperties userProperty = Mqtt5UserProperties.builder() + .add("user-1", "value-1") + .add("user-2", "value-2") + .build(); + Mqtt5UserProperty userProperty1 = Mqtt5UserProperty.of("user-1", "value-1"); + Mqtt5UserProperty userProperty2 = Mqtt5UserProperty.of("user-2", "value-2"); + client.connectWith().willPublish(Mqtt5Publish.builder().topic(topic).userProperties(userProperty) + .asWill().payload("online".getBytes(StandardCharsets.UTF_8)).build()) + .send(); + Mqtt5BlockingClient client2 = Mqtt5Client.builder() + .identifier(identifier + "-client-2") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client2.connectWith().send(); + client2.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client2.publishes(MqttGlobalPublishFilter.ALL); + client.disconnect(); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + Assert.assertEquals(message.getPayloadAsBytes(), "online".getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(message.getUserProperties().asList().size(), 2); + // Validate the user properties order, must be the same with set order. + Assert.assertEquals(message.getUserProperties().asList().get(0).compareTo(userProperty1), 0); + Assert.assertEquals(message.getUserProperties().asList().get(1).compareTo(userProperty2), 0); + publishes.close(); + client2.unsubscribeWith().topicFilter(topic).send(); + client2.disconnect(); + } }