Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@ public void updateExpireInterval(int newExpireInterval) throws InvalidSessionExp
}

public boolean isSessionExpireImmediately() {
return sessionExpireInterval == SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime();
return sessionExpireInterval == null
|| sessionExpireInterval == SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime();
}

public boolean isSessionNeverExpire() {
return sessionExpireInterval == SessionExpireInterval.NEVER_EXPIRE.getSecondTime();
return sessionExpireInterval == null
|| sessionExpireInterval == SessionExpireInterval.NEVER_EXPIRE.getSecondTime();
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.mqtt.support;

import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.getMessageExpiryInterval;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
Expand Down Expand Up @@ -103,9 +104,22 @@ public ChannelPromise sendMessages(List<Entry> entries, EntryBatchSizes batchSiz
outstandingPacketContainer.add(outstandingPacket);
}
} else {
OutstandingPacket outstandingPacket = new OutstandingPacket(this,
messages.get(0).variableHeader().packetId(), entry.getLedgerId(), entry.getEntryId());
outstandingPacketContainer.add(outstandingPacket);
// Because batch msg is sent from Pulsar client, so only individual msg may have mqtt-5 properties.
MqttPublishMessage firstMessage = messages.get(0);
long expiryInterval = getMessageExpiryInterval(firstMessage);
boolean addToOutstandingPacketContainer = expiryInterval >= 0;
if (expiryInterval < 0) {
log.warn("mqtt msg has expired : {}", firstMessage);
messages.remove(0);
getSubscription().acknowledgeMessage(
Collections.singletonList(entry.getPosition()),
CommandAck.AckType.Individual, Collections.emptyMap());
}
if (addToOutstandingPacketContainer) {
OutstandingPacket outstandingPacket = new OutstandingPacket(this,
messages.get(0).variableHeader().packetId(), entry.getLedgerId(), entry.getEntryId());
outstandingPacketContainer.add(outstandingPacket);
}
}
}
for (MqttPublishMessage msg : messages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,10 @@ public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage)
public static MqttMessage createMqttDisconnectMessage() {
return MessageBuilder.disconnect().build();
}

public static long getMessageExpiryInterval(MqttPublishMessage msg) {
return msg.variableHeader().properties().getProperties(
MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value())
.stream().map(prop -> ((MqttProperties.IntegerProperty) prop).value()).findFirst().orElse(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static io.streamnative.pulsar.handlers.mqtt.Constants.MQTT_PROPERTIES;
import static io.streamnative.pulsar.handlers.mqtt.Constants.MQTT_PROPERTIES_PREFIX;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
Expand Down Expand Up @@ -96,6 +97,10 @@ public static MessageImpl<byte[]> toPulsarMsg(Topic topic, MqttPublishMessage mq
MqttProperties.IntegerProperty property = (MqttProperties.IntegerProperty) prop;
metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId()))
.setValue(String.valueOf(property.value()));
} else if (MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value() == prop.propertyId()) {
MqttProperties.IntegerProperty property = (MqttProperties.IntegerProperty) prop;
metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId()))
.setValue(String.valueOf(System.currentTimeMillis() / 1000 + property.value()));
}
});
}
Expand Down Expand Up @@ -137,6 +142,11 @@ public static List<MqttPublishMessage> toMqttMessages(String topicName, Entry en
properties.add(new MqttProperties.BinaryProperty(propertyId, kv.getValue()
.getBytes(StandardCharsets.UTF_8)));
break;
case PUBLICATION_EXPIRY_INTERVAL:
// calculate first to avoid reset msg properties.
int end = Integer.valueOf(kv.getValue()) - (int) (System.currentTimeMillis() / 1000);
properties.add(new MqttProperties.IntegerProperty(propertyId, end));
break;
default:
log.warn("invalid propertyType: {}", propertyType);
break;
Expand Down Expand Up @@ -171,7 +181,7 @@ public static List<MqttPublishMessage> toMqttMessages(String topicName, Entry en
return Collections.emptyList();
}
} else {
return Collections.singletonList(MessageBuilder.publish()
return Lists.newArrayList(MessageBuilder.publish()
.messageId(packetIdGenerator.nextPacketId())
.payload(metadataAndPayload)
.topicName(topicName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -93,7 +95,6 @@ public void testPublishWithResponseTopic() throws Exception {
client1.publish(publishMessage);
Mqtt5Publish message = publishes.receive();
Assert.assertNotNull(message);
// Validate the user properties order, must be the same with set order.
Assert.assertNotNull(message.getResponseTopic().get());
Assert.assertEquals(message.getResponseTopic().get().toString(), "response-topic-b");
publishes.close();
Expand All @@ -120,7 +121,6 @@ public void testPublishWithContentType() throws Exception {
client1.publish(publishMessage);
Mqtt5Publish message = publishes.receive();
Assert.assertNotNull(message);
// Validate the user properties order, must be the same with set order.
Assert.assertNotNull(message.getContentType().get());
Assert.assertEquals(message.getContentType().get().toString(), "test-content-type");
publishes.close();
Expand All @@ -147,7 +147,6 @@ public void testPublishWithCorrelationData() throws Exception {
client1.publish(publishMessage);
Mqtt5Publish message = publishes.receive();
Assert.assertNotNull(message);
// Validate the user properties order, must be the same with set order.
ByteBuffer byteBuffer = message.getCorrelationData().get();
Assert.assertNotNull(byteBuffer);
byte[] bytes;
Expand Down Expand Up @@ -183,10 +182,71 @@ public void testPublishWithPayloadFormatIndicator() throws Exception {
client1.publish(publishMessage);
Mqtt5Publish message = publishes.receive();
Assert.assertNotNull(message);
// Validate the user properties order, must be the same with set order.
Assert.assertNotNull(message.getPayloadFormatIndicator().get());
Assert.assertEquals(message.getPayloadFormatIndicator().get(), Mqtt5PayloadFormatIndicator.UTF_8);
publishes.close();
client1.disconnect();
}

@Test
public void testPublishWithMessageExpiryInterval() throws Exception {
final String topic = "testPublishWithMessageExpiryInterval";
Mqtt5BlockingClient client1 = Mqtt5Client.builder()
.identifier("abc")
.serverHost("127.0.0.1")
.serverPort(getMqttBrokerPortList().get(0))
.buildBlocking();
client1.connectWith().send();
Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic)
.messageExpiryInterval(10)
.qos(MqttQos.AT_LEAST_ONCE).build();
client1.subscribeWith()
.topicFilter(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.send();
Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL);
client1.publish(publishMessage);
Mqtt5Publish message = publishes.receive();
Assert.assertNotNull(message);
long expiryInterval = message.getMessageExpiryInterval().getAsLong();
Assert.assertTrue(expiryInterval > 0 && expiryInterval <= 10);
publishes.close();
client1.disconnect();
//
final String topic2 = "testPublishWithMessageExpiryInterval2";
Mqtt5BlockingClient client2 = Mqtt5Client.builder()
.identifier("abc2")
.serverHost("127.0.0.1")
.serverPort(getMqttBrokerPortList().get(0))
.buildBlocking();
client2.connectWith()
.cleanStart(false).send();
Mqtt5Publish publishMessage2 = Mqtt5Publish.builder().topic(topic2)
.messageExpiryInterval(1)
.qos(MqttQos.AT_LEAST_ONCE).build();

client2.subscribeWith()
.topicFilter(topic2)
.qos(MqttQos.AT_LEAST_ONCE)
.send();
Mqtt5BlockingClient.Mqtt5Publishes publishes2 = client2.publishes(MqttGlobalPublishFilter.ALL, true);
client2.publish(publishMessage2);
Optional<Mqtt5Publish> message2 = publishes2.receive(2, TimeUnit.SECONDS);
Assert.assertTrue(message2.isPresent());
publishes2.close();
client2.disconnect();
// Wait the msg to be expired.
Thread.sleep(2000);
long msgBacklog = admin.topics().getStats(topic2).getSubscriptions().get("abc2").getMsgBacklog();
Assert.assertEquals(msgBacklog, 1);
client2.connectWith().send();
client2.subscribeWith()
.topicFilter(topic2)
.qos(MqttQos.AT_LEAST_ONCE)
.send();
publishes2 = client2.publishes(MqttGlobalPublishFilter.ALL, true);
message2 = publishes2.receive(2, TimeUnit.SECONDS);
Assert.assertFalse(message2.isPresent());
client2.disconnect();
}
}