Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.timeout.IdleStateHandler;
import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterMessage;
import io.streamnative.pulsar.handlers.mqtt.exception.restrictions.InvalidSessionExpireIntervalException;
Expand Down Expand Up @@ -234,10 +236,16 @@ public void sendConnAck() {
sendAckThenClose(connAck);
return;
}
MqttAck connAck = MqttConnectAck.successBuilder(protocolVersion)
MqttConnectAck.MqttConnectSuccessAckBuilder builder = MqttConnectAck.successBuilder(protocolVersion)
.receiveMaximum(getServerRestrictions().getReceiveMaximum())
.cleanSession(clientRestrictions.isCleanSession())
.build();
.maximumQos(MqttQoS.AT_LEAST_ONCE.value());
MqttProperties.StringProperty resInformation = (MqttProperties.StringProperty) connectMessage.variableHeader()
.properties().getProperty(MqttProperties.MqttPropertyType.RESPONSE_INFORMATION.value());
if (resInformation != null) {
builder.responseInformation(resInformation.value());
}
MqttAck connAck = builder.build();
sendAck(connAck).thenAccept(__ -> assignState(CONNECT_ACK, ESTABLISHED));
if (log.isDebugEnabled()) {
log.debug("The CONNECT message has been processed. CId={}", clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt3.Mqtt3ConnReasonCode;
import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5ConnReasonCode;
import io.streamnative.pulsar.handlers.mqtt.utils.MqttUtils;
import org.apache.commons.lang3.StringUtils;

/**
* Enhance mqtt connect ack message builder.
Expand All @@ -44,6 +45,10 @@ public final static class MqttConnectSuccessAckBuilder {
private boolean cleanSession;
private int receiveMaximum;

private int maximumQos;

private String responseInformation;

public MqttConnectSuccessAckBuilder(int protocolVersion) {
this.protocolVersion = protocolVersion;
}
Expand All @@ -58,6 +63,16 @@ public MqttConnectSuccessAckBuilder receiveMaximum(int receiveMaximum) {
return this;
}

public MqttConnectSuccessAckBuilder maximumQos(int maximumQos) {
this.maximumQos = maximumQos;
return this;
}

public MqttConnectSuccessAckBuilder responseInformation(String responseInformation) {
this.responseInformation = responseInformation;
return this;
}

public MqttAck build() {
MqttMessageBuilders.ConnAckBuilder commonBuilder = MqttMessageBuilders.connAck()
.sessionPresent(!cleanSession);
Expand All @@ -67,10 +82,20 @@ public MqttAck build() {
.build());
}
MqttProperties properties = new MqttProperties();
MqttProperties.IntegerProperty property =
MqttProperties.IntegerProperty receiveMaximumProperty =
new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value(),
receiveMaximum);
properties.add(property);
MqttProperties.IntegerProperty maximumQosProperty =
new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.MAXIMUM_QOS.value(),
maximumQos);
properties.add(receiveMaximumProperty);
properties.add(maximumQosProperty);
if (StringUtils.isNotEmpty(responseInformation)) {
MqttProperties.StringProperty responseInformationProperty =
new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.RESPONSE_INFORMATION.value(),
responseInformation);
properties.add(responseInformationProperty);
}
return MqttAck.createSupportedAck(
commonBuilder.returnCode(Mqtt5ConnReasonCode.SUCCESS.toConnectionReasonCode())
.properties(properties)
Expand Down Expand Up @@ -107,6 +132,12 @@ public MqttMessage authFail(int protocolVersion) {
return build().getMqttMessage();
}

public MqttMessage qosNotSupport(int protocolVersion) {
this.protocolVersion = protocolVersion;
this.errorReason = ErrorReason.QOS_NOT_SUPPORT;
return build().getMqttMessage();
}

public MqttMessage willQosNotSupport(int protocolVersion) {
this.protocolVersion = protocolVersion;
this.errorReason = ErrorReason.WILL_QOS_NOT_SUPPORT;
Expand Down Expand Up @@ -150,6 +181,9 @@ enum ErrorReason {
UNSUPPORTED_VERSION(Mqtt3ConnReasonCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION,
Mqtt5ConnReasonCode.UNSUPPORTED_PROTOCOL_VERSION
),
QOS_NOT_SUPPORT(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE,
Mqtt5ConnReasonCode.QOS_NOT_SUPPORTED),

WILL_QOS_NOT_SUPPORT(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE,
Mqtt5ConnReasonCode.QOS_NOT_SUPPORTED),
SERVER_UNAVAILABLE(Mqtt3ConnReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void processConnect(MqttAdapterMessage adapter) {
}
return;
}
if (!MqttUtils.isQosSupported(msg)) {
if (!MqttUtils.isWillQosSupported(msg)) {
MqttMessage mqttMessage = MqttConnectAck.errorBuilder().willQosNotSupport(protocolVersion);
adapter.setMqttMessage(mqttMessage);
channel.writeAndFlush(adapter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public void doProcessConnect(MqttAdapterMessage adapterMsg, String userRole,
.clientRestrictions(clientRestrictions)
.serverRestrictions(serverRestrictions)
.channel(channel)
.connectMessage(msg)
.connectionManager(connectionManager)
.fromProxy(adapterMsg.fromProxy())
.processor(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,19 @@ public static boolean isNotMqtt3(int version) {
return !isMqtt3(version);
}
public static boolean isQosSupported(MqttConnectMessage msg) {
int willQos = msg.variableHeader().willQos();
MqttQoS mqttQoS = MqttQoS.valueOf(willQos);
return isQosSupported(msg.fixedHeader().qosLevel());
}

public static boolean isWillQosSupported(MqttConnectMessage msg) {
boolean willFlag = msg.variableHeader().isWillFlag();
if (willFlag) {
return isQosSupported(MqttQoS.valueOf(msg.variableHeader().willQos()));
} else {
return true;
}
}

private static boolean isQosSupported(MqttQoS mqttQoS) {
return mqttQoS == MqttQoS.AT_LEAST_ONCE || mqttQoS == MqttQoS.AT_MOST_ONCE;
}

Expand Down