diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 14dfb4b14b..48680cf6ad 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -575,13 +575,13 @@ private void publishMessages(MemoryRecords records, log.error("record to bytebuf error: ", ex); future.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR)); } else { - doPublishMessages(topic); + doPublishMessages(topic, size.get()); } }); } } - private void doPublishMessages(TopicName topic) { + private void doPublishMessages(TopicName topic, int size) { Queue<Pair<ByteBuf, CompletableFuture<PartitionResponse>>> topicQueue = transQueue.get(topic); @@ -603,6 +603,10 @@ private void doPublishMessages(TopicName topic) { result.getRight().complete(new PartitionResponse(Errors.LEADER_NOT_AVAILABLE)); } else { topicManager.registerProducerInPersistentTopic(topic.toString(), persistentTopic); + // collect metrics + topicManager.getReferenceProducer(topic.toString()) + .getTopic().incrementPublishCount(size, headerAndPayload.readableBytes()); + // publish message persistentTopic.publishMessage( headerAndPayload, MessagePublishContext.get( diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index d4106df1f4..48ba56afcb 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -370,6 +370,10 @@ public synchronized void close() { } } + public Producer getReferenceProducer(String topicName) { + return references.get(topicName); + } + public void deReference(String topicName) { try { removeLookupCache(topicName); diff 
--git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java index 93ca339e5b..3176ed08e8 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java @@ -16,6 +16,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -214,6 +215,42 @@ public void testKafkaProducePulsarConsume(int partitionNumber, boolean isBatch) assertNull(msg); } + @Test(timeOut = 20000, dataProvider = "partitionsAndBatch") + public void testKafkaProducePulsarMetrics(int partitionNumber, boolean isBatch) throws Exception { + String kafkaTopicName = "kopKafkaProducePulsarMetrics" + partitionNumber; + String pulsarTopicName = "persistent://public/default/" + kafkaTopicName; + + // create partitioned topic. + admin.topics().createPartitionedTopic(kafkaTopicName, partitionNumber); + + // 1. produce message with Kafka producer. 
+ @Cleanup + KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort()); + + int totalMsgs = 10; + + String messageStrPrefix = "Message_Kop_KafkaProducePulsarMetrics_" + partitionNumber + "_"; + + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; + ProducerRecord<Integer, String> record = new ProducerRecord<>( + kafkaTopicName, + i, + messageStr); + + kProducer.getProducer().send(record).get(); + + if (log.isDebugEnabled()) { + log.debug("Kafka Producer Sent message: ({}, {})", i, messageStr); + } + } + + long msgInCounter = admin.topics().getPartitionedStats(pulsarTopicName, false).msgInCounter; + assertEquals(msgInCounter, totalMsgs); + long bytesInCounter = admin.topics().getPartitionedStats(pulsarTopicName, false).bytesInCounter; + assertNotEquals(bytesInCounter, 0); + } + @Test(timeOut = 20000, dataProvider = "partitionsAndBatch") public void testKafkaProduceKafkaConsume(int partitionNumber, boolean isBatch) throws Exception { String kafkaTopicName = "kopKafkaProduceKafkaConsume" + partitionNumber;