From 19c26b14da19dfab75c42b07d2e3e227a7c0eb77 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Wed, 11 Nov 2020 19:26:31 +0800 Subject: [PATCH 1/5] use producer to publish message for metrics collected by pulsar --- .../pulsar/handlers/kop/KafkaRequestHandler.java | 11 +++++------ .../pulsar/handlers/kop/KafkaTopicManager.java | 4 ++++ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 14dfb4b14b..f321833073 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -575,13 +575,13 @@ private void publishMessages(MemoryRecords records, log.error("record to bytebuf error: ", ex); future.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR)); } else { - doPublishMessages(topic); + doPublishMessages(topic, size.get()); } }); } } - private void doPublishMessages(TopicName topic) { + private void doPublishMessages(TopicName topic, int size) { Queue, CompletableFuture>> topicQueue = transQueue.get(topic); @@ -603,10 +603,9 @@ private void doPublishMessages(TopicName topic) { result.getRight().complete(new PartitionResponse(Errors.LEADER_NOT_AVAILABLE)); } else { topicManager.registerProducerInPersistentTopic(topic.toString(), persistentTopic); - persistentTopic.publishMessage( - headerAndPayload, - MessagePublishContext.get( - offsetFuture, persistentTopic, System.nanoTime())); + topicManager.getReferenceProducer(topic.toString()).publishMessage(0, 0, + headerAndPayload, size, false); + offsetFuture.complete(Long.valueOf(size)); } }); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index d4106df1f4..48ba56afcb 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -370,6 +370,10 @@ public synchronized void close() { } } + public Producer getReferenceProducer(String topicName) { + return references.get(topicName); + } + public void deReference(String topicName) { try { removeLookupCache(topicName); From 8c20da4ac97304eadfb85532b86c916bf6e40e0b Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Thu, 12 Nov 2020 19:19:12 +0800 Subject: [PATCH 2/5] add UT --- .../handlers/kop/KafkaRequestHandler.java | 11 +++- .../handlers/kop/KafkaRequestTypeTest.java | 65 +++++++++++++++++++ 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index f321833073..48680cf6ad 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -603,9 +603,14 @@ private void doPublishMessages(TopicName topic, int size) { result.getRight().complete(new PartitionResponse(Errors.LEADER_NOT_AVAILABLE)); } else { topicManager.registerProducerInPersistentTopic(topic.toString(), persistentTopic); - topicManager.getReferenceProducer(topic.toString()).publishMessage(0, 0, - headerAndPayload, size, false); - offsetFuture.complete(Long.valueOf(size)); + // collect metrics + topicManager.getReferenceProducer(topic.toString()) + .getTopic().incrementPublishCount(size, headerAndPayload.readableBytes()); + // publish message + persistentTopic.publishMessage( + headerAndPayload, + MessagePublishContext.get( + offsetFuture, persistentTopic, System.nanoTime())); } }); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java index 93ca339e5b..a66c927714 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java @@ -34,11 +34,14 @@ import java.util.stream.IntStream; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; @@ -214,6 +217,68 @@ public void testKafkaProducePulsarConsume(int partitionNumber, boolean isBatch) assertNull(msg); } + @Test(timeOut = 20000, dataProvider = "partitionsAndBatch") + public void testKafkaProducePulsarMetrics(int partitionNumber, boolean isBatch) throws Exception { + String kafkaTopicName = "kopKafkaProducePulsarMetrics" + partitionNumber; + String pulsarTopicName = "persistent://public/default/" + kafkaTopicName; + String key1 = "header_key1_"; + String key2 = "header_key2_"; + String value1 = "header_value1_"; + String value2 = "header_value2_"; + + // create partitioned topic. + admin.topics().createPartitionedTopic(kafkaTopicName, partitionNumber); + + // 1. produce message with Kafka producer. + @Cleanup + KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort()); + + int totalMsgs = 10; + int totalBytes = 0; + byte maxUsableMagic = new ApiVersions().maxUsableProduceMagic(); + org.apache.kafka.common.record.CompressionType compression = + org.apache.kafka.common.record.CompressionType.NONE; + RecordMetadata recordMetadata = null; + + String messageStrPrefix = "Message_Kop_KafkaProducePulsarConsume_" + partitionNumber + "_"; + + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; + ProducerRecord record = new ProducerRecord<>( + kafkaTopicName, + i, + messageStr); + record.headers() + .add(key1 + i, (value1 + i).getBytes(UTF_8)) + .add(key2 + i, (value2 + i).getBytes(UTF_8)); + + if (isBatch) { + recordMetadata = (RecordMetadata)kProducer.getProducer() + .send(record).get(); + } else { + recordMetadata = (RecordMetadata)kProducer.getProducer() + .send(record) + .get(); + } + + byte[] serializedKey = new byte[recordMetadata.serializedKeySize()]; + byte[] serializedValue = new byte[recordMetadata.serializedValueSize()]; + totalBytes = totalBytes + AbstractRecords.estimateSizeInBytesUpperBound( + maxUsableMagic, compression, serializedKey, + serializedValue, record.headers().toArray()) - 7; + + if (log.isDebugEnabled()) { + log.debug("Kafka Producer Sent message with header: ({}, {})", i, messageStr); + } + } + kProducer.close(); + + long msgInCounter = admin.topics().getPartitionedStats(pulsarTopicName, false).msgInCounter; + assertEquals(msgInCounter, totalMsgs); + long bytesInCounter = admin.topics().getPartitionedStats(pulsarTopicName, false).bytesInCounter; + assertEquals(bytesInCounter, totalBytes); + } + @Test(timeOut = 20000, dataProvider = "partitionsAndBatch") public void testKafkaProduceKafkaConsume(int partitionNumber, boolean isBatch) throws Exception { String kafkaTopicName = "kopKafkaProduceKafkaConsume" + partitionNumber; From f6e6cf1cb498f56bf559b24e252dded432376819 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Thu, 12 Nov 2020 19:29:51 +0800 Subject: [PATCH 3/5] fix checkstyle --- .../pulsar/handlers/kop/KafkaRequestTypeTest.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java index a66c927714..cef7107ada 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java @@ -253,12 +253,11 @@ public void testKafkaProducePulsarMetrics(int partitionNumber, boolean isBatch) .add(key2 + i, (value2 + i).getBytes(UTF_8)); if (isBatch) { - recordMetadata = (RecordMetadata)kProducer.getProducer() - .send(record).get(); + recordMetadata = (RecordMetadata) kProducer + .getProducer().send(record).get(); } else { - recordMetadata = (RecordMetadata)kProducer.getProducer() - .send(record) - .get(); + recordMetadata = (RecordMetadata) kProducer + .getProducer().send(record).get(); } byte[] serializedKey = new byte[recordMetadata.serializedKeySize()]; From 3425df885e7973d5d7840e21301add635a8fe4b0 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Thu, 12 Nov 2020 20:20:56 +0800 Subject: [PATCH 4/5] fix UT --- .../handlers/kop/KafkaRequestTypeTest.java | 27 +++++-------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java index cef7107ada..9227d3fd0f 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java @@ -16,6 +16,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -34,14 +35,11 @@ import java.util.stream.IntStream; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; @@ -234,11 +232,6 @@ public void testKafkaProducePulsarMetrics(int partitionNumber, boolean isBatch) KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort()); int totalMsgs = 10; - int totalBytes = 0; - byte maxUsableMagic = new ApiVersions().maxUsableProduceMagic(); - org.apache.kafka.common.record.CompressionType compression = - org.apache.kafka.common.record.CompressionType.NONE; - RecordMetadata recordMetadata = null; String messageStrPrefix = "Message_Kop_KafkaProducePulsarConsume_" + partitionNumber + "_"; @@ -253,29 +246,23 @@ public void testKafkaProducePulsarMetrics(int partitionNumber, boolean isBatch) .add(key2 + i, (value2 + i).getBytes(UTF_8)); if (isBatch) { - recordMetadata = (RecordMetadata) kProducer - .getProducer().send(record).get(); + kProducer.getProducer() + .send(record).get(); } else { - recordMetadata = (RecordMetadata) kProducer - .getProducer().send(record).get(); + kProducer.getProducer() + .send(record) + .get(); } - byte[] serializedKey = new byte[recordMetadata.serializedKeySize()]; - byte[] serializedValue = new byte[recordMetadata.serializedValueSize()]; - totalBytes = totalBytes + AbstractRecords.estimateSizeInBytesUpperBound( - maxUsableMagic, compression, serializedKey, - serializedValue, record.headers().toArray()) - 7; - if (log.isDebugEnabled()) { log.debug("Kafka Producer Sent message with header: ({}, {})", i, messageStr); } } - kProducer.close(); long msgInCounter = admin.topics().getPartitionedStats(pulsarTopicName, false).msgInCounter; assertEquals(msgInCounter, totalMsgs); long bytesInCounter = admin.topics().getPartitionedStats(pulsarTopicName, false).bytesInCounter; - assertEquals(bytesInCounter, totalBytes); + assertNotEquals(bytesInCounter, 0); } @Test(timeOut = 20000, dataProvider = "partitionsAndBatch") From 3f2dea38711893bc01fcf03ce941801ed70d6d85 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Fri, 13 Nov 2020 09:35:26 +0800 Subject: [PATCH 5/5] remove redundant code --- .../handlers/kop/KafkaRequestTypeTest.java | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java index 9227d3fd0f..3176ed08e8 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java @@ -219,10 +219,6 @@ public void testKafkaProducePulsarConsume(int partitionNumber, boolean isBatch) public void testKafkaProducePulsarMetrics(int partitionNumber, boolean isBatch) throws Exception { String kafkaTopicName = "kopKafkaProducePulsarMetrics" + partitionNumber; String pulsarTopicName = "persistent://public/default/" + kafkaTopicName; - String key1 = "header_key1_"; - String key2 = "header_key2_"; - String value1 = "header_value1_"; - String value2 = "header_value2_"; // create partitioned topic. admin.topics().createPartitionedTopic(kafkaTopicName, partitionNumber); @@ -241,21 +237,11 @@ public void testKafkaProducePulsarMetrics(int partitionNumber, boolean isBatch) kafkaTopicName, i, messageStr); - record.headers() - .add(key1 + i, (value1 + i).getBytes(UTF_8)) - .add(key2 + i, (value2 + i).getBytes(UTF_8)); - if (isBatch) { - kProducer.getProducer() - .send(record).get(); - } else { - kProducer.getProducer() - .send(record) - .get(); - } + kProducer.getProducer().send(record).get(); if (log.isDebugEnabled()) { - log.debug("Kafka Producer Sent message with header: ({}, {})", i, messageStr); + log.debug("Kafka Producer Sent message: ({}, {})", i, messageStr); } }