diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java index fc7be5337..16744c359 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java @@ -29,6 +29,7 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription; import io.streamnative.pulsar.handlers.mqtt.support.MessageBuilder; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -101,7 +102,20 @@ public static WillMessage createWillMessage(MqttConnectMessage msg) { final List userProperty = msg.payload().willProperties() .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()) .stream().map(up -> (MqttProperties.StringPair) up.value()).collect(Collectors.toList()); - return new WillMessage(willTopic, willMessage, qos, retained, userProperty); + String contentType = msg.payload().willProperties() + .getProperties(MqttProperties.MqttPropertyType.CONTENT_TYPE.value()) + .stream().map(up -> ((MqttProperties.StringProperty) up).value()).findFirst().orElse(null); + String responseTopic = msg.payload().willProperties() + .getProperties(MqttProperties.MqttPropertyType.RESPONSE_TOPIC.value()) + .stream().map(up -> ((MqttProperties.StringProperty) up).value()).findFirst().orElse(null); + String correlationData = msg.payload().willProperties() + .getProperties(MqttProperties.MqttPropertyType.CORRELATION_DATA.value()) + .stream().map(up -> new String(((MqttProperties.BinaryProperty) up).value())).findFirst().orElse(null); + int payloadFormatIndicator = msg.payload().willProperties() + .getProperties(MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR.value()) + .stream().map(up -> ((MqttProperties.IntegerProperty) up).value()).findFirst().orElse(0); + return new WillMessage(willTopic, willMessage, qos, retained, userProperty, + contentType, responseTopic, correlationData, payloadFormatIndicator); } public static RetainedMessage createRetainedMessage(MqttPublishMessage msg) { @@ -133,11 +147,25 @@ public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage) .qos(willMessage.getQos()) .retained(willMessage.isRetained()) .messageId(-1); + MqttProperties properties = new MqttProperties(); + properties.add(new MqttProperties.IntegerProperty( + MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR.value(), willMessage.payloadFormatIndicator)); if (willMessage.userProperty != null) { - MqttProperties properties = new MqttProperties(); properties.add(new MqttProperties.UserProperties(willMessage.userProperty)); - builder.properties(properties); } + if (willMessage.contentType != null) { + properties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value(), + willMessage.contentType)); + } + if (willMessage.responseTopic != null) { + properties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.RESPONSE_TOPIC.value(), + willMessage.responseTopic)); + } + if (willMessage.correlationData != null) { + properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.CORRELATION_DATA.value(), + willMessage.correlationData.getBytes(StandardCharsets.UTF_8))); + } + builder.properties(properties); return builder.build(); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java index 50986680a..dcc60f001 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java @@ -92,6 +92,10 @@ public static MessageImpl toPulsarMsg(Topic topic, MqttPublishMessage mq MqttProperties.BinaryProperty property = (MqttProperties.BinaryProperty) prop; metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId())) .setValue(new String(property.value())); + } else if (MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR.value() == prop.propertyId()) { + MqttProperties.IntegerProperty property = (MqttProperties.IntegerProperty) prop; + metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId())) + .setValue(String.valueOf(property.value())); } }); } @@ -125,6 +129,10 @@ public static List toMqttMessages(String topicName, Entry en case CONTENT_TYPE: properties.add(new MqttProperties.StringProperty(propertyId, kv.getValue())); break; + case PAYLOAD_FORMAT_INDICATOR: + properties.add(new MqttProperties.IntegerProperty(propertyId, + Integer.parseInt(kv.getValue()))); + break; case CORRELATION_DATA: properties.add(new MqttProperties.BinaryProperty(propertyId, kv.getValue() .getBytes(StandardCharsets.UTF_8))); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java index ef4eeb571..47492de45 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java @@ -32,6 +32,15 @@ public class WillMessage { boolean retained; List userProperty; + String contentType; + + String responseTopic; + + String correlationData; + + int payloadFormatIndicator; + + public WillMessage() { } @@ -39,11 +48,19 @@ public WillMessage(final String topic, final byte[] willMessage, final MqttQoS qos, final boolean retained, - final List userProperty) { + final List userProperty, + final String contentType, + final String responseTopic, + final String correlationData, + final int payloadFormatIndicator) { this.topic = topic; this.willMessage = willMessage; this.qos = qos; this.retained = retained; this.userProperty = userProperty; + this.contentType = contentType; + this.responseTopic = responseTopic; + this.correlationData = correlationData; + this.payloadFormatIndicator = payloadFormatIndicator; } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ConnectRelatedProtocolTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ConnectRelatedProtocolTest.java new file mode 100644 index 000000000..f52abf449 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ConnectRelatedProtocolTest.java @@ -0,0 +1,93 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.mqtt5.hivemq.base; + +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import lombok.extern.slf4j.Slf4j; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Slf4j +public class MQTT5ConnectRelatedProtocolTest extends MQTTTestBase { + + @Test + public void testConnectWillMessage() throws Exception { + final String topic = "will-topic-test"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("client-1") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client1.connectWith() + .send(); + client1.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); + // + Mqtt5BlockingClient client2 = Mqtt5Client.builder() + .identifier("client-2") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + + client2.connectWith() + .willPublish() + .topic(topic) + .payload("will-message".getBytes(StandardCharsets.UTF_8)) + .contentType("will-content-type") + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) + .correlationData("cd".getBytes(StandardCharsets.UTF_8)) + .responseTopic("will-response-topic") + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.fromCode(1)) + .applyWillPublish() + .send(); + client2.disconnect(); + // + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + Assert.assertEquals(new String(message.getPayloadAsBytes()), "will-message"); + // Validate the user properties order, must be the same with set order. + ByteBuffer byteBuffer = message.getCorrelationData().get(); + Assert.assertNotNull(byteBuffer); + byte[] bytes; + if (byteBuffer.hasArray()) { + bytes = byteBuffer.array(); + } else { + bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + } + String result = new String(bytes, StandardCharsets.UTF_8); + Assert.assertEquals(result, "cd"); + + Assert.assertNotNull(message.getResponseTopic().get()); + Assert.assertEquals(message.getResponseTopic().get().toString(), "will-response-topic"); + Assert.assertNotNull(message.getContentType().get()); + Assert.assertEquals(message.getContentType().get().toString(), "will-content-type"); + Assert.assertNotNull(message.getPayloadFormatIndicator().get()); + Assert.assertEquals(message.getPayloadFormatIndicator().get(), Mqtt5PayloadFormatIndicator.UTF_8); + + publishes.close(); + client1.disconnect(); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java index 2ca97bdc2..ef2fa2d44 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java @@ -19,6 +19,7 @@ import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; import java.nio.ByteBuffer; @@ -46,7 +47,8 @@ public void testPublishWithUserProperties() throws Exception { Mqtt5UserProperty userProperty2 = Mqtt5UserProperty.of("user-2", "value-2"); client1.connectWith().send(); Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic).qos(MqttQos.AT_LEAST_ONCE) - .userProperties(userProperty).build(); + .userProperties(userProperty) + .asWill().build(); Mqtt5BlockingClient client2 = Mqtt5Client.builder() .identifier("ccc") @@ -160,4 +162,31 @@ public void testPublishWithCorrelationData() throws Exception { publishes.close(); client1.disconnect(); } + + @Test + public void testPublishWithPayloadFormatIndicator() throws Exception { + final String topic = "testPublishWithPayloadFormatIndicator"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("abc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client1.connectWith().send(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic) + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) + .qos(MqttQos.AT_LEAST_ONCE).build(); + client1.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + // Validate the user properties order, must be the same with set order. + Assert.assertNotNull(message.getPayloadFormatIndicator().get()); + Assert.assertEquals(message.getPayloadFormatIndicator().get(), Mqtt5PayloadFormatIndicator.UTF_8); + publishes.close(); + client1.disconnect(); + } }