Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
Expand Down Expand Up @@ -70,7 +70,7 @@ private void findBroker(TopicName topicName,
AtomicLong remainingTime,
CompletableFuture<InetSocketAddress> future) {
pulsarClient.getLookup().getBroker(topicName)
.thenCompose(lookupPair ->
.thenCompose(lookupResult ->
localBrokerDataCache.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).thenCompose(brokers -> {
// Get all broker data by metadata
List<CompletableFuture<Optional<LocalBrokerData>>> brokerDataFutures =
Expand All @@ -85,7 +85,7 @@ private void findBroker(TopicName topicName,
Optional<LocalBrokerData> specificBrokerData =
brokerDataFutures.stream().map(CompletableFuture::join)
.filter(brokerData -> brokerData.isPresent()
&& isLookupMQTTBroker(lookupPair, brokerData.get()))
&& isLookupMQTTBroker(lookupResult, brokerData.get()))
.map(Optional::get)
.findAny();
if (!specificBrokerData.isPresent()) {
Expand All @@ -110,7 +110,7 @@ && isLookupMQTTBroker(lookupPair, brokerData.get()))
String port = splits[splits.length - 1];
int mqttBrokerPort = Integer.parseInt(port);
return CompletableFuture.completedFuture(new InetSocketAddress(
lookupPair.getLeft().getHostName(), mqttBrokerPort));
lookupResult.getLogicalAddress().getHostName(), mqttBrokerPort));
});
}))
.thenAccept(future::complete)
Expand Down Expand Up @@ -149,14 +149,16 @@ public CompletableFuture<InetSocketAddress> findBroker(TopicName topicName) {
return lookupResult;
}

private boolean isLookupMQTTBroker(Pair<InetSocketAddress, InetSocketAddress> pair,
private boolean isLookupMQTTBroker(LookupTopicResult result,
LocalBrokerData localBrokerData) {

String plain = String.format("pulsar://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort());
String ssl = String.format("pulsar+ssl://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort());
String plain = String.format("pulsar://%s:%s", result.getLogicalAddress().getHostName(),
result.getLogicalAddress().getPort());
String ssl = String.format("pulsar+ssl://%s:%s", result.getLogicalAddress().getHostName(),
result.getLogicalAddress().getPort());
return localBrokerData.getProtocol(protocolHandlerName).isPresent()
&& (localBrokerData.getPulsarServiceUrl().equals(plain)
|| localBrokerData.getPulsarServiceUrlTls().equals(ssl));
|| localBrokerData.getPulsarServiceUrlTls().equals(ssl));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package io.streamnative.pulsar.handlers.mqtt.support;

import io.netty.channel.ChannelHandlerContext;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.ServerCnx;

Expand All @@ -42,7 +44,7 @@ protected void close() {
}

@Override
public void closeConsumer(Consumer consumer) {
public void closeConsumer(Consumer consumer, Optional<BrokerLookupData> assignedBrokerLookupData) {
safelyRemoveConsumer(consumer);
MQTTConsumer mqttConsumer = (MQTTConsumer) consumer;
mqttConsumer.getConnection().disconnect();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<mockito.version>2.22.0</mockito.version>
<testng.version>6.14.3</testng.version>
<awaitility.version>4.0.2</awaitility.version>
<pulsar.version>3.0.0.1-SNAPSHOT</pulsar.version>
<pulsar.version>3.2.0-SNAPSHOT</pulsar.version>
<mqtt.codec.version>4.1.94.Final</mqtt.codec.version>
<log4j2.version>2.18.0</log4j2.version>
<fusesource.client.version>1.16</fusesource.client.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.io.EOFException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -46,7 +45,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
Expand All @@ -56,6 +54,7 @@
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.protocol.HTTP;
import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -285,15 +284,16 @@ public void testProxyProcessPingReq() {
Thread.sleep(4000); // Sleep 2 times of setKeepAlive.
Assert.assertTrue(producer.isConnected());
// Check for broker
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> broker =
CompletableFuture<LookupTopicResult> broker =
((PulsarClientImpl) pulsarClient).getLookup().getBroker(TopicName.get(topic));
AtomicDouble active = new AtomicDouble(0);
AtomicDouble total = new AtomicDouble(0);
CompletableFuture<Void> result = new CompletableFuture<>();
broker.thenAccept(pair -> {
try {
HttpClient httpClient = HttpClientBuilder.create().build();
final String mopEndPoint = "http://localhost:" + (pair.getLeft().getPort() + 2) + "/mop/stats";
final String mopEndPoint =
"http://localhost:" + (pair.getLogicalAddress().getPort() + 2) + "/mop/stats";
HttpResponse response = httpClient.execute(new HttpGet(mopEndPoint));
InputStream inputStream = response.getEntity().getContent();
InputStreamReader isReader = new InputStreamReader(inputStream);
Expand Down