From e35346908e4d55a7862c906cae80ef24089168f6 Mon Sep 17 00:00:00 2001 From: coderzc Date: Tue, 26 Dec 2023 18:04:32 +0800 Subject: [PATCH] Compatible mater code --- .../pulsar/handlers/amqp/AmqpPulsarServerCnx.java | 4 +++- .../handlers/amqp/proxy/PulsarServiceLookupHandler.java | 8 ++++---- pom.xml | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarServerCnx.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarServerCnx.java index ac1e5714..6f62951e 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarServerCnx.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarServerCnx.java @@ -14,7 +14,9 @@ package io.streamnative.pulsar.handlers.amqp; import io.netty.channel.ChannelHandlerContext; +import java.util.Optional; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.ServerCnx; @@ -30,7 +32,7 @@ public AmqpPulsarServerCnx(PulsarService pulsar, ChannelHandlerContext ctx) { } @Override - public void closeConsumer(Consumer consumer) { + public void closeConsumer(Consumer consumer, Optional assignedBrokerLookupData) { // avoid close the connection when closing the consumer } } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java index a3bcd300..be35b5c2 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java @@ -16,7 +16,6 @@ import io.streamnative.pulsar.handlers.amqp.AmqpProtocolHandler; import java.io.Closeable; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.List; @@ -27,6 +26,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader; +import org.apache.pulsar.client.impl.LookupTopicResult; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; @@ -53,19 +53,19 @@ public CompletableFuture> findBroker(TopicName topicName, CompletableFuture> lookupResult = new CompletableFuture<>(); // lookup the broker for the given topic - CompletableFuture> lookup = + CompletableFuture lookup = pulsarClient.getLookup().getBroker(topicName); lookup.whenComplete((result, throwable) -> { if (throwable != null) { lookupResult.completeExceptionally(throwable); return; } - if (result == null || result.getLeft() == null) { + if (result == null || result.getLogicalAddress() == null) { lookupResult.completeExceptionally(new ProxyException( "Unable to resolve the broker for the topic: " + topicName)); return; } - String hostAndPort = result.getLeft().getHostName() + ":" + result.getLeft().getPort(); + String hostAndPort = result.getLogicalAddress().getHostName() + ":" + result.getLogicalAddress().getPort(); // fetch the protocol handler data List brokers = metadataStoreCacheLoader.getAvailableBrokers(); diff --git a/pom.xml b/pom.xml index 09d385c4..5cc53c06 100644 --- a/pom.xml +++ b/pom.xml @@ -40,7 +40,7 @@ ${maven.compiler.target} - 3.0.0.1-SNAPSHOT + 3.2.0-SNAPSHOT 8.0.0 5.8.0