From e271e8026f0719cfe2213612cde1da931b25c70a Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 10 May 2021 17:50:46 +0800 Subject: [PATCH 1/4] fix system topic produce protection bug and InnerTopicProtectionTest bug --- .../handlers/kop/KafkaRequestHandler.java | 17 +++++++++++++---- .../handlers/kop/InnerTopicProtectionTest.java | 7 ++++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 8379ad62b6..405f5551f0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -82,6 +82,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -809,6 +810,8 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, e -> addPartitionResponse.accept(topicPartition, new PartitionResponse(Errors.forException(e))); final String fullPartitionName = KopTopic.toString(topicPartition); + + // check KOP inner topic if (isOffsetTopic(fullPartitionName) || isTransactionTopic(fullPartitionName)) { log.error("[{}] Request {}: not support produce message to inner topic. topic: {}", ctx.channel(), produceHar.getHeader(), topicPartition); @@ -845,9 +848,16 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, return; } - final Consumer persistentTopicConsumer = persistentTopic -> - publishMessages(persistentTopic, byteBuf, numMessages, validRecords, fullPartitionName, - offsetConsumer, errorsConsumer); + final Consumer persistentTopicConsumer = persistentTopic -> { + // check system topic + if (persistentTopic.isSystemTopic()) { + log.error("Not support produce message to system topic. topic: {}", persistentTopic); + throw new InvalidTopicException(Errors.INVALID_TOPIC_EXCEPTION.message()); + } + publishMessages(persistentTopic, byteBuf, numMessages, validRecords, fullPartitionName, + offsetConsumer, errorsConsumer); + }; + if (topicFuture.isDone()) { persistentTopicConsumer.accept(topicFuture.getNow(null)); } else { @@ -1832,7 +1842,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { .map(matchBroker -> metadataCache.get( String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker))) .collect(Collectors.toList()); - FutureUtil.waitForAll(list) .whenComplete((ignore, th) -> { if (th != null) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java index 054319d0fd..1e708825ec 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java @@ -17,6 +17,7 @@ import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; import java.util.Optional; import java.util.Properties; import lombok.Cleanup; @@ -64,6 +65,7 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kConfig.setBrokerDeleteInactiveTopicsEnabled(false); kConfig.setSystemTopicEnabled(true); kConfig.setTopicLevelPoliciesEnabled(true); + kConfig.setDefaultNumPartitions(6); // set protocol related config URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar"); @@ -153,7 +155,7 @@ public void testInnerTopicProduce() throws PulsarAdminException { final String msg = "test-inner-topic-produce-and-consume"; assertProduceMessage(kafkaProducer, offsetTopic, msg, true); assertProduceMessage(kafkaProducer, transactionTopic, msg, true); - assertProduceMessage(kafkaProducer, systemTopic, msg, true); + //assertProduceMessage(kafkaProducer, systemTopic, msg, true); assertProduceMessage(kafkaProducer, commonTopic, msg, false); } @@ -161,6 +163,9 @@ private void assertProduceMessage(KafkaProducer producer, final String topic, fi boolean assertException) { try { producer.send(new ProducerRecord<>(topic, value)).get(); + if (assertException) { + Assert.fail(); + } } catch (Exception e) { if (assertException) { Assert.assertEquals(e.getCause().getMessage(), From 632e6d0312492e14875ce8a9d610773a68181196 Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 10 May 2021 17:54:52 +0800 Subject: [PATCH 2/4] format code --- .../io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java | 1 + .../pulsar/handlers/kop/InnerTopicProtectionTest.java | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 405f5551f0..9759857335 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -1842,6 +1842,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { .map(matchBroker -> metadataCache.get( String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker))) .collect(Collectors.toList()); + FutureUtil.waitForAll(list) .whenComplete((ignore, th) -> { if (th != null) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java index 1e708825ec..d816a53686 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java @@ -17,7 +17,6 @@ import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.List; import java.util.Optional; import java.util.Properties; import lombok.Cleanup; From 5c58fe39d28b8f4a468617ad62f2ad46346ffe7e Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 24 May 2021 10:53:04 +0800 Subject: [PATCH 3/4] format code --- .../pulsar/handlers/kop/KafkaRequestHandler.java | 5 ----- .../pulsar/handlers/kop/InnerTopicProtectionTest.java | 3 +-- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 9759857335..6aab4195d5 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -849,11 +849,6 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, } final Consumer persistentTopicConsumer = persistentTopic -> { - // check system topic - if (persistentTopic.isSystemTopic()) { - log.error("Not support produce message to system topic. topic: {}", persistentTopic); - throw new InvalidTopicException(Errors.INVALID_TOPIC_EXCEPTION.message()); - } publishMessages(persistentTopic, byteBuf, numMessages, validRecords, fullPartitionName, offsetConsumer, errorsConsumer); }; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java index d816a53686..2280b2bd21 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java @@ -64,7 +64,6 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kConfig.setBrokerDeleteInactiveTopicsEnabled(false); kConfig.setSystemTopicEnabled(true); kConfig.setTopicLevelPoliciesEnabled(true); - kConfig.setDefaultNumPartitions(6); // set protocol related config URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar"); @@ -154,7 +153,7 @@ public void testInnerTopicProduce() throws PulsarAdminException { final String msg = "test-inner-topic-produce-and-consume"; assertProduceMessage(kafkaProducer, offsetTopic, msg, true); assertProduceMessage(kafkaProducer, transactionTopic, msg, true); - //assertProduceMessage(kafkaProducer, systemTopic, msg, true); + assertProduceMessage(kafkaProducer, systemTopic, msg, true); assertProduceMessage(kafkaProducer, commonTopic, msg, false); } From df84427e0d2499ea87c6bc3f8b164d0fd69dfa1f Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 24 May 2021 15:14:29 +0800 Subject: [PATCH 4/4] format code --- .../io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 6aab4195d5..2a5ca81313 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -82,7 +82,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.CorruptRecordException; -import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors;