diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java index b3d6e634f..77b707c9f 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java @@ -26,6 +26,10 @@ public final class Constants { public static final String ATTR_TOPIC_SUBS = "topicSubs"; + public static final String MQTT_PROPERTIES = "MQTT_PROPERTIES_%d_"; + + public static final String MQTT_PROPERTIES_PREFIX = "MQTT_PROPERTIES_"; + private Constants() { } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java index 5caefe44c..50986680a 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java @@ -14,6 +14,9 @@ package io.streamnative.pulsar.handlers.mqtt.utils; import static com.google.common.base.Preconditions.checkArgument; +import static io.streamnative.pulsar.handlers.mqtt.Constants.MQTT_PROPERTIES; +import static io.streamnative.pulsar.handlers.mqtt.Constants.MQTT_PROPERTIES_PREFIX; +import com.google.common.base.Splitter; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; @@ -22,6 +25,7 @@ import io.streamnative.pulsar.handlers.mqtt.PacketIdGenerator; import io.streamnative.pulsar.handlers.mqtt.support.MessageBuilder; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -73,14 +77,31 @@ public static MessageImpl toPulsarMsg(Topic topic, MqttPublishMessage mq if (MqttProperties.MqttPropertyType.USER_PROPERTY.value() == prop.propertyId()) { MqttProperties.UserProperties userProperties = (MqttProperties.UserProperties) prop; userProperties.value().forEach(pair -> { - metadata.addProperty().setKey(pair.key).setValue(pair.value); + metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId()) + pair.key) + .setValue(pair.value); }); + } else if (MqttProperties.MqttPropertyType.RESPONSE_TOPIC.value() == prop.propertyId()) { + MqttProperties.StringProperty property = (MqttProperties.StringProperty) prop; + metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId())) + .setValue(property.value()); + } else if (MqttProperties.MqttPropertyType.CONTENT_TYPE.value() == prop.propertyId()) { + MqttProperties.StringProperty property = (MqttProperties.StringProperty) prop; + metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId())) + .setValue(property.value()); + } else if (MqttProperties.MqttPropertyType.CORRELATION_DATA.value() == prop.propertyId()) { + MqttProperties.BinaryProperty property = (MqttProperties.BinaryProperty) prop; + metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId())) + .setValue(new String(property.value())); } }); } return MessageImpl.create(metadata, mqttMsg.payload().nioBuffer(), SCHEMA, topic.getName()); } + private static String getPropertiesPrefix(int propertyId) { + return String.format(MQTT_PROPERTIES, propertyId); + } + public static List toMqttMessages(String topicName, Entry entry, PacketIdGenerator packetIdGenerator, MqttQoS qos) { ByteBuf metadataAndPayload = entry.getDataBuffer(); @@ -90,7 +111,29 @@ public static List toMqttMessages(String topicName, Entry en properties = new MqttProperties(); MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties(); for (KeyValue kv : metadata.getPropertiesList()) { - userProperties.add(kv.getKey(), kv.getValue()); + String key = kv.getKey(); + if (key.startsWith(MQTT_PROPERTIES_PREFIX)) { + List keys = Splitter.on("_").splitToList(key.substring(MQTT_PROPERTIES_PREFIX.length())); + int propertyId = Integer.parseInt(keys.get(0)); + MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyId); + switch (propertyType) { + case USER_PROPERTY: + userProperties.add(kv.getKey().substring(getPropertiesPrefix(propertyId).length()), + kv.getValue()); + break; + case RESPONSE_TOPIC: + case CONTENT_TYPE: + properties.add(new MqttProperties.StringProperty(propertyId, kv.getValue())); + break; + case CORRELATION_DATA: + properties.add(new MqttProperties.BinaryProperty(propertyId, kv.getValue() + .getBytes(StandardCharsets.UTF_8))); + break; + default: + log.warn("invalid propertyType: {}", propertyType); + break; + } + } } properties.add(userProperties); } @@ -153,8 +196,8 @@ public static ByteBuf messageToByteBuf(Message message) { metadata.setProducerName(FAKE_MQTT_PRODUCER_NAME); } - msg.getProperties().forEach((k, v) -> { - metadata.addProperty().setKey(k).setValue(v); + msg.getMessageBuilder().getPropertiesList().forEach(item -> { + metadata.addProperty().setKey(item.getKey()).setValue(item.getValue()); }); metadata.setCompression( diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java index 0746ccb91..2ca97bdc2 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java @@ -21,6 +21,8 @@ import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import lombok.extern.slf4j.Slf4j; import org.testng.Assert; import org.testng.annotations.Test; @@ -29,8 +31,8 @@ public class MQTT5PublishRelatedProtocolTest extends MQTTTestBase { @Test - public void testUserProperties() throws Exception { - final String topic = "testUserProperties"; + public void testPublishWithUserProperties() throws Exception { + final String topic = "testPublishWithUserProperties"; Mqtt5BlockingClient client1 = Mqtt5Client.builder() .identifier("abc") .serverHost("127.0.0.1") @@ -68,4 +70,94 @@ public void testUserProperties() throws Exception { client1.disconnect(); client2.disconnect(); } + + @Test + public void testPublishWithResponseTopic() throws Exception { + final String topic = "testPublishWithResponseTopic"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("abc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client1.connectWith().send(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic) + .responseTopic("response-topic-b") + .qos(MqttQos.AT_LEAST_ONCE).build(); + client1.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + // Validate the user properties order, must be the same with set order. + Assert.assertNotNull(message.getResponseTopic().get()); + Assert.assertEquals(message.getResponseTopic().get().toString(), "response-topic-b"); + publishes.close(); + client1.disconnect(); + } + + @Test + public void testPublishWithContentType() throws Exception { + final String topic = "testPublishWithContentType"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("abc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client1.connectWith().send(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic) + .contentType("test-content-type") + .qos(MqttQos.AT_LEAST_ONCE).build(); + client1.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + // Validate the user properties order, must be the same with set order. + Assert.assertNotNull(message.getContentType().get()); + Assert.assertEquals(message.getContentType().get().toString(), "test-content-type"); + publishes.close(); + client1.disconnect(); + } + + @Test + public void testPublishWithCorrelationData() throws Exception { + final String topic = "testPublishWithCorrelationData"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("abc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client1.connectWith().send(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic) + .correlationData("c-d".getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE).build(); + client1.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + // Validate the user properties order, must be the same with set order. + ByteBuffer byteBuffer = message.getCorrelationData().get(); + Assert.assertNotNull(byteBuffer); + byte[] bytes; + if (byteBuffer.hasArray()) { + bytes = byteBuffer.array(); + } else { + bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + } + String result = new String(bytes, StandardCharsets.UTF_8); + Assert.assertEquals(result, "c-d"); + publishes.close(); + client1.disconnect(); + } }