This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
@@ -31,26 +31,43 @@
public class EndPoint {

private static final String END_POINT_SEPARATOR = ",";
private static final String PROTOCOL_MAP_SEPARATOR = ",";
private static final String PROTOCOL_SEPARATOR = ":";
private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)";
private static final Pattern PATTERN = Pattern.compile(REGEX);

@Getter
private final String originalListener;
@Getter
private final String listenerName;
@Getter
private final SecurityProtocol securityProtocol;
@Getter
private final String hostname;
@Getter
private final int port;
@Getter
private final boolean multiListener;

public EndPoint(final String listener) {
public EndPoint(final String listener, final Map<String, SecurityProtocol> protocolMap) {
this.originalListener = listener;
final String errorMessage = "listener '" + listener + "' is invalid";
final Matcher matcher = PATTERN.matcher(listener);
checkState(matcher.find(), errorMessage);
checkState(matcher.groupCount() == 3, errorMessage);

this.securityProtocol = SecurityProtocol.forName(matcher.group(1));
this.listenerName = matcher.group(1);
if (protocolMap == null || protocolMap.isEmpty()) {
multiListener = false;
this.securityProtocol = SecurityProtocol.forName(matcher.group(1));
} else {
multiListener = true;
this.securityProtocol = protocolMap.get(this.listenerName);
if (this.securityProtocol == null) {
throw new IllegalStateException(this.listenerName + " is not set in kafkaProtocolMap");
}
}

final String originalHostname = matcher.group(2);
if (originalHostname.isEmpty()) {
try {
@@ -65,19 +82,26 @@ public EndPoint(final String listener) {
checkState(port >= 0 && port <= 65535, errorMessage + ": port " + port + " is invalid");
}



public InetSocketAddress getInetAddress() {
return new InetSocketAddress(hostname, port);
}

public static Map<SecurityProtocol, EndPoint> parseListeners(final String listeners) {
final Map<SecurityProtocol, EndPoint> endPointMap = new HashMap<>();
public static Map<String, EndPoint> parseListeners(final String listeners) {
return parseListeners(listeners, null);
}

public static Map<String, EndPoint> parseListeners(final String listeners, final String kafkaProtocolMap) {
final Map<String, EndPoint> endPointMap = new HashMap<>();
final Map<String, SecurityProtocol> protocolMap = parseProtocolMap(kafkaProtocolMap);
for (String listener : listeners.split(END_POINT_SEPARATOR)) {
final EndPoint endPoint = new EndPoint(listener);
if (endPointMap.containsKey(endPoint.securityProtocol)) {
final EndPoint endPoint = new EndPoint(listener, protocolMap);
if (endPointMap.containsKey(endPoint.listenerName)) {
throw new IllegalStateException(
listeners + " has multiple listeners whose protocol is " + endPoint.securityProtocol);
listeners + " has multiple listeners whose listenerName is " + endPoint.listenerName);
} else {
endPointMap.put(endPoint.securityProtocol, endPoint);
endPointMap.put(endPoint.listenerName, endPoint);
}
}
return endPointMap;
@@ -87,7 +111,7 @@ public static EndPoint getPlainTextEndPoint(final String listeners) {
for (String listener : listeners.split(END_POINT_SEPARATOR)) {
if (listener.startsWith(SecurityProtocol.PLAINTEXT.name())
|| listener.startsWith(SecurityProtocol.SASL_PLAINTEXT.name())) {
return new EndPoint(listener);
return new EndPoint(listener, null);
}
}
throw new IllegalStateException(listeners + " has no plain text endpoint");
@@ -97,9 +121,31 @@ public static EndPoint getSslEndPoint(final String listeners) {
for (String listener : listeners.split(END_POINT_SEPARATOR)) {
if (listener.startsWith(SecurityProtocol.SSL.name())
|| listener.startsWith(SecurityProtocol.SASL_SSL.name())) {
return new EndPoint(listener);
return new EndPoint(listener, null);
}
}
throw new IllegalStateException(listeners + " has no ssl endpoint");
}

public static Map<String, SecurityProtocol> parseProtocolMap(final String kafkaProtocolMap) {

final Map<String, SecurityProtocol> protocolMap = new HashMap<>();
if (kafkaProtocolMap == null) {
return protocolMap;
}

for (String protocolSet : kafkaProtocolMap.split(PROTOCOL_MAP_SEPARATOR)) {
String[] protocol = protocolSet.split(PROTOCOL_SEPARATOR);
if (protocol.length != 2) {
throw new IllegalStateException(
"wrong format for kafkaProtocolMap " + kafkaProtocolMap);
}
if (protocolMap.containsKey(protocol[0])) {
throw new IllegalStateException(
kafkaProtocolMap + " has multiple listeners whose listenerName is " + protocol[0]);
}
protocolMap.put(protocol[0], SecurityProtocol.forName(protocol[1]));
}
return protocolMap;
}
}
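A minimal usage sketch of the parsing above, assuming the EndPoint class from this diff is importable; the example class name, listener names, hosts and ports are all invented for illustration.

import java.util.Map;

public class EndPointParsingExample {
    public static void main(String[] args) {
        // With a protocol map, listeners are named freely and each name is resolved
        // to a SecurityProtocol via kafkaProtocolMap (hypothetical values below).
        Map<String, EndPoint> endPoints = EndPoint.parseListeners(
                "PRIVATE://192.168.0.10:9092,PUBLIC_SSL://broker.example.com:9093",
                "PRIVATE:PLAINTEXT,PUBLIC_SSL:SSL");

        EndPoint internal = endPoints.get("PRIVATE");        // map is keyed by listener name
        System.out.println(internal.getSecurityProtocol());  // PLAINTEXT, taken from the protocol map
        System.out.println(internal.isMultiListener());      // true
        System.out.println(internal.getPort());              // 9092

        // The single-argument overload keeps the old behaviour: the listener name must
        // itself be a valid security protocol and multiListener stays false.
        EndPoint legacy = EndPoint.parseListeners("PLAINTEXT://0.0.0.0:9092").get("PLAINTEXT");
        System.out.println(legacy.getSecurityProtocol());    // PLAINTEXT
        System.out.println(legacy.isMultiListener());        // false
    }
}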
@@ -51,7 +51,6 @@
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -312,7 +311,8 @@ public String getProtocolDataToAdvertise() {
public void start(BrokerService service) {
brokerService = service;
kopBrokerLookupManager = new KopBrokerLookupManager(
brokerService.getPulsar(), false, kafkaConfig.getKafkaAdvertisedListeners());
brokerService.getPulsar(), false,
kafkaConfig.getKafkaAdvertisedListeners());

log.info("Starting KafkaProtocolHandler, kop version is: '{}'", KopVersion.getVersion());
log.info("Git Revision {}", KopVersion.getGitSha());
@@ -440,29 +440,24 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti

try {
ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder =
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder();

final Map<SecurityProtocol, EndPoint> advertisedEndpointMap =
EndPoint.parseListeners(kafkaConfig.getKafkaAdvertisedListeners());
EndPoint.parseListeners(kafkaConfig.getListeners()).forEach((protocol, endPoint) -> {
EndPoint advertisedEndPoint = advertisedEndpointMap.get(protocol);
if (advertisedEndPoint == null) {
// Use the bind endpoint as the advertised endpoint.
advertisedEndPoint = endPoint;
}
switch (protocol) {
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder();

EndPoint.parseListeners(kafkaConfig.getListeners(), kafkaConfig.getKafkaProtocolMap()).
forEach((listener, endPoint) -> {
switch (endPoint.getSecurityProtocol()) {
case PLAINTEXT:
case SASL_PLAINTEXT:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, this, adminManager, false,
advertisedEndPoint, scopeStatsLogger, localBrokerDataCache));
endPoint, scopeStatsLogger, localBrokerDataCache));
break;
case SSL:
case SASL_SSL:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, this, adminManager, true,
advertisedEndPoint, scopeStatsLogger, localBrokerDataCache));
endPoint, scopeStatsLogger, localBrokerDataCache));
break;
default:
}
});
return builder.build();
@@ -157,7 +157,6 @@
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -179,7 +178,6 @@
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.eclipse.jetty.util.StringUtil;

/**
* This class contains all the request handling methods.
@@ -2188,11 +2186,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
return future;
}

// if kafkaListenerName is set, the lookup result is the advertised address
if (!StringUtil.isBlank(kafkaConfig.getKafkaListenerName())) {
// TODO: should add SecurityProtocol according to which endpoint is handling the request.
// firstly we only support PLAINTEXT when lookup with kafkaListenerName
String kafkaAdvertisedAddress = String.format("%s://%s:%s", SecurityProtocol.PLAINTEXT.name(),
if (advertisedEndPoint.isMultiListener()) {
// if kafkaProtocolMap is set, the lookup result is the advertised address
String kafkaAdvertisedAddress = String.format("%s://%s:%s", advertisedEndPoint.getSecurityProtocol().name,
pulsarAddress.getHostName(), pulsarAddress.getPort());
KafkaTopicManager.KOP_ADDRESS_CACHE.put(topic.toString(), returnFuture);
returnFuture.complete(Optional.ofNullable(kafkaAdvertisedAddress));
@@ -2285,7 +2281,8 @@ public CompletableFuture<PartitionMetadata> findBroker(TopicName topic) {
}
CompletableFuture<PartitionMetadata> returnFuture = new CompletableFuture<>();

topicManager.getTopicBroker(topic.toString())
topicManager.getTopicBroker(topic.toString(),
advertisedEndPoint.isMultiListener() ? advertisedEndPoint.getListenerName() : null)
.thenApply(address -> getProtocolDataToAdvertise(address, topic))
.thenAccept(kopAddressFuture -> kopAddressFuture.thenAccept(listenersOptional -> {
if (!listenersOptional.isPresent()) {
Expand All @@ -2298,8 +2295,8 @@ public CompletableFuture<PartitionMetadata> findBroker(TopicName topic) {
// It's the `kafkaAdvertisedListeners` config that's written to ZK
final String listeners = listenersOptional.get();
final EndPoint endPoint =
(tlsEnabled ? EndPoint.getSslEndPoint(listeners)
: EndPoint.getPlainTextEndPoint(listeners));
(tlsEnabled ? EndPoint.getSslEndPoint(listeners) :
EndPoint.getPlainTextEndPoint(listeners));
final Node node = newNode(endPoint.getInetAddress());

if (log.isDebugEnabled()) {
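For context, a small sketch of the advertised-address string built in the multi-listener branch above, assuming the request arrived on an SSL listener; the class name, host and port are invented.

import java.net.InetSocketAddress;
import org.apache.kafka.common.security.auth.SecurityProtocol;

public class AdvertisedAddressFormatExample {
    public static void main(String[] args) {
        // Hypothetical lookup result for the topic's owner broker.
        InetSocketAddress pulsarAddress = InetSocketAddress.createUnresolved("broker-1.example.com", 9093);
        // Protocol of the listener that served the request, instead of the old hard-coded PLAINTEXT.
        SecurityProtocol protocol = SecurityProtocol.SSL;

        String kafkaAdvertisedAddress = String.format("%s://%s:%s",
                protocol.name, pulsarAddress.getHostName(), pulsarAddress.getPort());
        System.out.println(kafkaAdvertisedAddress); // SSL://broker-1.example.com:9093
    }
}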
@@ -189,17 +189,23 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
private String kafkaListeners;

@FieldContext(
category = CATEGORY_KOP,
doc = "Listeners to publish to ZooKeeper for clients to use.\n"
+ "The format is the same as `kafkaListeners`.\n"
category = CATEGORY_KOP,
doc = "Comma-separated map of listener name and protocol.\n"
+ "e.g. PRIVATE:PLAINTEXT,PRIVATE_SSL:SSL,PUBLIC:PLAINTEXT,PUBLIC_SSL:SSL.\n"
)
private String kafkaProtocolMap;

@Deprecated
@FieldContext(
category = CATEGORY_KOP,
doc = "Use kafkaProtocolMap, kafkaListeners and advertisedAddress instead."
)
private String kafkaAdvertisedListeners;

@Deprecated
@FieldContext(
category = CATEGORY_KOP,
doc = "Specify the internal listener name for the broker.\n"
+ "The listener name must be contained in the advertisedListeners.\n"
+ "This config is used as the listener name in topic lookup."
doc = "Use kafkaProtocolMap, kafkaListeners and advertisedAddress instead."
)
private String kafkaListenerName;

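A hedged configuration sketch of how the new settings might be combined, assuming broker.conf-style keys; every listener name, host and port below is invented.

# Hypothetical values, for illustration only.
kafkaListeners=PRIVATE://0.0.0.0:9092,PUBLIC_SSL://0.0.0.0:9093
kafkaProtocolMap=PRIVATE:PLAINTEXT,PUBLIC_SSL:SSL
# The advertised host comes from the Pulsar-level advertisedAddress;
# kafkaAdvertisedListeners and kafkaListenerName are deprecated by this change.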
@@ -67,7 +67,7 @@ public class KafkaTopicManager {

private final AtomicBoolean closed = new AtomicBoolean(false);

public static final ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>>
public static final ConcurrentHashMap<String, ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>>>
LOOKUP_CACHE = new ConcurrentHashMap<>();

public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>>
@@ -178,32 +178,44 @@ private Producer registerInPersistentTopic(PersistentTopic persistentTopic) {
// call pulsarclient.lookup.getbroker to get and
// own a topic.
// // when error happens, the returned future will complete with null.
public CompletableFuture<InetSocketAddress> getTopicBroker(String topicName) {
public CompletableFuture<InetSocketAddress> getTopicBroker(String topicName, String listenerName) {
if (closed.get()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Return null for getTopicBroker({}) since channel closing",
requestHandler.ctx.channel(), topicName);
}
return CompletableFuture.completedFuture(null);
}
return LOOKUP_CACHE.computeIfAbsent(topicName, t -> {

ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>> topicLookupCache =
LOOKUP_CACHE.computeIfAbsent(topicName, t-> {
if (log.isDebugEnabled()) {
log.debug("[{}] topic {} not in Lookup_cache, call lookupBroker",
requestHandler.ctx.channel(), topicName);
}
ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>> cache = new ConcurrentHashMap<>();
cache.put(listenerName == null ? "" : listenerName, lookupBroker(topicName, listenerName));
return cache;
});

return topicLookupCache.computeIfAbsent(listenerName == null ? "" : listenerName, t-> {
if (log.isDebugEnabled()) {
log.debug("[{}] topic {} not in Lookup_cache, call lookupBroker",
requestHandler.ctx.channel(), topicName);
requestHandler.ctx.channel(), topicName);
}
return lookupBroker(topicName);
return lookupBroker(topicName, listenerName);
});
}

private CompletableFuture<InetSocketAddress> lookupBroker(final String topic) {
private CompletableFuture<InetSocketAddress> lookupBroker(final String topic, String listenerName) {
if (closed.get()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Return null for getTopic({}) since channel closing",
requestHandler.ctx.channel(), topic);
}
return CompletableFuture.completedFuture(null);
}
return lookupClient.getBrokerAddress(TopicName.get(topic));
return lookupClient.getBrokerAddress(TopicName.get(topic), listenerName);
}

// A wrapper of `BrokerService#getTopic` that is to find the topic's associated `PersistentTopic` instance
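A small sketch of the lookup-cache shape introduced above, where addresses are cached per topic and then per listener name (an empty string stands in when no listener name is given); the class name, topic and broker addresses are invented.

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class LookupCacheShapeExample {
    public static void main(String[] args) {
        // Same shape as KafkaTopicManager.LOOKUP_CACHE: topic -> (listener name -> broker address).
        ConcurrentHashMap<String, ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>>> cache =
                new ConcurrentHashMap<>();

        String topic = "persistent://public/default/my-topic-partition-0"; // hypothetical topic
        cache.computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
                .put("PRIVATE", CompletableFuture.completedFuture(
                        InetSocketAddress.createUnresolved("broker-1.internal", 6650)));
        cache.get(topic)
                .put("PUBLIC_SSL", CompletableFuture.completedFuture(
                        InetSocketAddress.createUnresolved("broker-1.example.com", 6651)));

        // The same topic can now resolve to a different address for each listener.
        cache.get(topic).forEach((listener, future) ->
                System.out.println(listener + " -> " + future.join()));
    }
}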
@@ -52,7 +52,8 @@ public class KopBrokerLookupManager {
public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>>
KOP_ADDRESS_CACHE = new ConcurrentHashMap<>();

public KopBrokerLookupManager(PulsarService pulsarService, Boolean tlsEnabled, String advertisedListeners) {
public KopBrokerLookupManager(PulsarService pulsarService, Boolean tlsEnabled,
String advertisedListeners) {
this.pulsarService = pulsarService;
this.tlsEnabled = tlsEnabled;
this.advertisedListeners = advertisedListeners;
@@ -82,8 +83,8 @@ public CompletableFuture<InetSocketAddress> findBroker(String topic) {

// It's the `kafkaAdvertisedListeners` config that's written to ZK
final EndPoint endPoint =
tlsEnabled ? EndPoint.getSslEndPoint(listeners.get())
: EndPoint.getPlainTextEndPoint(listeners.get());
(tlsEnabled ? EndPoint.getSslEndPoint(listeners.get()) :
EndPoint.getPlainTextEndPoint(listeners.get()));

// here we found topic broker: broker2, but this is in broker1,
// how to clean the lookup cache?