EndPoint.java
@@ -110,11 +110,35 @@ private static Map<String, EndPoint> parseListeners(final String listeners,
return endPointMap;
}

// listeners must be able to be split into at least 1 token
private static String[] getListenerArray(final String listeners) {
if (StringUtils.isEmpty(listeners)) {
throw new IllegalStateException("listeners is empty");
}
final String[] listenerArray = listeners.split(END_POINT_SEPARATOR);
if (listenerArray.length == 0) {
throw new IllegalStateException(listeners + " is split into 0 tokens by " + END_POINT_SEPARATOR);
}
return listenerArray;
}

@VisibleForTesting
public static Map<String, EndPoint> parseListeners(final String listeners, final String protocolMapString) {
return parseListeners(listeners, parseProtocolMap(protocolMapString));
}
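
As a usage sketch of the overload above (the listener and protocol-map formats mirror the test configuration further down; END_POINT_SEPARATOR is assumed to be the comma, and the returned map is presumably keyed by listener name):

// Hypothetical call; returns one EndPoint per named listener.
final Map<String, EndPoint> endPoints = EndPoint.parseListeners(
        "PLAINTEXT://0.0.0.0:9092,GW://0.0.0.0:9093",
        "PLAINTEXT:PLAINTEXT,GW:PLAINTEXT");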

public static String findListener(final String listeners, final String name) {
if (name == null) {
return null;
}
for (String listener : getListenerArray(listeners)) {
if (listener.contains(":") && listener.substring(0, listener.indexOf(":")).equals(name)) {
return listener;
}
}
throw new IllegalStateException("listener \"" + name + "\" doesn't exist in " + listeners);
}
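
Given the implementation above, a behavior sketch of findListener (again assuming the comma separator; the values are illustrative):

final String listeners = "PLAINTEXT://0.0.0.0:9092,GW://0.0.0.0:9093";
EndPoint.findListener(listeners, "GW");   // returns "GW://0.0.0.0:9093"
EndPoint.findListener(listeners, null);   // returns null
EndPoint.findListener(listeners, "SSL");  // throws IllegalStateException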

public static EndPoint getPlainTextEndPoint(final String listeners) {
for (String listener : listeners.split(END_POINT_SEPARATOR)) {
if (listener.startsWith(SecurityProtocol.PLAINTEXT.name())
KafkaRequestHandler.java
@@ -78,6 +78,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Getter;
@@ -197,6 +198,7 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
private final Boolean tlsEnabled;
private final EndPoint advertisedEndPoint;
private final String advertisedListeners;
private final Node selfNode;
private final boolean skipMessagesWithoutIndex;
private final int defaultNumPartitions;
public final int maxReadEntriesNum;
@@ -310,6 +312,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.tlsEnabled = tlsEnabled;
this.advertisedEndPoint = advertisedEndPoint;
this.advertisedListeners = kafkaConfig.getKafkaAdvertisedListeners();
this.selfNode = newSelfNode();
this.skipMessagesWithoutIndex = skipMessagesWithoutIndex;
this.topicManager = new KafkaTopicManager(this);
this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
@@ -767,7 +770,7 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
AtomicInteger topicsCompleted = new AtomicInteger(0);
// Each Pulsar broker can manage metadata like the controller in Kafka, and Kafka's AdminClient needs to find a
// controller node for metadata management. So here we return the broker itself as the controller.
- final int controllerId = newSelfNode().id();
+ final int controllerId = selfNode.id();
pulsarTopicsFuture.whenComplete((pulsarTopics, e) -> {
if (e != null) {
log.warn("[{}] Request {}: Exception fetching metadata, will return null Response",
@@ -2546,8 +2549,15 @@ static Node newNode(InetSocketAddress address) {
address.getPort());
}

- Node newSelfNode() {
-     return newNode(advertisedEndPoint.getInetAddress());
+ private Node newSelfNode() {
+     String advertisedListeners = kafkaConfig.getKafkaAdvertisedListeners();
+     String listener = EndPoint.findListener(advertisedListeners, advertisedEndPoint.getListenerName());
+     if (listener == null) {
+         return newNode(advertisedEndPoint.getInetAddress());
+     }
+     final Matcher matcher = EndPoint.matcherListener(listener,
+             listener + " cannot be split into 3 parts");
+     return newNode(new InetSocketAddress(matcher.group(2), Integer.parseInt(matcher.group(3))));
}
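
For context on the matcher groups used above: EndPoint.matcherListener is not shown in this diff, but from its use here it presumably matches entries of the form NAME://host:port, with group(2) as the host and group(3) as the port. A minimal self-contained sketch of that parsing (the regex is an assumption for illustration, not KoP's actual pattern):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ListenerParseSketch {
    // Assumed shape of a listener entry: NAME://host:port
    private static final Pattern LISTENER = Pattern.compile("^(.+)://(.+):(\\d+)$");

    public static void main(String[] args) {
        final Matcher m = LISTENER.matcher("GW://192.168.0.2:9093");
        if (!m.matches()) {
            throw new IllegalStateException("GW://192.168.0.2:9093 cannot be split into 3 parts");
        }
        System.out.println(m.group(1)); // "GW" (listener name)
        System.out.println(m.group(2)); // "192.168.0.2" (advertised host)
        System.out.println(m.group(3)); // "9093" (advertised port)
    }
}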

static PartitionMetadata newPartitionMetadata(TopicName topicName, Node node) {
KafkaServiceConfiguration.java
@@ -212,10 +212,10 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private String kafkaProtocolMap;

@Deprecated
@FieldContext(
category = CATEGORY_KOP,
doc = "Use kafkaProtocolMap, kafkaListeners and advertisedAddress instead."
doc = "Use kafkaProtocolMap, kafkaListeners and advertisedAddress if you want to use multiple listeners.\n"
+ "Otherwise, it should be the listeners published to ZooKeeper for clients to use.\n"
)
private String kafkaAdvertisedListeners;
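
To make the doc change concrete, a hypothetical multi-listener setup mirroring the test below (listener names, hosts, and ports are illustrative):

conf.setKafkaListeners("PLAINTEXT://0.0.0.0:9092,GW://0.0.0.0:9093");
conf.setKafkaProtocolMap("PLAINTEXT:PLAINTEXT,GW:PLAINTEXT");
// kafkaAdvertisedListeners is what clients will see via ZooKeeper:
conf.setKafkaAdvertisedListeners("PLAINTEXT://192.168.0.1:9092,GW://192.168.0.2:9093");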

KafkaApisTest.java
@@ -148,7 +148,12 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

- KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder) {
+ private KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder) {
+     return buildRequest(builder, serviceAddress);
+ }
+
+ static KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder,
+                                           SocketAddress serviceAddress) {
AbstractRequest request = builder.build();
builder.apiKey();

@@ -13,7 +13,21 @@
*/
package io.streamnative.pulsar.handlers.kop;

import static io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest;
import static org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -22,6 +36,11 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.testng.Assert;
@@ -44,6 +63,78 @@ protected void cleanup() throws Exception {
// Clean up in the test method
}

@Test(timeOut = 30000)
public void testMetadataRequestForMultiListeners() throws Exception {
final Map<Integer, InetSocketAddress> bindPortToAdvertisedAddress = new HashMap<>();
final int anotherKafkaPort = PortManager.nextFreePort();
bindPortToAdvertisedAddress.put(kafkaBrokerPort,
InetSocketAddress.createUnresolved("192.168.0.1", PortManager.nextFreePort()));
bindPortToAdvertisedAddress.put(anotherKafkaPort,
InetSocketAddress.createUnresolved("192.168.0.2", PortManager.nextFreePort()));

super.resetConfig();
conf.setKafkaListeners("PLAINTEXT://0.0.0.0:" + kafkaBrokerPort + ",GW://0.0.0.0:" + anotherKafkaPort);
conf.setKafkaProtocolMap("PLAINTEXT:PLAINTEXT,GW:PLAINTEXT");
conf.setKafkaAdvertisedListeners(String.format("PLAINTEXT://%s,GW://%s",
bindPortToAdvertisedAddress.get(kafkaBrokerPort),
bindPortToAdvertisedAddress.get(anotherKafkaPort)));
conf.setAdvertisedAddress(null);
conf.setAdvertisedListeners(String.format("pulsar:pulsar://192.168.0.3:%d,PLAINTEXT:pulsar://%s,GW:pulsar://%s",
brokerPort,
bindPortToAdvertisedAddress.get(kafkaBrokerPort),
bindPortToAdvertisedAddress.get(anotherKafkaPort)));
super.internalSetup();

final String topic = "persistent://public/default/test-metadata-request-for-multi-listeners";
final int numPartitions = 3;
admin.topics().createPartitionedTopic(topic, numPartitions);

final KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler)
pulsar.getProtocolHandlers().protocol(KafkaProtocolHandler.PROTOCOL_NAME);
protocolHandler.getChannelInitializerMap().forEach((inetSocketAddress, channelInitializer) -> {
try {
final KafkaRequestHandler requestHandler = ((KafkaChannelInitializer) channelInitializer).newCnx();
ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class);
doReturn(mock(Channel.class)).when(mockCtx).channel();
requestHandler.ctx = mockCtx;

final InetSocketAddress expectedAddress = bindPortToAdvertisedAddress.get(inetSocketAddress.getPort());

final KafkaHeaderAndRequest metadataRequest = KafkaApisTest.buildRequest(
new MetadataRequest.Builder(Collections.singletonList(topic), true),
inetSocketAddress);
final CompletableFuture<AbstractResponse> future = new CompletableFuture<>();
requestHandler.handleTopicMetadataRequest(metadataRequest, future);
final MetadataResponse metadataResponse = (MetadataResponse) future.get();
final List<Node> brokers = new ArrayList<>(metadataResponse.brokers());
Assert.assertEquals(brokers.get(0).host(), expectedAddress.getHostName());
Assert.assertEquals(brokers.get(0).port(), expectedAddress.getPort());

Node controller = metadataResponse.controller();
Assert.assertEquals(controller.host(), expectedAddress.getHostName());
Assert.assertEquals(controller.port(), expectedAddress.getPort());

final List<MetadataResponse.TopicMetadata> topicMetadataList =
new ArrayList<>(metadataResponse.topicMetadata());
Assert.assertEquals(topicMetadataList.size(), 1);
Assert.assertEquals(topicMetadataList.get(0).topic(), topic);

final List<PartitionMetadata> partitionMetadataList = topicMetadataList.get(0).partitionMetadata();
Assert.assertEquals(partitionMetadataList.size(), numPartitions);
for (int i = 0; i < numPartitions; i++) {
final PartitionMetadata partitionMetadata = partitionMetadataList.get(i);
Assert.assertEquals(partitionMetadata.error(), Errors.NONE);
Assert.assertEquals(partitionMetadata.leader().host(), expectedAddress.getHostName());
Assert.assertEquals(partitionMetadata.leader().port(), expectedAddress.getPort());
}
} catch (Exception e) {
Assert.fail(e.getMessage());
}
});

super.internalCleanup();
}

@Test(timeOut = 30000)
public void testListenerName() throws Exception {
super.resetConfig();