From f81f3e6f893a95cdcd5ff791a5b1a9c871d37ed6 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Mon, 14 Dec 2020 20:35:29 +0800 Subject: [PATCH 1/2] consumer support update stats with specified stats --- .../pulsar/broker/service/Consumer.java | 12 +++++++++ .../broker/service/SubscriptionSeekTest.java | 26 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index cb257168ca22b..4651f84ef5be6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -572,6 +572,18 @@ public void updateRates() { stats.chuckedMessageRate = chuckedMessageRate.getRate(); } + public void updateStats(ConsumerStats consumerStats) { + msgOutCounter.add(consumerStats.msgOutCounter); + bytesOutCounter.add(consumerStats.bytesOutCounter); + msgOut.recordMultipleEvents(consumerStats.msgOutCounter, consumerStats.bytesOutCounter); + lastAckedTimestamp = consumerStats.lastAckedTimestamp; + lastConsumedTimestamp = consumerStats.lastConsumedTimestamp; + MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits); + unackedMessages = consumerStats.unackedMessages; + blockedConsumerOnUnackedMsgs = consumerStats.blockedConsumerOnUnackedMsgs; + AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry); + } + public ConsumerStats getStats() { stats.msgOutCounter = msgOutCounter.longValue(); stats.bytesOutCounter = bytesOutCounter.longValue(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index b8dfbc8a16b4d..17fe80170fc45 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -46,6 +46,7 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.util.RelativeTimeUtil; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -493,4 +494,29 @@ public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek } assertTrue(hasConsumerNotDisconnected); } + + @Test + public void testUpdateStatsForSingleActiveConsumerDispatcherWhenSeek() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/testUpdateStatsForSingleActiveConsumerDispatcherWhenSeek"; + // Disable pre-fetch in consumer to track the messages received + pulsarClient.newConsumer() + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName("my-subscription") + .subscribe(); + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + assertNotNull(topicRef); + assertEquals(topicRef.getSubscriptions().size(), 1); + List consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers(); + assertEquals(consumers.size(), 1); + ConsumerStats consumerStats = new ConsumerStats(); + consumerStats.msgOutCounter = 10; + consumerStats.bytesOutCounter = 1280; + consumers.get(0).updateStats(consumerStats); + ConsumerStats updatedStats = consumers.get(0).getStats(); + + assertEquals(updatedStats.msgOutCounter, 10); + assertEquals(updatedStats.bytesOutCounter, 1280); + } } From fe7cbc47547401b487e23694c3c07f9c89bc4c53 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Tue, 15 Dec 2020 11:55:52 +0800 Subject: [PATCH 2/2] move UT to ConsumerStatsTest --- .../broker/service/SubscriptionSeekTest.java | 26 ------------------ .../broker/stats/ConsumerStatsTest.java | 27 +++++++++++++++++++ 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 17fe80170fc45..b8dfbc8a16b4d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -46,7 +46,6 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.util.RelativeTimeUtil; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -494,29 +493,4 @@ public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek } assertTrue(hasConsumerNotDisconnected); } - - @Test - public void testUpdateStatsForSingleActiveConsumerDispatcherWhenSeek() throws Exception { - final String topicName = "persistent://prop/use/ns-abc/testUpdateStatsForSingleActiveConsumerDispatcherWhenSeek"; - // Disable pre-fetch in consumer to track the messages received - pulsarClient.newConsumer() - .topic(topicName) - .subscriptionType(SubscriptionType.Shared) - .subscriptionName("my-subscription") - .subscribe(); - - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); - assertNotNull(topicRef); - assertEquals(topicRef.getSubscriptions().size(), 1); - List consumers = topicRef.getSubscriptions().get("my-subscription").getConsumers(); - assertEquals(consumers.size(), 1); - ConsumerStats consumerStats = new ConsumerStats(); - consumerStats.msgOutCounter = 10; - consumerStats.bytesOutCounter = 1280; - consumers.get(0).updateStats(consumerStats); - ConsumerStats updatedStats = consumers.get(0).getStats(); - - assertEquals(updatedStats.msgOutCounter, 10); - assertEquals(updatedStats.bytesOutCounter, 1280); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 8418b2e968c7d..acf8e65210414 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -26,11 +26,14 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.policies.data.ConsumerStats; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.List; import java.util.concurrent.TimeUnit; @Slf4j @@ -140,4 +143,28 @@ public void testAckStatsOnPartitionedTopicForExclusiveSubscription() throws Puls } } + @Test + public void testUpdateStatsForActiveConsumerAndSubscription() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription"; + pulsarClient.newConsumer() + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName("my-subscription") + .subscribe(); + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + Assert.assertNotNull(topicRef); + Assert.assertEquals(topicRef.getSubscriptions().size(), 1); + List consumers = topicRef.getSubscriptions() + .get("my-subscription").getConsumers(); + Assert.assertEquals(consumers.size(), 1); + ConsumerStats consumerStats = new ConsumerStats(); + consumerStats.msgOutCounter = 10; + consumerStats.bytesOutCounter = 1280; + consumers.get(0).updateStats(consumerStats); + ConsumerStats updatedStats = consumers.get(0).getStats(); + + Assert.assertEquals(updatedStats.msgOutCounter, 10); + Assert.assertEquals(updatedStats.bytesOutCounter, 1280); + } }