Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,9 @@ public static Optional<Integer> getExpireInterval(MqttProperties properties) {
return Optional.ofNullable(property.value());
}

/**
* Get receive maximum.
* @param properties - mqtt properties
* @return Integer - expire interval value
*/
@SuppressWarnings("unchecked")
private static Optional<Integer> getReceiveMaximum(MqttProperties properties) {
MqttProperties.MqttProperty<Integer> property = properties
.getProperty(MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value());
if (property == null) {
return Optional.empty();
}
return Optional.ofNullable(property.value());
}

public static Optional<Integer> getMaximumPacketSize(MqttProperties properties) {
MqttProperties.MqttProperty<Integer> property = properties
.getProperty(MqttProperties.MqttPropertyType.MAXIMUM_PACKET_SIZE.value());
public static Optional<Integer> getIntProperty(MqttProperties properties, MqttProperties.MqttPropertyType type) {
MqttProperties.MqttProperty<Integer> property = properties.getProperty(type.value());
if (property == null) {
return Optional.empty();
}
Expand All @@ -81,15 +66,21 @@ public static void parsePropertiesToStuffRestriction(
getExpireInterval(properties)
.ifPresent(clientRestrictionsBuilder::sessionExpireInterval);
// parse receive maximum
Optional<Integer> receiveMaximum = getReceiveMaximum(properties);
Optional<Integer> receiveMaximum = getIntProperty(properties, MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM);
if (receiveMaximum.isPresent() && receiveMaximum.get() == 0) {
throw new InvalidReceiveMaximumException("Not Allow Receive maximum property value zero");
} else {
receiveMaximum.ifPresent(clientRestrictionsBuilder::receiveMaximum);
}
// parse maximum packet size
Optional<Integer> maximumPacketSize = getMaximumPacketSize(properties);
Optional<Integer> maximumPacketSize = getIntProperty(properties, MqttProperties
.MqttPropertyType.MAXIMUM_PACKET_SIZE);
maximumPacketSize.ifPresent(clientRestrictionsBuilder::maximumPacketSize);
// parse request problem information
Optional<Integer> requestProblemInformation =
getIntProperty(properties, MqttProperties.MqttPropertyType.REQUEST_PROBLEM_INFORMATION);
// the empty option means allowing reason string or user property
clientRestrictionsBuilder.allowReasonStrOrUserProperty(!requestProblemInformation.isPresent());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ private MqttProperties getProperties() {
return MqttProperties.NO_PROPERTIES;
}
MqttProperties properties = new MqttProperties();
MqttPropertyUtils.setReasonString(properties, reasonString);
if (reasonString != null) {
MqttPropertyUtils.setReasonString(properties, reasonString);
}
return properties;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,16 @@ public void processPublish(MqttAdapterMessage adapter) {
Throwable cause = ex.getCause();
log.error("[Proxy Publish] Failed to publish for topic : {}, CId : {}",
msg.variableHeader().topicName(), connection.getClientId(), cause);
MqttAck pubAck = MqttPubAck.errorBuilder(connection.getProtocolVersion())
MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder =
MqttPubAck.errorBuilder(connection.getProtocolVersion())
.packetId(packetId)
.reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR)
.reasonString(String.format("Failed to publish for topic, because of look up error %s",
cause.getMessage()))
.build();
connection.sendAckThenClose(pubAck)
.reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR);
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
pubAckBuilder.reasonString(
String.format("Failed to publish for topic, because of look up error %s",
cause.getMessage()));
}
connection.sendAckThenClose(pubAckBuilder.build())
.thenAccept(__ -> connection.decrementServerReceivePubMessage());
}
});
Expand Down Expand Up @@ -273,12 +276,14 @@ public void processSubscribe(final MqttAdapterMessage adapter) {
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[Proxy Subscribe] Failed to process subscribe for {}", clientId, realCause);
MqttAck subAck = MqttSubAck.errorBuilder(connection.getProtocolVersion())
MqttSubAck.MqttSubErrorAckBuilder subAckBuilder =
MqttSubAck.errorBuilder(connection.getProtocolVersion())
.packetId(packetId)
.errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR)
.reasonString("[ MOP ERROR ]" + realCause.getMessage())
.build();
connection.sendAckThenClose(subAck);
.errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR);
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
subAckBuilder.reasonString("[ MOP ERROR ]" + realCause.getMessage());
}
connection.sendAckThenClose(subAckBuilder.build());
subscribeAckTracker.remove(packetId);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class ClientRestrictions {
private boolean cleanSession;
private Integer maximumPacketSize;

@Getter
private boolean allowReasonStrOrUserProperty;

public int getSessionExpireInterval() {
return Optional.ofNullable(sessionExpireInterval)
.orElse(SessionExpireInterval.EXPIRE_IMMEDIATELY.getSecondTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,14 @@ private CompletableFuture<Void> doUnauthorized(MqttAdapterMessage adapter) {
log.error("[Publish] not authorized to topic={}, userRole={}, CId= {}",
msg.variableHeader().topicName(), connection.getUserRole(),
connection.getClientId());
MqttAck pubAck = MqttPubAck
MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck
.errorBuilder(connection.getProtocolVersion())
.packetId(msg.variableHeader().packetId())
.reasonCode(Mqtt5PubReasonCode.NOT_AUTHORIZED)
.reasonString("Not Authorized!")
.build();
connection.sendAckThenClose(pubAck);
.reasonCode(Mqtt5PubReasonCode.NOT_AUTHORIZED);
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
pubAckBuilder.reasonString("Not Authorized!");
}
connection.sendAckThenClose(pubAckBuilder.build());
return CompletableFuture.completedFuture(null);
}

Expand Down Expand Up @@ -242,13 +243,14 @@ private void checkServerReceivePubMessageAndIncrementCounterIfNeeded(MqttAdapter
if (connection.getServerReceivePubMessage() >= connection.getClientRestrictions().getReceiveMaximum()) {
log.warn("Client publish exceed server receive maximum , the receive maximum is {}",
connection.getServerRestrictions().getReceiveMaximum());
MqttAck pubAck = MqttPubAck.errorBuilder(connection.getProtocolVersion())
MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(connection.getProtocolVersion())
.reasonCode(Mqtt5PubReasonCode.QUOTA_EXCEEDED)
.packetId(msg.variableHeader().packetId())
.reasonString(String.format("Publish exceed server receive maximum %s.",
connection.getServerRestrictions().getReceiveMaximum()))
.build();
connection.sendAckThenClose(pubAck);
.packetId(msg.variableHeader().packetId());
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
pubAckBuilder.reasonString(String.format("Publish exceed server receive maximum %s.",
connection.getServerRestrictions().getReceiveMaximum()));
}
connection.sendAckThenClose(pubAckBuilder.build());
} else {
connection.incrementServerReceivePubMessage();
}
Expand Down Expand Up @@ -369,12 +371,13 @@ private CompletableFuture<Void> doSubscribe(MqttSubscribeMessage msg) {
final List<MqttTopicSubscription> subTopics = topicSubscriptions(msg);
boolean duplicated = mqttSubscriptionManager.addSubscriptions(connection.getClientId(), subTopics);
if (duplicated) {
MqttAck subAck = MqttSubAck.errorBuilder(connection.getProtocolVersion())
MqttSubAck.MqttSubErrorAckBuilder subAckBuilder = MqttSubAck.errorBuilder(connection.getProtocolVersion())
.packetId(messageID)
.errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR)
.reasonString("Duplicated subscribe")
.build();
connection.sendAckThenClose(subAck);
.errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR);
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
subAckBuilder.reasonString("Duplicated subscribe");
}
connection.sendAckThenClose(subAckBuilder.build());
return CompletableFuture.completedFuture(null);
}
List<CompletableFuture<Void>> futureList = new ArrayList<>(subTopics.size());
Expand Down Expand Up @@ -425,20 +428,24 @@ private CompletableFuture<Void> doSubscribe(MqttSubscribeMessage msg) {
subTopics.stream().map(MqttTopicSubscription::topicName)
.collect(Collectors.joining(",")),
pulsarService.getConfig().isAllowAutoTopicCreation());
MqttAck subAck = MqttSubAck.errorBuilder(connection.getProtocolVersion())
MqttSubAck.MqttSubErrorAckBuilder subAckBuilder =
MqttSubAck.errorBuilder(connection.getProtocolVersion())
.packetId(messageID)
.errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR)
.reasonString("Topic not found")
.build();
connection.sendAckThenClose(subAck);
.errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR);
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
subAckBuilder.reasonString("Topic not found");
}
connection.sendAckThenClose(subAckBuilder.build());
} else {
log.error("[Subscribe] [{}] Failed to process MQTT subscribe.", connection.getClientId(), ex);
MqttAck subAck = MqttSubAck.errorBuilder(connection.getProtocolVersion())
MqttSubAck.MqttSubErrorAckBuilder subAckBuilder =
MqttSubAck.errorBuilder(connection.getProtocolVersion())
.packetId(messageID)
.errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR)
.reasonString("[ MOP ERROR ]" + realCause.getMessage())
.build();
connection.sendAckThenClose(subAck);
.errorReason(MqttSubAck.ErrorReason.UNSPECIFIED_ERROR);
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
subAckBuilder.reasonString("[ MOP ERROR ]" + realCause.getMessage());
}
connection.sendAckThenClose(subAckBuilder.build());
}
return null;
});
Expand Down Expand Up @@ -533,12 +540,14 @@ public void processUnSubscribe(MqttAdapterMessage adapter) {
.build();
connection.sendAck(unsubAck);
} else {
MqttAck unsubAck = MqttUnsubAck.errorBuilder(connection.getProtocolVersion())
MqttUnsubAck.MqttUnsubErrorAckBuilder unsubAckBuilder =
MqttUnsubAck.errorBuilder(connection.getProtocolVersion())
.packetId(packetId)
.reasonCode(Mqtt5UnsubReasonCode.UNSPECIFIED_ERROR)
.reasonString(cause.getMessage())
.build();
connection.sendAckThenClose(unsubAck);
.reasonCode(Mqtt5UnsubReasonCode.UNSPECIFIED_ERROR);
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
unsubAckBuilder.reasonString(cause.getMessage());
}
connection.sendAckThenClose(unsubAckBuilder.build());
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,22 @@ public CompletableFuture<Void> publish(MqttAdapterMessage adapter) {
} else if (realCause instanceof BrokerServiceException.TopicNotFoundException) {
log.warn("Topic [{}] Not found, the configuration [isAllowAutoTopicCreation={}]",
topic, pulsarService.getConfig().isAllowAutoTopicCreation());
MqttAck pubAck = MqttPubAck.errorBuilder(protocolVersion)
MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion)
.packetId(packetId)
.reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR)
.reasonString("Topic not found")
.build();
connection.sendAckThenClose(pubAck);
.reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR);
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
pubAckBuilder.reasonString("Topic not found");
}
connection.sendAckThenClose(pubAckBuilder.build());
} else {
log.error("[{}] Publish msg {} fail.", topic, msg, ex);
MqttAck pubAck = MqttPubAck.errorBuilder(protocolVersion)
MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion)
.packetId(packetId)
.reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR)
.reasonString(realCause.getMessage())
.build();
connection.sendAckThenClose(pubAck);
.reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR);
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
pubAckBuilder.reasonString(realCause.getMessage());
}
connection.sendAckThenClose(pubAckBuilder.build());
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectRestrictions;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
Expand Down Expand Up @@ -120,9 +121,41 @@ public void testPublishNotAuthorized() {
.qos(MqttQos.AT_LEAST_ONCE)
.send();
client.subscribeWith().topicFilter("a").qos(MqttQos.AT_LEAST_ONCE).send();
Assert.fail();
} catch (Mqtt5PubAckException ex) {
Mqtt5PubAckReasonCode reasonCode = ex.getMqttMessage().getReasonCode();
Assert.assertEquals(reasonCode, Mqtt5PubAckReasonCode.NOT_AUTHORIZED);
Assert.assertTrue(ex.getMqttMessage().getReasonString().isPresent());
}
Awaitility.await()
.untilAsserted(() -> Assert.assertFalse(client.getState().isConnected()));
}

@Test(timeOut = TIMEOUT)
public void testPublishWithRequestProblemInformation() {
Mqtt5BlockingClient client = MQTT5ClientUtils.createMqtt5Client(getMqttBrokerPortList().get(0));
client.connectWith()
.simpleAuth()
.username("user1")
.password("pass1".getBytes(StandardCharsets.UTF_8))
.applySimpleAuth()
.restrictions(Mqtt5ConnectRestrictions.builder()
.requestProblemInformation(false)
.build())
.send();
String message = "Hello MQTT Pulsar";
try {
client.publishWith()
.topic("a")
.payload(message.getBytes(StandardCharsets.UTF_8))
.qos(MqttQos.AT_LEAST_ONCE)
.send();
client.subscribeWith().topicFilter("a").qos(MqttQos.AT_LEAST_ONCE).send();
Assert.fail();
} catch (Mqtt5PubAckException ex) {
Mqtt5PubAckReasonCode reasonCode = ex.getMqttMessage().getReasonCode();
Assert.assertEquals(reasonCode, Mqtt5PubAckReasonCode.NOT_AUTHORIZED);
Assert.assertFalse(ex.getMqttMessage().getReasonString().isPresent());
}
Awaitility.await()
.untilAsserted(() -> Assert.assertFalse(client.getState().isConnected()));
Expand Down
Loading