Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@
package io.streamnative.pulsar.handlers.mqtt;

import static io.streamnative.pulsar.handlers.mqtt.utils.PulsarMessageConverter.toPulsarMsg;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.streamnative.pulsar.handlers.mqtt.exception.MQTTNoMatchingSubscriberException;
import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasExceedsLimitException;
import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasNotFoundException;
import io.streamnative.pulsar.handlers.mqtt.messages.MqttPropertyUtils;
import io.streamnative.pulsar.handlers.mqtt.support.RetainedMessageHandler;
import io.streamnative.pulsar.handlers.mqtt.utils.MessagePublishContext;
import io.streamnative.pulsar.handlers.mqtt.utils.PulsarTopicUtils;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
Expand All @@ -43,14 +48,15 @@ protected AbstractQosPublishHandler(MQTTService mqttService) {
this.configuration = mqttService.getServerConfiguration();
}

protected CompletableFuture<Optional<Topic>> getTopicReference(MqttPublishMessage msg) {
return PulsarTopicUtils.getTopicReference(pulsarService, msg.variableHeader().topicName(),
protected CompletableFuture<Optional<Topic>> getTopicReference(String mqttTopicName) {
return PulsarTopicUtils.getTopicReference(pulsarService, mqttTopicName,
configuration.getDefaultTenant(), configuration.getDefaultNamespace(), true
, configuration.getDefaultTopicDomain());
}

protected CompletableFuture<PositionImpl> writeToPulsarTopic(MqttPublishMessage msg) {
return writeToPulsarTopic(msg, false);
protected CompletableFuture<PositionImpl> writeToPulsarTopic(TopicAliasManager topicAliasManager,
MqttPublishMessage msg) {
return writeToPulsarTopic(topicAliasManager, msg, false);
}

/**
Expand All @@ -59,19 +65,42 @@ protected CompletableFuture<PositionImpl> writeToPulsarTopic(MqttPublishMessage
* @param checkSubscription Check if the subscription exists, throw #{MQTTNoMatchingSubscriberException}
* if the subscription does not exist;
*/
protected CompletableFuture<PositionImpl> writeToPulsarTopic(MqttPublishMessage msg,
boolean checkSubscription) {
return getTopicReference(msg).thenCompose(topicOp -> topicOp.map(topic -> {
MessageImpl<byte[]> message = toPulsarMsg(topic, msg);
protected CompletableFuture<PositionImpl> writeToPulsarTopic(TopicAliasManager topicAliasManager,
MqttPublishMessage msg, boolean checkSubscription) {
Optional<Integer> topicAlias = MqttPropertyUtils.getProperty(msg.variableHeader().properties(),
MqttProperties.MqttPropertyType.TOPIC_ALIAS);
String mqttTopicName;
if (topicAlias.isPresent()) {
int alias = topicAlias.get();
String tpName = msg.variableHeader().topicName();
if (StringUtils.isNoneBlank(tpName)) {
// update alias
boolean updateSuccess = topicAliasManager.updateTopicAlias(tpName, alias);
if (!updateSuccess) {
throw new MQTTTopicAliasExceedsLimitException(alias,
topicAliasManager.getTopicMaximumAlias());
}
}
Optional<String> realName = topicAliasManager.getTopicByAlias(alias);
if (!realName.isPresent()) {
throw new MQTTTopicAliasNotFoundException(alias);
}
mqttTopicName = realName.get();
} else {
mqttTopicName = msg.variableHeader().topicName();
}
return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> {
MessageImpl<byte[]> message = toPulsarMsg(topic, msg.variableHeader().properties(),
msg.payload().nioBuffer());
CompletableFuture<PositionImpl> ret = MessagePublishContext.publishMessages(message, topic);
message.recycle();
return ret.thenApply(position -> {
if (checkSubscription && topic.getSubscriptions().isEmpty()) {
throw new MQTTNoMatchingSubscriberException(msg.variableHeader().topicName());
throw new MQTTNoMatchingSubscriberException(mqttTopicName);
}
return position;
});
}).orElseGet(() -> FutureUtil.failedFuture(
new BrokerServiceException.TopicNotFoundException(msg.variableHeader().topicName()))));
new BrokerServiceException.TopicNotFoundException(mqttTopicName))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public class Connection {
private volatile int serverCurrentReceiveCounter = 0;
@Getter
private final ProtocolMethodProcessor processor;

@Getter
private final TopicAliasManager topicAliasManager;

@Getter
private final boolean fromProxy;
private volatile ConnectionState connectionState = DISCONNECTED;
Expand Down Expand Up @@ -103,6 +107,7 @@ public class Connection {
this.processor = builder.processor;
this.fromProxy = builder.fromProxy;
this.manager.addConnection(this);
this.topicAliasManager = new TopicAliasManager(clientRestrictions.getTopicAliasMaximum());
}

private void addIdleStateHandler() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;


public class TopicAliasManager {

public static final int NO_ALIAS = -1;
private final Map<Integer, String> clientSideAlias;
private final Map<String, TopicAliasInfo> brokerSideAlias;

private final PacketIdGenerator idGenerator;
@Getter
private final int topicMaximumAlias;

public TopicAliasManager(int topicMaximumAlias) {
this.topicMaximumAlias = topicMaximumAlias;
this.idGenerator = PacketIdGenerator.newNonZeroGenerator();
this.clientSideAlias = new ConcurrentHashMap<>(topicMaximumAlias);
this.brokerSideAlias = new ConcurrentHashMap<>(topicMaximumAlias);
}

public Optional<String> getTopicByAlias(int alias) {
return Optional.ofNullable(clientSideAlias.get(alias));
}

public boolean updateTopicAlias(String topic, int alias) {
return clientSideAlias.compute(alias, (k, v) -> {
if (v == null && clientSideAlias.size() >= topicMaximumAlias) {
return null;
}
return topic;
}) != null;
}

public Optional<Integer> getOrCreateAlias(String topic) {
TopicAliasInfo topicAliasInfo = brokerSideAlias.computeIfAbsent(topic, (k) -> {
if (brokerSideAlias.size() >= topicMaximumAlias) {
return null;
}
return TopicAliasInfo.builder()
.alias(idGenerator.nextPacketId())
.syncToClient(false)
.build();
});
if (topicAliasInfo == null) {
// Exceeds topic alias maximum
return Optional.empty();
}
return Optional.of(topicAliasInfo.getAlias());
}

public boolean isSyncAliasToClient(String topicName) {
TopicAliasInfo topicAliasInfo = brokerSideAlias.get(topicName);
if (topicAliasInfo == null) {
return false;
}
return topicAliasInfo.isSyncToClient();
}

public void syncAlias(String topic) {
brokerSideAlias.computeIfPresent(topic, (key, info) -> {
info.setSyncToClient(true);
return info;
});
}

@Getter
@ToString
@EqualsAndHashCode
@Builder
static class TopicAliasInfo {
private int alias;
@Setter
private boolean syncToClient;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.exception;

import lombok.Getter;

public class MQTTTopicAliasExceedsLimitException extends RuntimeException {
@Getter
private final int alias;
@Getter
private final int topicAliasMaximum;

public MQTTTopicAliasExceedsLimitException(int alias, int topicAliasMaximum) {
super(String.format("The topic alias %s is exceed topic alias maximum %s.", alias, topicAliasMaximum));
this.alias = alias;
this.topicAliasMaximum = topicAliasMaximum;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.exception;

import lombok.Getter;

public class MQTTTopicAliasNotFoundException extends RuntimeException {
@Getter
private final int alias;

public MQTTTopicAliasNotFoundException(int alias) {
super(String.format("The topic alias %s is not found.", alias));
this.alias = alias;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public static Optional<Integer> getExpireInterval(MqttProperties properties) {
}

@SuppressWarnings("unchecked")
public static Optional<Integer> getIntProperty(MqttProperties properties, MqttProperties.MqttPropertyType type) {
MqttProperties.MqttProperty<Integer> property = properties.getProperty(type.value());
public static <T> Optional<T> getProperty(MqttProperties properties, MqttProperties.MqttPropertyType type) {
MqttProperties.MqttProperty<T> property = properties.getProperty(type.value());
if (property == null) {
return Optional.empty();
}
Expand All @@ -66,23 +66,23 @@ public static void parsePropertiesToStuffRestriction(
getExpireInterval(properties)
.ifPresent(clientRestrictionsBuilder::sessionExpireInterval);
// parse receive maximum
Optional<Integer> receiveMaximum = getIntProperty(properties, MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM);
Optional<Integer> receiveMaximum = getProperty(properties, MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM);
if (receiveMaximum.isPresent() && receiveMaximum.get() == 0) {
throw new InvalidReceiveMaximumException("Not Allow Receive maximum property value zero");
} else {
receiveMaximum.ifPresent(clientRestrictionsBuilder::receiveMaximum);
}
// parse maximum packet size
Optional<Integer> maximumPacketSize = getIntProperty(properties, MqttProperties
Optional<Integer> maximumPacketSize = getProperty(properties, MqttProperties
.MqttPropertyType.MAXIMUM_PACKET_SIZE);
maximumPacketSize.ifPresent(clientRestrictionsBuilder::maximumPacketSize);
// parse request problem information
Optional<Integer> requestProblemInformation =
getIntProperty(properties, MqttProperties.MqttPropertyType.REQUEST_PROBLEM_INFORMATION);
getProperty(properties, MqttProperties.MqttPropertyType.REQUEST_PROBLEM_INFORMATION);
// the empty option means allowing reason string or user property
clientRestrictionsBuilder.allowReasonStrOrUserProperty(!requestProblemInformation.isPresent());
Optional<Integer> topicAliasMaximum =
getIntProperty(properties, MqttProperties.MqttPropertyType.TOPIC_ALIAS_MAXIMUM);
getProperty(properties, MqttProperties.MqttPropertyType.TOPIC_ALIAS_MAXIMUM);
topicAliasMaximum.ifPresent(clientRestrictionsBuilder::topicAliasMaximum);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ public CompletableFuture<Void> publish(Connection connection, MqttAdapterMessage
if (MqttUtils.isRetainedMessage(msg)) {
return retainedMessageHandler.addRetainedMessage(msg);
}
return writeToPulsarTopic(msg).thenAccept(__ -> {});
return writeToPulsarTopic(connection.getTopicAliasManager(), msg).thenAccept(__ -> {});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.streamnative.pulsar.handlers.mqtt.MQTTService;
import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterMessage;
import io.streamnative.pulsar.handlers.mqtt.exception.MQTTNoMatchingSubscriberException;
import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasExceedsLimitException;
import io.streamnative.pulsar.handlers.mqtt.exception.MQTTTopicAliasNotFoundException;
import io.streamnative.pulsar.handlers.mqtt.messages.ack.MqttAck;
import io.streamnative.pulsar.handlers.mqtt.messages.ack.MqttPubAck;
import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5PubReasonCode;
Expand Down Expand Up @@ -48,7 +50,8 @@ public CompletableFuture<Void> publish(Connection connection, MqttAdapterMessage
if (MqttUtils.isRetainedMessage(msg)) {
ret = retainedMessageHandler.addRetainedMessage(msg);
} else {
ret = writeToPulsarTopic(msg, !MqttUtils.isMqtt3(protocolVersion)).thenApply(__ -> null);
ret = writeToPulsarTopic(connection.getTopicAliasManager()
, msg, !MqttUtils.isMqtt3(protocolVersion)).thenApply(__ -> null);
}
// we need to check if subscription exist when protocol version is mqtt 5.x
return ret
Expand Down Expand Up @@ -87,6 +90,28 @@ public CompletableFuture<Void> publish(Connection connection, MqttAdapterMessage
pubAckBuilder.reasonString("Topic not found");
}
connection.sendAckThenClose(pubAckBuilder.build());
} else if (realCause instanceof MQTTTopicAliasNotFoundException) {
log.error("[{}] Publish message fail {}, because the topic alias {} not found.", topic, msg,
((MQTTTopicAliasNotFoundException) realCause).getAlias(), ex);
MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion)
.packetId(packetId)
.reasonCode(Mqtt5PubReasonCode.TOPIC_NAME_INVALID);
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
pubAckBuilder.reasonString(ex.getMessage());
}
connection.sendAckThenClose(pubAckBuilder.build());
} else if (realCause instanceof MQTTTopicAliasExceedsLimitException) {
log.error("[{}] Publish message fail {},"
+ " because the topic alias {} is exceed topic alias maximum {}.", topic, msg,
((MQTTTopicAliasExceedsLimitException) realCause).getAlias(),
((MQTTTopicAliasExceedsLimitException) realCause).getTopicAliasMaximum(), ex);
MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion)
.packetId(packetId)
.reasonCode(Mqtt5PubReasonCode.QUOTA_EXCEEDED);
if (connection.getClientRestrictions().isAllowReasonStrOrUserProperty()) {
pubAckBuilder.reasonString(ex.getMessage());
}
connection.sendAckThenClose(pubAckBuilder.build());
} else {
log.error("[{}] Publish msg {} fail.", topic, msg, ex);
MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.streamnative.pulsar.handlers.mqtt.PacketIdGenerator;
import io.streamnative.pulsar.handlers.mqtt.support.MessageBuilder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -69,10 +70,9 @@ protected MessageMetadata initialValue() throws Exception {
};

// Convert MQTT message to Pulsar message.
public static MessageImpl<byte[]> toPulsarMsg(Topic topic, MqttPublishMessage mqttMsg) {
public static MessageImpl<byte[]> toPulsarMsg(Topic topic, MqttProperties properties, ByteBuffer payload) {
MessageMetadata metadata = LOCAL_MESSAGE_METADATA.get();
metadata.clear();
MqttProperties properties = mqttMsg.variableHeader().properties();
if (properties != null) {
properties.listAll().forEach(prop -> {
if (MqttProperties.MqttPropertyType.USER_PROPERTY.value() == prop.propertyId()) {
Expand Down Expand Up @@ -104,7 +104,7 @@ public static MessageImpl<byte[]> toPulsarMsg(Topic topic, MqttPublishMessage mq
}
});
}
return MessageImpl.create(metadata, mqttMsg.payload().nioBuffer(), SCHEMA, topic.getName());
return MessageImpl.create(metadata, payload, SCHEMA, topic.getName());
}

private static String getPropertiesPrefix(int propertyId) {
Expand Down
Loading