AdminManager.java
@@ -16,14 +16,14 @@
import static io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.TopicKey;
import static org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails;

import com.google.common.collect.Maps;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -57,7 +57,7 @@ class AdminManager {

private final PulsarAdmin admin;
private final int defaultNumPartitions;
private volatile Set<Node> brokersCache = new HashSet<>();
private volatile Map<String, Set<Node>> brokersCache = Maps.newHashMap();
private final ReentrantReadWriteLock brokersCacheLock = new ReentrantReadWriteLock();


@@ -227,11 +227,20 @@ public void deleteTopic(String topicToDelete,
}
}

public Collection<? extends Node> getBrokers() {
public Collection<? extends Node> getBrokers(String listenerName) {

if (brokersCache.containsKey(listenerName)) {
return brokersCache.get(listenerName);
}

return Collections.emptyList();
}

public Map<String, Set<Node>> getAllBrokers() {
return brokersCache;
}

public void setBrokers(Set<Node> newBrokers) {
public void setBrokers(Map<String, Set<Node>> newBrokers) {
brokersCacheLock.writeLock().lock();
try {
this.brokersCache = newBrokers;
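For illustration, a minimal sketch of the per-listener lookup that the new cache shape enables. The listener names, hosts, and ports are hypothetical, and the PR itself uses a containsKey/get check with a Collections.emptyList() fallback rather than getOrDefault:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Node;

public class ListenerBrokerCacheSketch {

    // Mirrors the new cache shape in AdminManager: listener name -> kop broker nodes.
    private volatile Map<String, Set<Node>> brokersCache = new HashMap<>();

    // Same intent as the new getBrokers(listenerName): an unknown listener yields
    // an empty collection instead of brokers advertised on other listeners.
    public Collection<? extends Node> getBrokers(String listenerName) {
        return brokersCache.getOrDefault(listenerName, Collections.emptySet());
    }

    public static void main(String[] args) {
        ListenerBrokerCacheSketch sketch = new ListenerBrokerCacheSketch();
        Map<String, Set<Node>> brokers = new HashMap<>();
        // Hypothetical listener names and broker addresses.
        brokers.put("kafka_internal", Collections.singleton(new Node(1, "broker-0.local", 9092)));
        brokers.put("kafka_external", Collections.singleton(new Node(1, "broker-0.example.com", 19092)));
        sketch.brokersCache = brokers;

        System.out.println(sketch.getBrokers("kafka_external")); // one node
        System.out.println(sketch.getBrokers("unknown"));        // []
    }
}
```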
EndPoint.java
@@ -30,7 +30,7 @@
*/
public class EndPoint {

private static final String END_POINT_SEPARATOR = ",";
public static final String END_POINT_SEPARATOR = ",";
private static final String PROTOCOL_MAP_SEPARATOR = ",";
private static final String PROTOCOL_SEPARATOR = ":";
private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)";
@@ -52,9 +52,7 @@ public class EndPoint {
public EndPoint(final String listener, final Map<String, SecurityProtocol> protocolMap) {
this.originalListener = listener;
final String errorMessage = "listener '" + listener + "' is invalid";
final Matcher matcher = PATTERN.matcher(listener);
checkState(matcher.find(), errorMessage);
checkState(matcher.groupCount() == 3, errorMessage);
final Matcher matcher = matcherListener(listener, errorMessage);

this.listenerName = matcher.group(1);
if (protocolMap == null || protocolMap.isEmpty()) {
@@ -82,8 +80,6 @@ public EndPoint(final String listener, final Map<String, SecurityProtocol> proto
checkState(port >= 0 && port <= 65535, errorMessage + ": port " + port + " is invalid");
}



public InetSocketAddress getInetAddress() {
return new InetSocketAddress(hostname, port);
}
@@ -148,4 +144,12 @@ public static Map<String, SecurityProtocol> parseProtocolMap(final String kafkaP
}
return protocolMap;
}

public static Matcher matcherListener(String listener, String errorMessage) {
final Matcher matcher = PATTERN.matcher(listener);
checkState(matcher.find(), errorMessage);
checkState(matcher.groupCount() == 3, errorMessage);
return matcher;
}

}
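For illustration, a hedged sketch of how callers use the extracted matcherListener helper, which centralizes the listener-parsing regex previously duplicated in KafkaServiceConfiguration and KopEventManager. The listener string is hypothetical, and the sketch assumes the EndPoint class above (package io.streamnative.pulsar.handlers.kop) is on the classpath:

```java
import java.util.regex.Matcher;

import io.streamnative.pulsar.handlers.kop.EndPoint;

public class MatcherListenerSketch {
    public static void main(String[] args) {
        // Hypothetical listener string of the form <listenerName>://<host>:<port>.
        String listener = "kafka_external://broker-0.example.com:19092";

        // Validates the string against EndPoint's shared REGEX and fails fast
        // (via checkState) with the supplied error message if it does not match.
        Matcher matcher = EndPoint.matcherListener(listener, "listener '" + listener + "' is invalid");

        String listenerName = matcher.group(1);        // "kafka_external"
        String host = matcher.group(2);                // "broker-0.example.com"
        int port = Integer.parseInt(matcher.group(3)); // 19092

        System.out.println(listenerName + " -> " + host + ":" + port);
    }
}
```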
KafkaCommandDecoder.java
@@ -65,7 +65,8 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
@Getter
protected final KafkaServiceConfiguration kafkaConfig;

public KafkaCommandDecoder(StatsLogger statsLogger, KafkaServiceConfiguration kafkaConfig) {
public KafkaCommandDecoder(StatsLogger statsLogger,
KafkaServiceConfiguration kafkaConfig) {
this.requestStats = new RequestStats(statsLogger);
this.kafkaConfig = kafkaConfig;
this.requestQueue = new LinkedBlockingQueue<>(kafkaConfig.getMaxQueuedRequests());
KafkaRequestHandler.java
@@ -524,7 +524,7 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
List<TopicMetadata> allTopicMetadata = Collections.synchronizedList(Lists.newArrayList());
List<Node> allNodes = Collections.synchronizedList(Lists.newArrayList());
// Get all kop brokers in local cache
allNodes.addAll(adminManager.getBrokers());
allNodes.addAll(adminManager.getBrokers(advertisedEndPoint.getListenerName()));

List<String> topics = metadataRequest.topics();
// topics in format : persistent://%s/%s/abc-partition-x, will be grouped by as:
KafkaServiceConfiguration.java
@@ -13,8 +13,6 @@
*/
package io.streamnative.pulsar.handlers.kop;

import static com.google.common.base.Preconditions.checkState;

import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
import java.io.FileInputStream;
@@ -28,7 +26,6 @@
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
@@ -64,9 +61,6 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
@Category
private static final String CATEGORY_KOP_TRANSACTION = "Kafka on Pulsar transaction";

private static final String END_POINT_SEPARATOR = ",";
private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)";
private static final Pattern PATTERN = Pattern.compile(REGEX);
//
// --- Kafka on Pulsar Broker configuration ---
//
@@ -397,11 +391,9 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {

private String checkAdvertisedListeners(String advertisedListeners) {
StringBuilder listenersReBuilder = new StringBuilder();
for (String listener : advertisedListeners.split(END_POINT_SEPARATOR)) {
for (String listener : advertisedListeners.split(EndPoint.END_POINT_SEPARATOR)) {
final String errorMessage = "listener '" + listener + "' is invalid";
final Matcher matcher = PATTERN.matcher(listener);
checkState(matcher.find(), errorMessage);
checkState(matcher.groupCount() == 3, errorMessage);
final Matcher matcher = EndPoint.matcherListener(listener, errorMessage);
String hostname = matcher.group(2);
if (hostname.isEmpty()) {
try {
@@ -417,9 +409,10 @@ private String checkAdvertisedListeners(String advertisedListeners) {
} else {
listenersReBuilder.append(listener);
}
listenersReBuilder.append(END_POINT_SEPARATOR);
listenersReBuilder.append(EndPoint.END_POINT_SEPARATOR);
}
return listenersReBuilder.deleteCharAt(listenersReBuilder.lastIndexOf(END_POINT_SEPARATOR)).toString();
return listenersReBuilder.deleteCharAt(
listenersReBuilder.lastIndexOf(EndPoint.END_POINT_SEPARATOR)).toString();
}

public @NonNull String getKafkaAdvertisedListeners() {
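For illustration, a hedged reconstruction of what checkAdvertisedListeners does with the shared EndPoint constants. The branch that fills in an empty hostname is cut off in the diff above, so the local-hostname substitution below is an assumption, and the input string in main is hypothetical:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.regex.Matcher;

import io.streamnative.pulsar.handlers.kop.EndPoint;

public class AdvertisedListenersSketch {

    // Split on EndPoint.END_POINT_SEPARATOR (","), validate each entry with
    // EndPoint.matcherListener, rewrite entries as needed, and re-join them.
    static String normalize(String advertisedListeners) throws UnknownHostException {
        StringBuilder rebuilt = new StringBuilder();
        for (String listener : advertisedListeners.split(EndPoint.END_POINT_SEPARATOR)) {
            Matcher matcher = EndPoint.matcherListener(listener, "listener '" + listener + "' is invalid");
            String hostname = matcher.group(2);
            if (hostname.isEmpty()) {
                // Assumption: the branch elided in the diff substitutes the local
                // hostname for an empty host, e.g. "kafka_internal://:9092".
                rebuilt.append(matcher.group(1)).append("://")
                        .append(InetAddress.getLocalHost().getCanonicalHostName())
                        .append(':').append(matcher.group(3));
            } else {
                rebuilt.append(listener);
            }
            rebuilt.append(EndPoint.END_POINT_SEPARATOR);
        }
        // Drop the trailing separator, as the original method does.
        return rebuilt.deleteCharAt(rebuilt.lastIndexOf(EndPoint.END_POINT_SEPARATOR)).toString();
    }

    public static void main(String[] args) throws UnknownHostException {
        // Hypothetical config value with two listeners, the first missing its host.
        System.out.println(normalize("kafka_internal://:9092,kafka_external://broker-0.example.com:19092"));
    }
}
```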
KopEventManager.java
@@ -13,9 +13,8 @@
*/
package io.streamnative.pulsar.handlers.kop;

import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@@ -26,19 +25,20 @@
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.ShutdownableThread;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -52,9 +52,6 @@

@Slf4j
public class KopEventManager {
private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)";
private static final Pattern PATTERN = Pattern.compile(REGEX);

private static final String kopEventThreadName = "kop-event-thread";
private final ReentrantLock putLock = new ReentrantLock();
private static final LinkedBlockingQueue<KopEventWrapper> queue =
@@ -186,7 +183,7 @@ private void getBrokers(List<String> pulsarBrokers,
BiConsumer<String, Long> registerEventLatency,
String name,
long startProcessTime) {
final Set<Node> kopBrokers = Sets.newConcurrentHashSet();
ConcurrentMap<String, Set<Node>> kopBrokersMap = Maps.newConcurrentMap();
final AtomicInteger pendingBrokers = new AtomicInteger(pulsarBrokers.size());

pulsarBrokers.forEach(broker -> {
@@ -207,9 +204,14 @@
JsonElement element = protocols.get("kafka");

if (element != null) {
String kopBrokerStr = element.getAsString();
Node kopNode = getNode(kopBrokerStr);
kopBrokers.add(kopNode);
String kopBrokerStrs = element.getAsString();
Map<String, Set<Node>> kopNodesMap = getNodes(kopBrokerStrs);
kopNodesMap.forEach((listenerName, nodesSet) -> {
Set<Node> currentNodeSet = kopBrokersMap.computeIfAbsent(listenerName,
s -> Sets.newConcurrentHashSet());
currentNodeSet.addAll(nodesSet);
kopBrokersMap.put(listenerName, currentNodeSet);
});
} else {
if (log.isDebugEnabled()) {
log.debug("Get broker {} path currently not a kop broker, skip it.", broker);
@@ -222,13 +224,13 @@
}

if (pendingBrokers.decrementAndGet() == 0) {
Collection<? extends Node> oldKopBrokers = adminManager.getBrokers();
adminManager.setBrokers(kopBrokers);
Map<String, Set<Node>> oldKopBrokers = adminManager.getAllBrokers();
adminManager.setBrokers(kopBrokersMap);
if (registerEventLatency != null) {
registerEventLatency.accept(name, startProcessTime);
}
log.info("Refresh kop brokers new cache {}, old brokers cache {}",
adminManager.getBrokers(), oldKopBrokers);
adminManager.getAllBrokers(), oldKopBrokers);
}
}
);
@@ -242,18 +244,24 @@ private JsonObject parseJsonObject(String info) {
}

@VisibleForTesting
public static Node getNode(String kopBrokerStr) {
final String errorMessage = "kopBrokerStr " + kopBrokerStr + " is invalid";
final Matcher matcher = PATTERN.matcher(kopBrokerStr);
checkState(matcher.find(), errorMessage);
checkState(matcher.groupCount() == 3, errorMessage);
String host = matcher.group(2);
String port = matcher.group(3);

return new Node(
Murmur3_32Hash.getInstance().makeHash((host + port).getBytes(StandardCharsets.UTF_8)),
host,
Integer.parseInt(port));
public static Map<String, Set<Node>> getNodes(String kopBrokerStrs) {
HashMap<String, Set<Node>> nodesMap = Maps.newHashMap();
String[] kopBrokerArr = kopBrokerStrs.split(EndPoint.END_POINT_SEPARATOR);
for (String kopBrokerStr : kopBrokerArr) {
final String errorMessage = "kopBrokerStr " + kopBrokerStr + " is invalid";
final Matcher matcher = EndPoint.matcherListener(kopBrokerStr, errorMessage);
String listenerName = matcher.group(1);
String host = matcher.group(2);
String port = matcher.group(3);
Set<Node> nodeSet = nodesMap.computeIfAbsent(listenerName, s -> new HashSet<>());
nodeSet.add(new Node(
Murmur3_32Hash.getInstance().makeHash((host + port).getBytes(StandardCharsets.UTF_8)),
host,
Integer.parseInt(port)));
nodesMap.put(listenerName, nodeSet);
}

return nodesMap;
}

@Getter
KopEventManagerTest.java
@@ -13,21 +13,57 @@
*/
package io.streamnative.pulsar.handlers.kop;

import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Node;
import org.testng.Assert;
import org.testng.annotations.Test;

public class KopEventManagerTest {

@Test
public void testGetNode() {
public void testGetOneNode() {
final String host = "localhost";
final int port = 9120;
final String securityProtocol = "SASL_PLAINTEXT";
final String brokerStr = securityProtocol + "://" + host + ":" + port;
Node node = KopEventManager.getNode(brokerStr);
Assert.assertEquals(node.host(), host);
Assert.assertEquals(node.port(), port);
Map<String, Set<Node>> nodes = KopEventManager.getNodes(brokerStr);
Assert.assertEquals(1, nodes.size());
Assert.assertTrue(nodes.containsKey(securityProtocol));
Set<Node> nodeSet = nodes.get(securityProtocol);
Assert.assertEquals(1, nodeSet.size());
nodeSet.forEach(node -> {
Assert.assertEquals(node.host(), host);
Assert.assertEquals(node.port(), port);
});
}

@Test
public void testGetMultipleNodes() {
final String listenerName1 = "kafka_internal";
final String host1 = "localhost";
final int port1 = 9120;
final String listenerName2 = "kafka_external";
final String host2 = "localhost";
final int port2 = 9121;
final String brokersStr = listenerName1 + "://" + host1 + ":" + port1 + ","
+ listenerName2 + "://" + host2 + ":" + port2;
Map<String, Set<Node>> nodes = KopEventManager.getNodes(brokersStr);
Assert.assertEquals(2, nodes.size());
Assert.assertTrue(nodes.containsKey(listenerName1));
Set<Node> nodesSet1 = nodes.get(listenerName1);
Assert.assertEquals(1, nodesSet1.size());
nodesSet1.forEach(node -> {
Assert.assertEquals(node.host(), host1);
Assert.assertEquals(node.port(), port1);
});
Assert.assertTrue(nodes.containsKey(listenerName2));
Set<Node> nodesSet2 = nodes.get(listenerName2);
Assert.assertEquals(1, nodesSet2.size());
nodesSet2.forEach(node -> {
Assert.assertEquals(node.host(), host2);
Assert.assertEquals(node.port(), port2);
});
}

}