Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public static class PublishBuilder {
private MqttQoS qos;
private ByteBuf payload;
private int messageId;
private MqttProperties properties;

public PublishBuilder topicName(String topic) {
this.topic = topic;
Expand All @@ -76,9 +77,14 @@ public PublishBuilder messageId(int messageId) {
return this;
}

public PublishBuilder properties(MqttProperties properties) {
this.properties = properties;
return this;
}

public MqttPublishMessage build() {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained, 0);
MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(topic, messageId);
MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(topic, messageId, properties);
return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, payload);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
Expand All @@ -31,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Hex;

/**
Expand Down Expand Up @@ -96,7 +98,10 @@ public static WillMessage createWillMessage(MqttConnectMessage msg) {
final String willTopic = msg.payload().willTopic();
final boolean retained = msg.variableHeader().isWillRetain();
final MqttQoS qos = MqttQoS.valueOf(msg.variableHeader().willQos());
return new WillMessage(willTopic, willMessage, qos, retained);
final List<MqttProperties.StringPair> userProperty = msg.payload().willProperties()
.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())
.stream().map(up -> (MqttProperties.StringPair) up.value()).collect(Collectors.toList());
return new WillMessage(willTopic, willMessage, qos, retained, userProperty);
}

public static RetainedMessage createRetainedMessage(MqttPublishMessage msg) {
Expand All @@ -122,13 +127,18 @@ public static MqttPublishMessage createRetainedMessage(RetainedMessage msg) {
}

public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage) {
return MessageBuilder.publish()
MessageBuilder.PublishBuilder builder = MessageBuilder.publish()
.topicName(willMessage.getTopic())
.payload(Unpooled.copiedBuffer(willMessage.getWillMessage()))
.qos(willMessage.getQos())
.retained(willMessage.isRetained())
.messageId(-1)
.build();
.messageId(-1);
if (willMessage.userProperty != null) {
MqttProperties properties = new MqttProperties();
properties.add(new MqttProperties.UserProperties(willMessage.userProperty));
builder.properties(properties);
}
return builder.build();
}

public static MqttMessage createMqttDisconnectMessage() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
*/
package io.streamnative.pulsar.handlers.mqtt.utils;

import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.List;
import lombok.Getter;
import lombok.Setter;

Expand All @@ -28,14 +30,20 @@ public class WillMessage {
byte[] willMessage;
MqttQoS qos;
boolean retained;
List<MqttProperties.StringPair> userProperty;

public WillMessage() {
}

public WillMessage(final String topic, final byte[] willMessage, final MqttQoS qos, final boolean retained) {
public WillMessage(final String topic,
final byte[] willMessage,
final MqttQoS qos,
final boolean retained,
final List<MqttProperties.StringPair> userProperty) {
this.topic = topic;
this.willMessage = willMessage;
this.qos = qos;
this.retained = retained;
this.userProperty = userProperty;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.hivemq.client.mqtt.datatypes.MqttTopic;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectRestrictions;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase;
Expand Down Expand Up @@ -174,4 +176,46 @@ public void testMaximumPacketSize() throws Exception {
client.unsubscribeWith().topicFilter(topic).send();
client.disconnect();
}

@Test
public void testLastWillMessage() throws Exception {
final String topic = "testLastWillMessage";
final String identifier = "test-Last-Will-Message";
Mqtt5BlockingClient client = Mqtt5Client.builder()
.identifier(identifier)
.serverHost("127.0.0.1")
.serverPort(getMqttBrokerPortList().get(0))
.buildBlocking();
Mqtt5UserProperties userProperty = Mqtt5UserProperties.builder()
.add("user-1", "value-1")
.add("user-2", "value-2")
.build();
Mqtt5UserProperty userProperty1 = Mqtt5UserProperty.of("user-1", "value-1");
Mqtt5UserProperty userProperty2 = Mqtt5UserProperty.of("user-2", "value-2");
client.connectWith().willPublish(Mqtt5Publish.builder().topic(topic).userProperties(userProperty)
.asWill().payload("online".getBytes(StandardCharsets.UTF_8)).build())
.send();
Mqtt5BlockingClient client2 = Mqtt5Client.builder()
.identifier(identifier + "-client-2")
.serverHost("127.0.0.1")
.serverPort(getMqttBrokerPortList().get(0))
.buildBlocking();
client2.connectWith().send();
client2.subscribeWith()
.topicFilter(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.send();
Mqtt5BlockingClient.Mqtt5Publishes publishes = client2.publishes(MqttGlobalPublishFilter.ALL);
client.disconnect();
Mqtt5Publish message = publishes.receive();
Assert.assertNotNull(message);
Assert.assertEquals(message.getPayloadAsBytes(), "online".getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(message.getUserProperties().asList().size(), 2);
// Validate the user properties order, must be the same with set order.
Assert.assertEquals(message.getUserProperties().asList().get(0).compareTo(userProperty1), 0);
Assert.assertEquals(message.getUserProperties().asList().get(1).compareTo(userProperty2), 0);
publishes.close();
client2.unsubscribeWith().topicFilter(topic).send();
client2.disconnect();
}
}