diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java
index 06806f40a0..a16211aa56 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java
@@ -22,6 +22,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
/**
@@ -105,8 +106,10 @@ public void publishMessages() {
}
topicManager.registerProducerInPersistentTopic(partitionName, persistentTopic);
// collect metrics
- topicManager.getReferenceProducer(partitionName).getTopic()
- .incrementPublishCount(numMessages, byteBuf.readableBytes());
+ Producer producer = topicManager.getReferenceProducer(partitionName);
+ producer.updateRates(numMessages, byteBuf.readableBytes());
+ producer.getTopic().incrementPublishCount(numMessages, byteBuf.readableBytes());
+ // publish the accumulated records to the underlying persistent topic
persistentTopic.publishMessage(byteBuf,
MessagePublishContext.get(offsetFuture, persistentTopic, System.nanoTime()));
offsetFuture.whenComplete((offset, e) -> {
diff --git a/pom.xml b/pom.xml
index 71a5e68d52..062cb96ff3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
2.13.3
1.18.4
2.22.0
- 2.8.0-rc-202012242230
+ 2.8.0-rc-202012272229
1.7.25
3.1.8
1.12.5
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaProducerStatsTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaProducerStatsTest.java
new file mode 100644
index 0000000000..3777e27488
--- /dev/null
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaProducerStatsTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop;
+
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import com.google.common.collect.Sets;
+import java.util.Properties;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pulsar.broker.service.Producer;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for producer stats.
+ */
+@Slf4j
+public class KafkaProducerStatsTest extends KopProtocolHandlerTestBase {
+
+ @DataProvider(name = "partitionsAndBatch")
+ public static Object[][] partitionsAndBatch() {
+ return new Object[][] {
+ { 1, true },
+ { 1, false },
+ { 7, true },
+ { 7, false }
+ };
+ }
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ log.info("success internal setup");
+
+ if (!admin.clusters().getClusters().contains(configClusterName)) {
+ // so that clients can test short names
+ admin.clusters().createCluster(configClusterName,
+ new ClusterData("http://127.0.0.1:" + brokerWebservicePort));
+ } else {
+ admin.clusters().updateCluster(configClusterName,
+ new ClusterData("http://127.0.0.1:" + brokerWebservicePort));
+ }
+
+ if (!admin.tenants().getTenants().contains("public")) {
+ admin.tenants().createTenant("public",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ } else {
+ admin.tenants().updateTenant("public",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ }
+ if (!admin.namespaces().getNamespaces("public").contains("public/default")) {
+ admin.namespaces().createNamespace("public/default");
+ admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
+ admin.namespaces().setRetention("public/default",
+ new RetentionPolicies(60, 1000));
+ }
+ if (!admin.namespaces().getNamespaces("public").contains("public/__kafka")) {
+ admin.namespaces().createNamespace("public/__kafka");
+ admin.namespaces().setNamespaceReplicationClusters("public/__kafka", Sets.newHashSet("test"));
+ admin.namespaces().setRetention("public/__kafka",
+ new RetentionPolicies(-1, -1));
+ }
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 20000, dataProvider = "partitionsAndBatch")
+ public void testKafkaProducePulsarMetrics(int partitionNumber, boolean isBatch) throws Exception {
+ String kafkaTopicName = "kopKafkaProducePulsarMetrics" + partitionNumber;
+ String pulsarTopicName = "persistent://public/default/" + kafkaTopicName;
+
+ // create partitioned topic.
+ admin.topics().createPartitionedTopic(kafkaTopicName, partitionNumber);
+
+ // 1. produce message with Kafka producer.
+ @Cleanup
+ KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort());
+
+ int totalMsgs = 10;
+
+ String messageStrPrefix = "Message_Kop_KafkaProducePulsarConsume_" + partitionNumber + "_";
+
+ for (int i = 0; i < totalMsgs; i++) {
+ String messageStr = messageStrPrefix + i;
+ ProducerRecord record = new ProducerRecord<>(
+ kafkaTopicName,
+ i,
+ messageStr);
+
+ kProducer.getProducer().send(record).get();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Kafka Producer Sent message: ({}, {})", i, messageStr);
+ }
+ }
+
+ long msgInCounter = admin.topics().getPartitionedStats(pulsarTopicName, false).msgInCounter;
+ assertEquals(msgInCounter, totalMsgs);
+ long bytesInCounter = admin.topics().getPartitionedStats(pulsarTopicName, false).bytesInCounter;
+ assertNotEquals(bytesInCounter, 0);
+ }
+
+ @Test(timeOut = 20000)
+ public void testKafkaProducePulsarRates() throws Exception {
+ String topicName = "testBrokerPublishMetrics";
+ String pulsarTopicName = "persistent://public/default/" + topicName + "-partition-0";
+
+ // create partitioned topic.
+ admin.topics().createPartitionedTopic(topicName, 1);
+
+ // 1. produce message with Kafka producer.
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort);
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ @Cleanup
+ KafkaProducer producer = new org.apache.kafka.clients.producer.KafkaProducer(properties);
+ int numMessages = 100;
+ int msgBytes = 80;
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(new ProducerRecord<>(topicName, new byte[msgBytes])).get();
+ }
+
+ // look up the existing persistent topic and its registered Pulsar producer
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
+ .getTopicIfExists(pulsarTopicName).get().get();
+
+ Producer prod = topic.getProducers().values().iterator().next();
+ // compute rates accumulated since the last call, then reset the internal counters
+ prod.updateRates();
+ double msgThroughputIn = prod.getStats().msgThroughputIn;
+ double msgRateIn = prod.getStats().msgRateIn;
+ double averageMsgSize = prod.getStats().averageMsgSize;
+ Assert.assertTrue(msgThroughputIn > numMessages * msgBytes);
+ Assert.assertTrue(msgRateIn > numMessages);
+ Assert.assertTrue(averageMsgSize > msgBytes);
+
+ producer.close();
+ }
+}
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java
index 3176ed08e8..93ca339e5b 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestTypeTest.java
@@ -16,7 +16,6 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -215,42 +214,6 @@ public void testKafkaProducePulsarConsume(int partitionNumber, boolean isBatch)
assertNull(msg);
}
- @Test(timeOut = 20000, dataProvider = "partitionsAndBatch")
- public void testKafkaProducePulsarMetrics(int partitionNumber, boolean isBatch) throws Exception {
- String kafkaTopicName = "kopKafkaProducePulsarMetrics" + partitionNumber;
- String pulsarTopicName = "persistent://public/default/" + kafkaTopicName;
-
- // create partitioned topic.
- admin.topics().createPartitionedTopic(kafkaTopicName, partitionNumber);
-
- // 1. produce message with Kafka producer.
- @Cleanup
- KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort());
-
- int totalMsgs = 10;
-
- String messageStrPrefix = "Message_Kop_KafkaProducePulsarConsume_" + partitionNumber + "_";
-
- for (int i = 0; i < totalMsgs; i++) {
- String messageStr = messageStrPrefix + i;
- ProducerRecord record = new ProducerRecord<>(
- kafkaTopicName,
- i,
- messageStr);
-
- kProducer.getProducer().send(record).get();
-
- if (log.isDebugEnabled()) {
- log.debug("Kafka Producer Sent message: ({}, {})", i, messageStr);
- }
- }
-
- long msgInCounter = admin.topics().getPartitionedStats(pulsarTopicName, false).msgInCounter;
- assertEquals(msgInCounter, totalMsgs);
- long bytesInCounter = admin.topics().getPartitionedStats(pulsarTopicName, false).bytesInCounter;
- assertNotEquals(bytesInCounter, 0);
- }
-
@Test(timeOut = 20000, dataProvider = "partitionsAndBatch")
public void testKafkaProduceKafkaConsume(int partitionNumber, boolean isBatch) throws Exception {
String kafkaTopicName = "kopKafkaProduceKafkaConsume" + partitionNumber;