diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 8379ad62b6..2a5ca81313 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -809,6 +809,8 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, e -> addPartitionResponse.accept(topicPartition, new PartitionResponse(Errors.forException(e))); final String fullPartitionName = KopTopic.toString(topicPartition); + + // check KOP inner topic if (isOffsetTopic(fullPartitionName) || isTransactionTopic(fullPartitionName)) { log.error("[{}] Request {}: not support produce message to inner topic. topic: {}", ctx.channel(), produceHar.getHeader(), topicPartition); @@ -845,9 +847,11 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, return; } - final Consumer persistentTopicConsumer = persistentTopic -> - publishMessages(persistentTopic, byteBuf, numMessages, validRecords, fullPartitionName, - offsetConsumer, errorsConsumer); + final Consumer persistentTopicConsumer = persistentTopic -> { + publishMessages(persistentTopic, byteBuf, numMessages, validRecords, fullPartitionName, + offsetConsumer, errorsConsumer); + }; + if (topicFuture.isDone()) { persistentTopicConsumer.accept(topicFuture.getNow(null)); } else { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java index 054319d0fd..2280b2bd21 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java @@ -161,6 +161,9 @@ private void assertProduceMessage(KafkaProducer producer, final String topic, fi boolean assertException) { try { producer.send(new ProducerRecord<>(topic, value)).get(); + if (assertException) { + Assert.fail(); + } } catch (Exception e) { if (assertException) { Assert.assertEquals(e.getCause().getMessage(),