From b27dc21c7161bae7cc07982ff721da7455626370 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 16 Jun 2022 11:50:01 +0800 Subject: [PATCH] Support MQTT-5 will message with message expiry interval and delay interval. --- .../pulsar/handlers/mqtt/MQTTService.java | 1 + .../mqtt/support/WillMessageHandler.java | 20 +++++++++++++++++++ .../handlers/mqtt/utils/MqttMessageUtils.java | 15 +++++++++++++- .../handlers/mqtt/utils/WillMessage.java | 9 ++++++++- .../base/MQTT5ConnectRelatedProtocolTest.java | 11 ++++++++-- 5 files changed, 52 insertions(+), 4 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java index f3e79b488..5d6ce23e6 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java @@ -117,5 +117,6 @@ public void close() { if (eventService != null) { eventService.close(); } + this.willMessageHandler.close(); } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/WillMessageHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/WillMessageHandler.java index eedaa39e6..eb75e13be 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/WillMessageHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/WillMessageHandler.java @@ -16,6 +16,7 @@ import static io.streamnative.pulsar.handlers.mqtt.support.systemtopic.EventType.LAST_WILL_MESSAGE; import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttWillMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.util.concurrent.DefaultThreadFactory; import io.streamnative.pulsar.handlers.mqtt.Connection; import io.streamnative.pulsar.handlers.mqtt.MQTTConnectionManager; import io.streamnative.pulsar.handlers.mqtt.MQTTService; @@ -25,6 +26,9 @@ import io.streamnative.pulsar.handlers.mqtt.support.systemtopic.MqttEvent; import io.streamnative.pulsar.handlers.mqtt.utils.WillMessage; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -42,6 +46,8 @@ public class WillMessageHandler { private final EventListener eventListener; private final MQTTService mqttService; + private final ScheduledExecutorService executor; + public WillMessageHandler(MQTTService mqttService) { this.mqttService = mqttService; this.pulsarService = mqttService.getPulsarService(); @@ -49,6 +55,8 @@ public WillMessageHandler(MQTTService mqttService) { this.connectionManager = mqttService.getConnectionManager(); this.advertisedAddress = mqttService.getPulsarService().getAdvertisedAddress(); this.eventListener = new LastWillMessageEventListener(); + this.executor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("will-message-executor")); } public void fireWillMessage(String clientId, WillMessage willMessage) { @@ -61,6 +69,14 @@ public void fireWillMessage(String clientId, WillMessage willMessage) { .build(); mqttService.getEventService().sendLWTEvent(lwt); } + if (willMessage.getDelayInterval() > 0) { + executor.schedule(() -> sendWillMessage(willMessage), willMessage.getDelayInterval(), TimeUnit.SECONDS); + } else { + sendWillMessage(willMessage); + } + } + + private void sendWillMessage(WillMessage willMessage) { List> subscriptions = mqttSubscriptionManager.findMatchedTopic(willMessage.getTopic()); MqttPublishMessage msg = createMqttWillMessage(willMessage); for (Pair entry : subscriptions) { @@ -73,6 +89,10 @@ public void fireWillMessage(String clientId, WillMessage willMessage) { } } + public void close() { + this.executor.shutdown(); + } + class LastWillMessageEventListener implements EventListener { @Override diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java index bd9ed336c..2fa6faa7d 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java @@ -114,8 +114,15 @@ public static WillMessage createWillMessage(MqttConnectMessage msg) { int payloadFormatIndicator = msg.payload().willProperties() .getProperties(MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR.value()) .stream().map(up -> ((MqttProperties.IntegerProperty) up).value()).findFirst().orElse(0); + int messageExpiryInterval = msg.payload().willProperties() + .getProperties(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value()) + .stream().map(up -> ((MqttProperties.IntegerProperty) up).value()).findFirst().orElse(0); + int delayInterval = msg.payload().willProperties() + .getProperties(MqttProperties.MqttPropertyType.WILL_DELAY_INTERVAL.value()) + .stream().map(up -> ((MqttProperties.IntegerProperty) up).value()).findFirst().orElse(0); return new WillMessage(willTopic, willMessage, qos, retained, userProperty, - contentType, responseTopic, correlationData, payloadFormatIndicator); + contentType, responseTopic, correlationData, payloadFormatIndicator, + messageExpiryInterval, delayInterval); } public static RetainedMessage createRetainedMessage(MqttPublishMessage msg) { @@ -165,6 +172,12 @@ public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage) properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.CORRELATION_DATA.value(), willMessage.correlationData.getBytes(StandardCharsets.UTF_8))); } + if (willMessage.messageExpiryInterval > 0) { + properties.add(new MqttProperties.IntegerProperty( + MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value(), + willMessage.messageExpiryInterval)); + } + // No need to add delayInterval to the properties, it will cause client close the connection. builder.properties(properties); return builder.build(); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java index 47492de45..4f5d27e33 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/WillMessage.java @@ -40,6 +40,9 @@ public class WillMessage { int payloadFormatIndicator; + int messageExpiryInterval; + + int delayInterval; public WillMessage() { } @@ -52,7 +55,9 @@ public WillMessage(final String topic, final String contentType, final String responseTopic, final String correlationData, - final int payloadFormatIndicator) { + final int payloadFormatIndicator, + final int messageExpiryInterval, + final int delayInterval) { this.topic = topic; this.willMessage = willMessage; this.qos = qos; @@ -62,5 +67,7 @@ public WillMessage(final String topic, this.responseTopic = responseTopic; this.correlationData = correlationData; this.payloadFormatIndicator = payloadFormatIndicator; + this.messageExpiryInterval = messageExpiryInterval; + this.delayInterval = delayInterval; } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ConnectRelatedProtocolTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ConnectRelatedProtocolTest.java index f52abf449..1e6563952 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ConnectRelatedProtocolTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ConnectRelatedProtocolTest.java @@ -22,6 +22,8 @@ import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.testng.Assert; import org.testng.annotations.Test; @@ -59,13 +61,16 @@ public void testConnectWillMessage() throws Exception { .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) .correlationData("cd".getBytes(StandardCharsets.UTF_8)) .responseTopic("will-response-topic") + .messageExpiryInterval(10) + .delayInterval(2) .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.fromCode(1)) .applyWillPublish() .send(); client2.disconnect(); // - Mqtt5Publish message = publishes.receive(); - Assert.assertNotNull(message); + Optional optMessage = publishes.receive(5, TimeUnit.SECONDS); + Assert.assertTrue(optMessage.isPresent()); + Mqtt5Publish message = optMessage.get(); Assert.assertEquals(new String(message.getPayloadAsBytes()), "will-message"); // Validate the user properties order, must be the same with set order. ByteBuffer byteBuffer = message.getCorrelationData().get(); @@ -86,6 +91,8 @@ public void testConnectWillMessage() throws Exception { Assert.assertEquals(message.getContentType().get().toString(), "will-content-type"); Assert.assertNotNull(message.getPayloadFormatIndicator().get()); Assert.assertEquals(message.getPayloadFormatIndicator().get(), Mqtt5PayloadFormatIndicator.UTF_8); + Assert.assertNotNull(message.getMessageExpiryInterval()); + Assert.assertEquals(message.getMessageExpiryInterval().getAsLong(), 10); publishes.close(); client1.disconnect();