This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Kafka client can connect to a non-existing listener in advertisedListeners #751
Closed
Description
Describe the bug
#742 supports multiple listeners based on configs like
kafkaListeners=kafka_internal://localhost:9092,kafka_external://localhost:19092
kafkaProtocolMap=kafka_internal:PLAINTEXT,kafka_external:PLAINTEXT
advertisedListeners=pulsar:pulsar://localhost:6650,kafka_internal:pulsar://localhost:9092,kafka_external:pulsar://localhost:19092
However, even if a listener name does not appear in advertisedListeners, the Kafka client can still connect to the associated endpoint.
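To make the mismatch concrete, here is a minimal sketch that lists the listener names bound in kafkaListeners but absent from advertisedListeners. It is not KoP code: the class ListenerConfigCheck and its methods are hypothetical names, and it assumes the listener name is everything before the first ':' in each comma-separated entry, as in the configs above.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of KoP: compares the two config strings by listener name.
public class ListenerConfigCheck {

    // Assumption: each comma-separated entry starts with "<listenerName>:",
    // e.g. "kafka://0.0.0.0:9092" or "kafka_external:pulsar://0.0.0.0:19092".
    public static Set<String> listenerNames(String config) {
        Set<String> names = new LinkedHashSet<>();
        for (String entry : config.split(",")) {
            String trimmed = entry.trim();
            int sep = trimmed.indexOf(':');
            if (sep > 0) {
                names.add(trimmed.substring(0, sep));
            }
        }
        return names;
    }

    // Names that are bound in kafkaListeners but never advertised.
    public static Set<String> unadvertised(String kafkaListeners, String advertisedListeners) {
        Set<String> missing = listenerNames(kafkaListeners);
        missing.removeAll(listenerNames(advertisedListeners));
        return missing;
    }

    public static void main(String[] args) {
        // Example ports; the values mirror the shape of the reproduction test.
        String kafkaListeners = "kafka://0.0.0.0:9092,kafka_external://0.0.0.0:19092";
        String advertised = "pulsar:pulsar://localhost:6650,kafka_external:pulsar://0.0.0.0:19092";
        System.out.println(unadvertised(kafkaListeners, advertised)); // prints [kafka]
    }
}
```

With this kind of configuration, "kafka" is the listener a client is expected to be unable to use.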
To Reproduce
Add a test to KafkaListenerNameTest:
@Test(timeOut = 20000)
public void testConnectListenerNotExist() throws Exception {
    final int externalPort = PortManager.nextFreePort();
    super.resetConfig();
    conf.setAdvertisedAddress(null);
    // Bind two Kafka listeners: "kafka" and "kafka_external".
    conf.setKafkaListeners("kafka://0.0.0.0:" + kafkaBrokerPort + ",kafka_external://0.0.0.0:" + externalPort);
    conf.setKafkaProtocolMap("kafka:PLAINTEXT,kafka_external:PLAINTEXT");
    // Advertise only "kafka_external"; the "kafka" listener is deliberately left out.
    conf.setAdvertisedListeners("pulsar:pulsar://localhost:" + brokerPort
            + ",kafka_external:pulsar://0.0.0.0:" + externalPort);
    super.internalSetup();

    // Connect through the unadvertised "kafka" listener.
    final Properties props = newKafkaProducerProperties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort);
    final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    try {
        producer.send(new ProducerRecord<>("my-topic", "hello")).get();
        Assert.fail("send should fail");
    } catch (InterruptedException | ExecutionException e) {
        log.info("Send failed: {}", e.getMessage());
    }
    producer.close();
    super.internalCleanup();
}
Expected behavior
When a producer sends messages through a listener that doesn't appear in advertisedListeners, the send should fail.
Additional context
While debugging this issue, I found that when the first lookup request with listener name kafka fails, the response is wrong. Worse, the wrong broker cache entry in AdminManager is returned to the Kafka client, which then connects to the kafka_external listener.