From b66079aef0b8600a1aa19728dff10b890906cf1a Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Mon, 13 Jun 2022 21:27:04 +0800 Subject: [PATCH 1/5] Support MQTT-5 publish related properties. --- .../pulsar/handlers/mqtt/Constants.java | 11 ++ .../mqtt/utils/PulsarMessageConverter.java | 41 +++++- .../base/MQTT5PublishRelatedProtocolTest.java | 122 +++++++++++++++++- 3 files changed, 168 insertions(+), 6 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java index b3d6e634f..77697d906 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.mqtt; +import io.netty.handler.codec.mqtt.MqttProperties; + /** * Server constants keeper. */ @@ -26,6 +28,15 @@ public final class Constants { public static final String ATTR_TOPIC_SUBS = "topicSubs"; + public static final String USER_PROPERTY = MqttProperties.MqttPropertyType.USER_PROPERTY.name() + "_"; + + public static final String RESPONSE_TOPIC = MqttProperties.MqttPropertyType.RESPONSE_TOPIC.name() + "_"; + + public static final String CONTENT_TYPE = MqttProperties.MqttPropertyType.CONTENT_TYPE.name() + "_"; + + public static final String CORRELATION_DATA = MqttProperties.MqttPropertyType.CORRELATION_DATA.name() + "_"; + + private Constants() { } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java index 5caefe44c..6e6c55f97 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java @@ -14,6 +14,10 @@ package io.streamnative.pulsar.handlers.mqtt.utils; import static com.google.common.base.Preconditions.checkArgument; +import static io.streamnative.pulsar.handlers.mqtt.Constants.CONTENT_TYPE; +import static io.streamnative.pulsar.handlers.mqtt.Constants.CORRELATION_DATA; +import static io.streamnative.pulsar.handlers.mqtt.Constants.RESPONSE_TOPIC; +import static io.streamnative.pulsar.handlers.mqtt.Constants.USER_PROPERTY; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; @@ -22,6 +26,7 @@ import io.streamnative.pulsar.handlers.mqtt.PacketIdGenerator; import io.streamnative.pulsar.handlers.mqtt.support.MessageBuilder; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -73,9 +78,22 @@ public static MessageImpl toPulsarMsg(Topic topic, MqttPublishMessage mq if (MqttProperties.MqttPropertyType.USER_PROPERTY.value() == prop.propertyId()) { MqttProperties.UserProperties userProperties = (MqttProperties.UserProperties) prop; userProperties.value().forEach(pair -> { - metadata.addProperty().setKey(pair.key).setValue(pair.value); + metadata.addProperty().setKey(USER_PROPERTY + pair.key).setValue(pair.value); }); } + if (MqttProperties.MqttPropertyType.RESPONSE_TOPIC.value() == prop.propertyId()) { + MqttProperties.StringProperty property = (MqttProperties.StringProperty) prop; + metadata.addProperty().setKey(RESPONSE_TOPIC + property.propertyId()).setValue(property.value()); + } + if (MqttProperties.MqttPropertyType.CONTENT_TYPE.value() == prop.propertyId()) { + MqttProperties.StringProperty property = (MqttProperties.StringProperty) prop; + metadata.addProperty().setKey(CONTENT_TYPE + property.propertyId()).setValue(property.value()); + } + if (MqttProperties.MqttPropertyType.CORRELATION_DATA.value() == prop.propertyId()) { + MqttProperties.BinaryProperty property = (MqttProperties.BinaryProperty) prop; + metadata.addProperty().setKey(CORRELATION_DATA + property.propertyId()) + .setValue(new String(property.value())); + } }); } return MessageImpl.create(metadata, mqttMsg.payload().nioBuffer(), SCHEMA, topic.getName()); @@ -90,7 +108,22 @@ public static List toMqttMessages(String topicName, Entry en properties = new MqttProperties(); MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties(); for (KeyValue kv : metadata.getPropertiesList()) { - userProperties.add(kv.getKey(), kv.getValue()); + if (kv.getKey().startsWith(USER_PROPERTY)) { + userProperties.add(kv.getKey().substring(USER_PROPERTY.length()), kv.getValue()); + } + if (kv.getKey().startsWith(RESPONSE_TOPIC)) { + properties.add(new MqttProperties.StringProperty(Integer.parseInt(kv.getKey() + .substring(RESPONSE_TOPIC.length())), kv.getValue())); + } + if (kv.getKey().startsWith(CONTENT_TYPE)) { + properties.add(new MqttProperties.StringProperty(Integer.parseInt(kv.getKey() + .substring(CONTENT_TYPE.length())), kv.getValue())); + } + if (kv.getKey().startsWith(CORRELATION_DATA)) { + properties.add(new MqttProperties.BinaryProperty(Integer.parseInt(kv.getKey() + .substring(CORRELATION_DATA.length())), kv.getValue().getBytes(StandardCharsets.UTF_8))); + } + } properties.add(userProperties); } @@ -153,8 +186,8 @@ public static ByteBuf messageToByteBuf(Message message) { metadata.setProducerName(FAKE_MQTT_PRODUCER_NAME); } - msg.getProperties().forEach((k, v) -> { - metadata.addProperty().setKey(k).setValue(v); + msg.getMessageBuilder().getPropertiesList().forEach(item -> { + metadata.addProperty().setKey(item.getKey()).setValue(item.getValue()); }); metadata.setCompression( diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java index 0746ccb91..159a8b917 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java @@ -21,6 +21,8 @@ import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import lombok.extern.slf4j.Slf4j; import org.testng.Assert; import org.testng.annotations.Test; @@ -29,8 +31,8 @@ public class MQTT5PublishRelatedProtocolTest extends MQTTTestBase { @Test - public void testUserProperties() throws Exception { - final String topic = "testUserProperties"; + public void testPublishWithUserProperties() throws Exception { + final String topic = "testPublishWithUserProperties"; Mqtt5BlockingClient client1 = Mqtt5Client.builder() .identifier("abc") .serverHost("127.0.0.1") @@ -68,4 +70,120 @@ public void testUserProperties() throws Exception { client1.disconnect(); client2.disconnect(); } + + @Test + public void testPublishWithResponseTopic() throws Exception { + final String topic = "testPublishWithResponseTopic"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("abc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client1.connectWith().send(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic) + .responseTopic("response-topic-b") + .qos(MqttQos.AT_LEAST_ONCE).build(); + client1.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + // Validate the user properties order, must be the same with set order. + Assert.assertNotNull(message.getResponseTopic().get()); + Assert.assertEquals(message.getResponseTopic().get().toString(), "response-topic-b"); + publishes.close(); + client1.disconnect(); + } + + @Test + public void testPublishWithContentType() throws Exception { + final String topic = "testPublishWithContentType"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("abc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client1.connectWith().send(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic) + .contentType("test-content-type") + .qos(MqttQos.AT_LEAST_ONCE).build(); + client1.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + // Validate the user properties order, must be the same with set order. + Assert.assertNotNull(message.getContentType().get()); + Assert.assertEquals(message.getContentType().get().toString(), "test-content-type"); + publishes.close(); + client1.disconnect(); + } + + @Test + public void testPublishWithCorrelationData() throws Exception { + final String topic = "testPublishWithCorrelationData"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("abc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client1.connectWith().send(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic) + .correlationData("c-d".getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE).build(); + client1.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + // Validate the user properties order, must be the same with set order. + ByteBuffer byteBuffer = message.getCorrelationData().get(); + Assert.assertNotNull(byteBuffer); + byte[] bytes; + if (byteBuffer.hasArray()) { + bytes = byteBuffer.array(); + } else { + bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + } + String result = new String(bytes, StandardCharsets.UTF_8); + Assert.assertEquals(result, "c-d"); + publishes.close(); + client1.disconnect(); + } + + @Test + public void testPublishWithSubscriptionIdentifier() throws Exception { + final String topic = "testPublishWithSubscriptionIdentifier"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("abc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client1.connectWith().send(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic) + .qos(MqttQos.AT_LEAST_ONCE).build(); + client1.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + // Validate the user properties order, must be the same with set order. + Assert.assertNotNull(message.getContentType().get()); + Assert.assertEquals(message.getContentType().get().toString(), "test-content-type"); + publishes.close(); + client1.disconnect(); + } } From e19471c1833ce600d4ec28ba7e05fa995b0b88b1 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 14 Jun 2022 09:53:49 +0800 Subject: [PATCH 2/5] remove test. --- .../base/MQTT5PublishRelatedProtocolTest.java | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java index 159a8b917..2ca97bdc2 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java @@ -160,30 +160,4 @@ public void testPublishWithCorrelationData() throws Exception { publishes.close(); client1.disconnect(); } - - @Test - public void testPublishWithSubscriptionIdentifier() throws Exception { - final String topic = "testPublishWithSubscriptionIdentifier"; - Mqtt5BlockingClient client1 = Mqtt5Client.builder() - .identifier("abc") - .serverHost("127.0.0.1") - .serverPort(getMqttBrokerPortList().get(0)) - .buildBlocking(); - client1.connectWith().send(); - Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic) - .qos(MqttQos.AT_LEAST_ONCE).build(); - client1.subscribeWith() - .topicFilter(topic) - .qos(MqttQos.AT_LEAST_ONCE) - .send(); - Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); - client1.publish(publishMessage); - Mqtt5Publish message = publishes.receive(); - Assert.assertNotNull(message); - // Validate the user properties order, must be the same with set order. - Assert.assertNotNull(message.getContentType().get()); - Assert.assertEquals(message.getContentType().get().toString(), "test-content-type"); - publishes.close(); - client1.disconnect(); - } } From c7284615e38a78741b398b1d1549e4078615131f Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 14 Jun 2022 16:00:05 +0800 Subject: [PATCH 3/5] apply comment. --- .../pulsar/handlers/mqtt/Constants.java | 11 +-- .../mqtt/utils/PulsarMessageConverter.java | 67 +++++++++++-------- 2 files changed, 40 insertions(+), 38 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java index 77697d906..77b707c9f 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java @@ -13,8 +13,6 @@ */ package io.streamnative.pulsar.handlers.mqtt; -import io.netty.handler.codec.mqtt.MqttProperties; - /** * Server constants keeper. */ @@ -28,14 +26,9 @@ public final class Constants { public static final String ATTR_TOPIC_SUBS = "topicSubs"; - public static final String USER_PROPERTY = MqttProperties.MqttPropertyType.USER_PROPERTY.name() + "_"; - - public static final String RESPONSE_TOPIC = MqttProperties.MqttPropertyType.RESPONSE_TOPIC.name() + "_"; - - public static final String CONTENT_TYPE = MqttProperties.MqttPropertyType.CONTENT_TYPE.name() + "_"; - - public static final String CORRELATION_DATA = MqttProperties.MqttPropertyType.CORRELATION_DATA.name() + "_"; + public static final String MQTT_PROPERTIES = "MQTT_PROPERTIES_%d_"; + public static final String MQTT_PROPERTIES_PREFIX = "MQTT_PROPERTIES_"; private Constants() { } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java index 6e6c55f97..5bd2046d2 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java @@ -14,10 +14,9 @@ package io.streamnative.pulsar.handlers.mqtt.utils; import static com.google.common.base.Preconditions.checkArgument; -import static io.streamnative.pulsar.handlers.mqtt.Constants.CONTENT_TYPE; -import static io.streamnative.pulsar.handlers.mqtt.Constants.CORRELATION_DATA; -import static io.streamnative.pulsar.handlers.mqtt.Constants.RESPONSE_TOPIC; -import static io.streamnative.pulsar.handlers.mqtt.Constants.USER_PROPERTY; +import static io.streamnative.pulsar.handlers.mqtt.Constants.MQTT_PROPERTIES; +import static io.streamnative.pulsar.handlers.mqtt.Constants.MQTT_PROPERTIES_PREFIX; +import com.google.common.base.Splitter; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; @@ -78,20 +77,20 @@ public static MessageImpl toPulsarMsg(Topic topic, MqttPublishMessage mq if (MqttProperties.MqttPropertyType.USER_PROPERTY.value() == prop.propertyId()) { MqttProperties.UserProperties userProperties = (MqttProperties.UserProperties) prop; userProperties.value().forEach(pair -> { - metadata.addProperty().setKey(USER_PROPERTY + pair.key).setValue(pair.value); + metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId()) + pair.key) + .setValue(pair.value); }); - } - if (MqttProperties.MqttPropertyType.RESPONSE_TOPIC.value() == prop.propertyId()) { + } else if (MqttProperties.MqttPropertyType.RESPONSE_TOPIC.value() == prop.propertyId()) { MqttProperties.StringProperty property = (MqttProperties.StringProperty) prop; - metadata.addProperty().setKey(RESPONSE_TOPIC + property.propertyId()).setValue(property.value()); - } - if (MqttProperties.MqttPropertyType.CONTENT_TYPE.value() == prop.propertyId()) { + metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId())) + .setValue(property.value()); + } else if (MqttProperties.MqttPropertyType.CONTENT_TYPE.value() == prop.propertyId()) { MqttProperties.StringProperty property = (MqttProperties.StringProperty) prop; - metadata.addProperty().setKey(CONTENT_TYPE + property.propertyId()).setValue(property.value()); - } - if (MqttProperties.MqttPropertyType.CORRELATION_DATA.value() == prop.propertyId()) { + metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId())) + .setValue(property.value()); + } else if (MqttProperties.MqttPropertyType.CORRELATION_DATA.value() == prop.propertyId()) { MqttProperties.BinaryProperty property = (MqttProperties.BinaryProperty) prop; - metadata.addProperty().setKey(CORRELATION_DATA + property.propertyId()) + metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId())) .setValue(new String(property.value())); } }); @@ -99,6 +98,10 @@ public static MessageImpl toPulsarMsg(Topic topic, MqttPublishMessage mq return MessageImpl.create(metadata, mqttMsg.payload().nioBuffer(), SCHEMA, topic.getName()); } + private static String getPropertiesPrefix(int propertyId) { + return String.format(MQTT_PROPERTIES, propertyId); + } + public static List toMqttMessages(String topicName, Entry entry, PacketIdGenerator packetIdGenerator, MqttQoS qos) { ByteBuf metadataAndPayload = entry.getDataBuffer(); @@ -108,22 +111,28 @@ public static List toMqttMessages(String topicName, Entry en properties = new MqttProperties(); MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties(); for (KeyValue kv : metadata.getPropertiesList()) { - if (kv.getKey().startsWith(USER_PROPERTY)) { - userProperties.add(kv.getKey().substring(USER_PROPERTY.length()), kv.getValue()); + String key = kv.getKey(); + if (key.startsWith(MQTT_PROPERTIES_PREFIX)) { + List keys = Splitter.on("_").splitToList(key.substring(MQTT_PROPERTIES_PREFIX.length())); + int propertyId = Integer.parseInt(keys.get(0)); + MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyId); + switch (propertyType) { + case USER_PROPERTY: + userProperties.add(kv.getKey().substring(getPropertiesPrefix(propertyId).length()), + kv.getValue()); + break; + case RESPONSE_TOPIC: + case CONTENT_TYPE: + properties.add(new MqttProperties.StringProperty(propertyId, kv.getValue())); + break; + case CORRELATION_DATA: + properties.add(new MqttProperties.BinaryProperty(propertyId, kv.getValue() + .getBytes(StandardCharsets.UTF_8))); + break; + default: + throw new RuntimeException("invalid propertyType : " + propertyType); + } } - if (kv.getKey().startsWith(RESPONSE_TOPIC)) { - properties.add(new MqttProperties.StringProperty(Integer.parseInt(kv.getKey() - .substring(RESPONSE_TOPIC.length())), kv.getValue())); - } - if (kv.getKey().startsWith(CONTENT_TYPE)) { - properties.add(new MqttProperties.StringProperty(Integer.parseInt(kv.getKey() - .substring(CONTENT_TYPE.length())), kv.getValue())); - } - if (kv.getKey().startsWith(CORRELATION_DATA)) { - properties.add(new MqttProperties.BinaryProperty(Integer.parseInt(kv.getKey() - .substring(CORRELATION_DATA.length())), kv.getValue().getBytes(StandardCharsets.UTF_8))); - } - } properties.add(userProperties); } From 1e745d29ab151fa71b21f99ee6530da05ce9d312 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 14 Jun 2022 16:05:37 +0800 Subject: [PATCH 4/5] apply comment. --- .../pulsar/handlers/mqtt/utils/PulsarMessageConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java index 5bd2046d2..6e807bd3b 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java @@ -130,7 +130,7 @@ public static List toMqttMessages(String topicName, Entry en .getBytes(StandardCharsets.UTF_8))); break; default: - throw new RuntimeException("invalid propertyType : " + propertyType); + log.warn("invalid propertyType: {}", propertyType); } } } From 2b88c8d53228d3ea9e684d8b32512ac0c6263764 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 14 Jun 2022 17:15:52 +0800 Subject: [PATCH 5/5] apply comment. --- .../pulsar/handlers/mqtt/utils/PulsarMessageConverter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java index 6e807bd3b..50986680a 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java @@ -131,6 +131,7 @@ public static List toMqttMessages(String topicName, Entry en break; default: log.warn("invalid propertyType: {}", propertyType); + break; } } }