Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,6 @@ public void close() {
if (eventService != null) {
eventService.close();
}
this.willMessageHandler.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static io.streamnative.pulsar.handlers.mqtt.support.systemtopic.EventType.LAST_WILL_MESSAGE;
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttWillMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.streamnative.pulsar.handlers.mqtt.Connection;
import io.streamnative.pulsar.handlers.mqtt.MQTTConnectionManager;
import io.streamnative.pulsar.handlers.mqtt.MQTTService;
Expand All @@ -25,6 +26,9 @@
import io.streamnative.pulsar.handlers.mqtt.support.systemtopic.MqttEvent;
import io.streamnative.pulsar.handlers.mqtt.utils.WillMessage;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -42,13 +46,17 @@ public class WillMessageHandler {
private final EventListener eventListener;
private final MQTTService mqttService;

private final ScheduledExecutorService executor;

public WillMessageHandler(MQTTService mqttService) {
this.mqttService = mqttService;
this.pulsarService = mqttService.getPulsarService();
this.mqttSubscriptionManager = mqttService.getSubscriptionManager();
this.connectionManager = mqttService.getConnectionManager();
this.advertisedAddress = mqttService.getPulsarService().getAdvertisedAddress();
this.eventListener = new LastWillMessageEventListener();
this.executor = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("will-message-executor"));
}

public void fireWillMessage(String clientId, WillMessage willMessage) {
Expand All @@ -61,6 +69,14 @@ public void fireWillMessage(String clientId, WillMessage willMessage) {
.build();
mqttService.getEventService().sendLWTEvent(lwt);
}
if (willMessage.getDelayInterval() > 0) {
executor.schedule(() -> sendWillMessage(willMessage), willMessage.getDelayInterval(), TimeUnit.SECONDS);
} else {
sendWillMessage(willMessage);
}
}

private void sendWillMessage(WillMessage willMessage) {
List<Pair<String, String>> subscriptions = mqttSubscriptionManager.findMatchedTopic(willMessage.getTopic());
MqttPublishMessage msg = createMqttWillMessage(willMessage);
for (Pair<String, String> entry : subscriptions) {
Expand All @@ -73,6 +89,10 @@ public void fireWillMessage(String clientId, WillMessage willMessage) {
}
}

public void close() {
this.executor.shutdown();
}

class LastWillMessageEventListener implements EventListener {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,15 @@ public static WillMessage createWillMessage(MqttConnectMessage msg) {
int payloadFormatIndicator = msg.payload().willProperties()
.getProperties(MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR.value())
.stream().map(up -> ((MqttProperties.IntegerProperty) up).value()).findFirst().orElse(0);
int messageExpiryInterval = msg.payload().willProperties()
.getProperties(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value())
.stream().map(up -> ((MqttProperties.IntegerProperty) up).value()).findFirst().orElse(0);
int delayInterval = msg.payload().willProperties()
.getProperties(MqttProperties.MqttPropertyType.WILL_DELAY_INTERVAL.value())
.stream().map(up -> ((MqttProperties.IntegerProperty) up).value()).findFirst().orElse(0);
return new WillMessage(willTopic, willMessage, qos, retained, userProperty,
contentType, responseTopic, correlationData, payloadFormatIndicator);
contentType, responseTopic, correlationData, payloadFormatIndicator,
messageExpiryInterval, delayInterval);
}

public static RetainedMessage createRetainedMessage(MqttPublishMessage msg) {
Expand Down Expand Up @@ -165,6 +172,12 @@ public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage)
properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.CORRELATION_DATA.value(),
willMessage.correlationData.getBytes(StandardCharsets.UTF_8)));
}
if (willMessage.messageExpiryInterval > 0) {
properties.add(new MqttProperties.IntegerProperty(
MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value(),
willMessage.messageExpiryInterval));
}
// No need to add delayInterval to the properties, it will cause client close the connection.
builder.properties(properties);
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public class WillMessage {

int payloadFormatIndicator;

int messageExpiryInterval;

int delayInterval;

public WillMessage() {
}
Expand All @@ -52,7 +55,9 @@ public WillMessage(final String topic,
final String contentType,
final String responseTopic,
final String correlationData,
final int payloadFormatIndicator) {
final int payloadFormatIndicator,
final int messageExpiryInterval,
final int delayInterval) {
this.topic = topic;
this.willMessage = willMessage;
this.qos = qos;
Expand All @@ -62,5 +67,7 @@ public WillMessage(final String topic,
this.responseTopic = responseTopic;
this.correlationData = correlationData;
this.payloadFormatIndicator = payloadFormatIndicator;
this.messageExpiryInterval = messageExpiryInterval;
this.delayInterval = delayInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -59,13 +61,16 @@ public void testConnectWillMessage() throws Exception {
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
.correlationData("cd".getBytes(StandardCharsets.UTF_8))
.responseTopic("will-response-topic")
.messageExpiryInterval(10)
.delayInterval(2)
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.fromCode(1))
.applyWillPublish()
.send();
client2.disconnect();
//
Mqtt5Publish message = publishes.receive();
Assert.assertNotNull(message);
Optional<Mqtt5Publish> optMessage = publishes.receive(5, TimeUnit.SECONDS);
Assert.assertTrue(optMessage.isPresent());
Mqtt5Publish message = optMessage.get();
Assert.assertEquals(new String(message.getPayloadAsBytes()), "will-message");
// Validate the user properties order, must be the same with set order.
ByteBuffer byteBuffer = message.getCorrelationData().get();
Expand All @@ -86,6 +91,8 @@ public void testConnectWillMessage() throws Exception {
Assert.assertEquals(message.getContentType().get().toString(), "will-content-type");
Assert.assertNotNull(message.getPayloadFormatIndicator().get());
Assert.assertEquals(message.getPayloadFormatIndicator().get(), Mqtt5PayloadFormatIndicator.UTF_8);
Assert.assertNotNull(message.getMessageExpiryInterval());
Assert.assertEquals(message.getMessageExpiryInterval().getAsLong(), 10);

publishes.close();
client1.disconnect();
Expand Down