From 264a4744bcfc5d195f8d7c00ea5ed333a54cc1e1 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 16 Jun 2022 09:44:08 +0800 Subject: [PATCH 1/2] Support MQTT5-5 publish message expiry interval. --- .../mqtt/restrictions/ClientRestrictions.java | 6 +- .../handlers/mqtt/support/MQTTConsumer.java | 20 +++++- .../handlers/mqtt/utils/MqttMessageUtils.java | 6 ++ .../mqtt/utils/PulsarMessageConverter.java | 12 +++- .../base/MQTT5PublishRelatedProtocolTest.java | 68 +++++++++++++++++-- 5 files changed, 102 insertions(+), 10 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java index c3f4fe982..41df5c7fb 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java @@ -71,11 +71,13 @@ public void updateExpireInterval(int newExpireInterval) throws InvalidSessionExp } public boolean isSessionExpireImmediately() { - return sessionExpireInterval == SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime(); + return sessionExpireInterval == null + || sessionExpireInterval == SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime(); } public boolean isSessionNeverExpire() { - return sessionExpireInterval == SessionExpireInterval.NEVER_EXPIRE.getSecondTime(); + return sessionExpireInterval == null + || sessionExpireInterval == SessionExpireInterval.NEVER_EXPIRE.getSecondTime(); } @Getter diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java index c491d0627..12cc79de0 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.mqtt.support; +import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.getMessageExpiryInterval; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.mqtt.MqttPublishMessage; @@ -103,9 +104,22 @@ public ChannelPromise sendMessages(List entries, EntryBatchSizes batchSiz outstandingPacketContainer.add(outstandingPacket); } } else { - OutstandingPacket outstandingPacket = new OutstandingPacket(this, - messages.get(0).variableHeader().packetId(), entry.getLedgerId(), entry.getEntryId()); - outstandingPacketContainer.add(outstandingPacket); + // Because batch msg is sent from Pulsar client, so only individual msg may have mqtt-5 properties. + MqttPublishMessage firstMessage = messages.get(0); + long messageExpiry = getMessageExpiryInterval(firstMessage); + boolean addToOutstandingPacketContainer = messageExpiry >= 0; + if (messageExpiry < 0) { + log.warn("mqtt msg has expired : {}", firstMessage); + messages.remove(0); + getSubscription().acknowledgeMessage( + Collections.singletonList(entry.getPosition()), + CommandAck.AckType.Individual, Collections.emptyMap()); + } + if (addToOutstandingPacketContainer) { + OutstandingPacket outstandingPacket = new OutstandingPacket(this, + messages.get(0).variableHeader().packetId(), entry.getLedgerId(), entry.getEntryId()); + outstandingPacketContainer.add(outstandingPacket); + } } } for (MqttPublishMessage msg : messages) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java index 16744c359..bd9ed336c 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java @@ -172,4 +172,10 @@ public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage) public static MqttMessage createMqttDisconnectMessage() { return MessageBuilder.disconnect().build(); } + + public static long getMessageExpiryInterval(MqttPublishMessage msg) { + return msg.variableHeader().properties().getProperties( + MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value()) + .stream().map(prop -> ((MqttProperties.IntegerProperty) prop).value()).findFirst().orElse(0); + } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java index dcc60f001..22d26fbd4 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java @@ -17,6 +17,7 @@ import static io.streamnative.pulsar.handlers.mqtt.Constants.MQTT_PROPERTIES; import static io.streamnative.pulsar.handlers.mqtt.Constants.MQTT_PROPERTIES_PREFIX; import com.google.common.base.Splitter; +import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; @@ -96,6 +97,10 @@ public static MessageImpl toPulsarMsg(Topic topic, MqttPublishMessage mq MqttProperties.IntegerProperty property = (MqttProperties.IntegerProperty) prop; metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId())) .setValue(String.valueOf(property.value())); + } else if (MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value() == prop.propertyId()) { + MqttProperties.IntegerProperty property = (MqttProperties.IntegerProperty) prop; + metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId())) + .setValue(String.valueOf(System.currentTimeMillis() / 1000 + property.value())); } }); } @@ -137,6 +142,11 @@ public static List toMqttMessages(String topicName, Entry en properties.add(new MqttProperties.BinaryProperty(propertyId, kv.getValue() .getBytes(StandardCharsets.UTF_8))); break; + case PUBLICATION_EXPIRY_INTERVAL: + // calculate first to avoid reset msg properties. + int end = Integer.valueOf(kv.getValue()) - (int) (System.currentTimeMillis() / 1000); + properties.add(new MqttProperties.IntegerProperty(propertyId, end)); + break; default: log.warn("invalid propertyType: {}", propertyType); break; @@ -171,7 +181,7 @@ public static List toMqttMessages(String topicName, Entry en return Collections.emptyList(); } } else { - return Collections.singletonList(MessageBuilder.publish() + return Lists.newArrayList(MessageBuilder.publish() .messageId(packetIdGenerator.nextPacketId()) .payload(metadataAndPayload) .topicName(topicName) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java index ef2fa2d44..9881428c0 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5PublishRelatedProtocolTest.java @@ -24,6 +24,8 @@ import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.testng.Assert; import org.testng.annotations.Test; @@ -93,7 +95,6 @@ public void testPublishWithResponseTopic() throws Exception { client1.publish(publishMessage); Mqtt5Publish message = publishes.receive(); Assert.assertNotNull(message); - // Validate the user properties order, must be the same with set order. Assert.assertNotNull(message.getResponseTopic().get()); Assert.assertEquals(message.getResponseTopic().get().toString(), "response-topic-b"); publishes.close(); @@ -120,7 +121,6 @@ public void testPublishWithContentType() throws Exception { client1.publish(publishMessage); Mqtt5Publish message = publishes.receive(); Assert.assertNotNull(message); - // Validate the user properties order, must be the same with set order. Assert.assertNotNull(message.getContentType().get()); Assert.assertEquals(message.getContentType().get().toString(), "test-content-type"); publishes.close(); @@ -147,7 +147,6 @@ public void testPublishWithCorrelationData() throws Exception { client1.publish(publishMessage); Mqtt5Publish message = publishes.receive(); Assert.assertNotNull(message); - // Validate the user properties order, must be the same with set order. ByteBuffer byteBuffer = message.getCorrelationData().get(); Assert.assertNotNull(byteBuffer); byte[] bytes; @@ -183,10 +182,71 @@ public void testPublishWithPayloadFormatIndicator() throws Exception { client1.publish(publishMessage); Mqtt5Publish message = publishes.receive(); Assert.assertNotNull(message); - // Validate the user properties order, must be the same with set order. Assert.assertNotNull(message.getPayloadFormatIndicator().get()); Assert.assertEquals(message.getPayloadFormatIndicator().get(), Mqtt5PayloadFormatIndicator.UTF_8); publishes.close(); client1.disconnect(); } + + @Test + public void testPublishWithMessageExpiryInterval() throws Exception { + final String topic = "testPublishWithMessageExpiryInterval"; + Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("abc") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client1.connectWith().send(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic) + .messageExpiryInterval(10) + .qos(MqttQos.AT_LEAST_ONCE).build(); + client1.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + long expiryInterval = message.getMessageExpiryInterval().getAsLong(); + Assert.assertTrue(expiryInterval > 0 && expiryInterval <= 10); + publishes.close(); + client1.disconnect(); + // + final String topic2 = "testPublishWithMessageExpiryInterval2"; + Mqtt5BlockingClient client2 = Mqtt5Client.builder() + .identifier("abc2") + .serverHost("127.0.0.1") + .serverPort(getMqttBrokerPortList().get(0)) + .buildBlocking(); + client2.connectWith() + .cleanStart(false).send(); + Mqtt5Publish publishMessage2 = Mqtt5Publish.builder().topic(topic2) + .messageExpiryInterval(1) + .qos(MqttQos.AT_LEAST_ONCE).build(); + + client2.subscribeWith() + .topicFilter(topic2) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes2 = client2.publishes(MqttGlobalPublishFilter.ALL, true); + client2.publish(publishMessage2); + Optional message2 = publishes2.receive(2, TimeUnit.SECONDS); + Assert.assertTrue(message2.isPresent()); + publishes2.close(); + client2.disconnect(); + // Wait the msg to be expired. + Thread.sleep(2000); + long msgBacklog = admin.topics().getStats(topic2).getSubscriptions().get("abc2").getMsgBacklog(); + Assert.assertEquals(msgBacklog, 1); + client2.connectWith().send(); + client2.subscribeWith() + .topicFilter(topic2) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + publishes2 = client2.publishes(MqttGlobalPublishFilter.ALL, true); + message2 = publishes2.receive(2, TimeUnit.SECONDS); + Assert.assertFalse(message2.isPresent()); + client2.disconnect(); + } } From 9dffe1b51b8e550bde79cab5806289b19f796be7 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 16 Jun 2022 09:49:56 +0800 Subject: [PATCH 2/2] change field name. --- .../pulsar/handlers/mqtt/support/MQTTConsumer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java index 12cc79de0..6105dbcf5 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTConsumer.java @@ -106,9 +106,9 @@ public ChannelPromise sendMessages(List entries, EntryBatchSizes batchSiz } else { // Because batch msg is sent from Pulsar client, so only individual msg may have mqtt-5 properties. MqttPublishMessage firstMessage = messages.get(0); - long messageExpiry = getMessageExpiryInterval(firstMessage); - boolean addToOutstandingPacketContainer = messageExpiry >= 0; - if (messageExpiry < 0) { + long expiryInterval = getMessageExpiryInterval(firstMessage); + boolean addToOutstandingPacketContainer = expiryInterval >= 0; + if (expiryInterval < 0) { log.warn("mqtt msg has expired : {}", firstMessage); messages.remove(0); getSubscription().acknowledgeMessage(