diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java index 6b2d7a55e..7edde08c1 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java @@ -238,6 +238,7 @@ public void sendConnAck() { } MqttConnectAck.MqttConnectSuccessAckBuilder builder = MqttConnectAck.successBuilder(protocolVersion) .receiveMaximum(getServerRestrictions().getReceiveMaximum()) + .topicAliasMaximum(clientRestrictions.getTopicAliasMaximum()) .cleanSession(clientRestrictions.isCleanSession()) .maximumQos(MqttQoS.AT_LEAST_ONCE.value()) .maximumPacketSize(getServerRestrictions().getMaximumPacketSize()); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java index ce6397717..4915f706e 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/MqttPropertyUtils.java @@ -81,6 +81,9 @@ public static void parsePropertiesToStuffRestriction( getIntProperty(properties, MqttProperties.MqttPropertyType.REQUEST_PROBLEM_INFORMATION); // the empty option means allowing reason string or user property clientRestrictionsBuilder.allowReasonStrOrUserProperty(!requestProblemInformation.isPresent()); + Optional topicAliasMaximum = + getIntProperty(properties, MqttProperties.MqttPropertyType.TOPIC_ALIAS_MAXIMUM); + topicAliasMaximum.ifPresent(clientRestrictionsBuilder::topicAliasMaximum); } /** diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java index 8f251597d..0c1e82b5d 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttConnectAck.java @@ -45,6 +45,8 @@ public final static class MqttConnectSuccessAckBuilder { private boolean cleanSession; private int receiveMaximum; + private int topicAliasMaximum; + private int maximumQos; private String responseInformation; @@ -60,6 +62,12 @@ public MqttConnectSuccessAckBuilder cleanSession(boolean cleanSession) { return this; } + public MqttConnectSuccessAckBuilder topicAliasMaximum(int topicAliasMaximum) { + this.topicAliasMaximum = topicAliasMaximum; + return this; + } + + public MqttConnectSuccessAckBuilder receiveMaximum(int receiveMaximum) { this.receiveMaximum = receiveMaximum; return this; @@ -102,10 +110,14 @@ public MqttAck build() { MqttProperties.IntegerProperty maximumPacketSizeProperty = new MqttProperties.IntegerProperty( MqttProperties.MqttPropertyType.MAXIMUM_PACKET_SIZE.value(), maximumPacketSize); + MqttProperties.IntegerProperty topicAliasProperty = + new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.TOPIC_ALIAS_MAXIMUM.value(), + topicAliasMaximum); properties.add(receiveMaximumProperty); properties.add(maximumQosProperty); properties.add(subscriptionIdentifiersAvailableProperty); properties.add(maximumPacketSizeProperty); + properties.add(topicAliasProperty); if (StringUtils.isNotEmpty(responseInformation)) { MqttProperties.StringProperty responseInformationProperty = new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.RESPONSE_INFORMATION.value(), @@ -199,7 +211,6 @@ enum ErrorReason { ), QOS_NOT_SUPPORT(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, Mqtt5ConnReasonCode.QOS_NOT_SUPPORTED), - WILL_QOS_NOT_SUPPORT(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, Mqtt5ConnReasonCode.QOS_NOT_SUPPORTED), SERVER_UNAVAILABLE(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java index 41df5c7fb..5ef8558b1 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/restrictions/ClientRestrictions.java @@ -35,6 +35,7 @@ public class ClientRestrictions { private boolean cleanSession; private Integer maximumPacketSize; + private Integer topicAliasMaximum; @Getter private boolean allowReasonStrOrUserProperty; @@ -59,6 +60,10 @@ public boolean exceedMaximumPacketSize(int readableBytes) { return getMaximumPacketSize() != 0 ? readableBytes > maximumPacketSize : false; } + public int getTopicAliasMaximum() { + return Optional.ofNullable(topicAliasMaximum).orElse(0); + } + public void updateExpireInterval(int newExpireInterval) throws InvalidSessionExpireIntervalException { if (sessionExpireInterval <= SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime() || newExpireInterval < SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime()) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ReasonCodeOnAllACKTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ReasonCodeOnAllACKTest.java index 86ed84b26..170d133aa 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ReasonCodeOnAllACKTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ReasonCodeOnAllACKTest.java @@ -199,4 +199,16 @@ public void testDisConnectSuccess() { Assert.assertEquals(reasonCode, Mqtt5ConnAckReasonCode.SUCCESS); client.disconnect(); } + + @Test(timeOut = TIMEOUT) + public void testTopicAliasMaximum() { + final int aliasMaximum = 2000; + Mqtt5BlockingClient mqtt5Client = MQTT5ClientUtils.createMqtt5Client(getMqttBrokerPortList().get(0)); + Mqtt5ConnAck ack = mqtt5Client.connectWith() + .restrictions() + .topicAliasMaximum(aliasMaximum) + .applyRestrictions() + .send(); + Assert.assertEquals(ack.getRestrictions().getTopicAliasMaximum(), 2000); + } }