From 3f2aec192d825247168714c22eccc346adb33fe3 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Tue, 24 May 2022 14:52:02 +0800 Subject: [PATCH] [branch-2.9] Add publishRateLimitedTimes to topic metrics. --- .../pulsar/broker/service/AbstractTopic.java | 9 ++ .../pulsar/broker/service/ServerCnx.java | 13 +++ .../apache/pulsar/broker/service/Topic.java | 5 + .../service/persistent/PersistentTopic.java | 3 +- .../prometheus/NamespaceStatsAggregator.java | 1 + .../broker/stats/prometheus/TopicStats.java | 5 + .../broker/stats/PrometheusMetricsTest.java | 105 +++++++++++++++++- .../policies/data/stats/TopicStatsImpl.java | 5 + 8 files changed, 143 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index dedca9cced414..8d9c396a73cae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -116,6 +116,10 @@ public abstract class AbstractTopic implements Topic { private LongAdder bytesInCounter = new LongAdder(); private LongAdder msgInCounter = new LongAdder(); + private static final AtomicLongFieldUpdater<AbstractTopic> RATE_LIMITED_UPDATER = + AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "publishRateLimitedTimes"); + protected volatile long publishRateLimitedTimes = 0; // incremented through RATE_LIMITED_UPDATER + protected volatile Optional topicEpoch = Optional.empty(); private volatile boolean hasExclusiveProducer; // pointer to the exclusive producer @@ -634,6 +638,11 @@ protected void checkTopicFenced() throws BrokerServiceException { } } + @Override + public long increasePublishLimitedTimes() { + return RATE_LIMITED_UPDATER.incrementAndGet(this); + } + protected void internalAddProducer(Producer producer) throws BrokerServiceException { if (isProducersExceeded()) { log.warn("[{}] Attempting to add producer to topic which reached max 
producers limit", topic); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d8a51f60d65b1..3868498d9a3b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2512,6 +2512,7 @@ public void startSendOperation(Producer producer, int msgSize, int numMessages) // When the quota of pending send requests is reached, stop reading from socket to cause backpressure on // client connection, possibly shared between multiple producers ctx.channel().config().setAutoRead(false); + recordRateLimitMetrics(producers); autoReadDisabledRateLimiting = isPublishRateExceeded; throttledConnections.inc(); } @@ -2533,6 +2534,17 @@ public void startSendOperation(Producer producer, int msgSize, int numMessages) } } + private void recordRateLimitMetrics(ConcurrentLongHashMap<CompletableFuture<Producer>> producers) { + producers.forEach((key, producerFuture) -> { // skip pending/failed futures: getNow() throws if failed + if (producerFuture != null && producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) { + Producer p = producerFuture.getNow(null); + if (p != null && p.getTopic() != null) { + p.getTopic().increasePublishLimitedTimes(); + } + } + }); + } + @Override public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) { if (pendingBytesPerThread.get().addAndGet(-msgSize) < resumeThresholdPendingBytesPerThread @@ -2577,6 +2589,7 @@ public void enableCnxAutoRead() { public void disableCnxAutoRead() { if (ctx != null && ctx.channel().config().isAutoRead()) { ctx.channel().config().setAutoRead(false); + recordRateLimitMetrics(producers); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 9dd945f78da33..14e8e3fe00418 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -129,6 +129,11 @@ default boolean isMarkerMessage() { */ void recordAddLatency(long latency, TimeUnit unit); + /** + * Increments the number of times publishing to this topic was rate limited and returns the new count. + */ + long increasePublishLimitedTimes(); + CompletableFuture subscribe(TransportCnx cnx, String subscriptionName, long consumerId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7249d4054fedb..a2fa0fdd1830e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1685,7 +1685,7 @@ public ManagedLedger getManagedLedger() { public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) { - + this.publishRateLimitedTimes = 0; TopicStatsHelper topicStatsHelper = threadLocalTopicStats.get(); topicStatsHelper.reset(); @@ -1934,6 +1934,7 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa stats.waitingPublishers = getWaitingProducersCount(); stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue(); stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue(); + stats.publishRateLimitedTimes = publishRateLimitedTimes; subscriptions.forEach((name, subscription) -> { SubscriptionStatsImpl subStats = subscription.getStats(getPreciseBacklog, subscriptionBacklogSize); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 9df9ad4c7c624..2025c59c1a1f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -148,6 +148,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.msgOutCounter = tStatus.msgOutCounter; stats.bytesOutCounter = tStatus.bytesOutCounter; stats.averageMsgSize = tStatus.averageMsgSize; + stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes; stats.producersCount = 0; topic.getProducers().values().forEach(producer -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index bfa427e5e3a7c..599c3d82f6fc5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -44,6 +44,8 @@ class TopicStats { public long msgBacklog; + long publishRateLimitedTimes; + long backlogQuotaLimit; long backlogQuotaLimitTime; @@ -82,6 +84,7 @@ public void reset() { managedLedgerStats.reset(); msgBacklog = 0; + publishRateLimitedTimes = 0L; backlogQuotaLimit = 0; backlogQuotaLimitTime = -1; @@ -133,6 +136,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes, + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats 
.offloadedStorageUsed, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 9f098935efabf..d5503f6f1778e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -55,12 +55,12 @@ import javax.crypto.SecretKey; import javax.naming.AuthenticationException; import lombok.Cleanup; -import org.apache.bookkeeper.client.BookKeeper; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; @@ -74,7 +74,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.compaction.Compactor; -import org.apache.pulsar.compaction.TwoPhaseCompactor; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -95,6 +94,108 @@ protected void setup() throws Exception { @Override protected void cleanup() throws Exception { super.internalCleanup(); + resetConfig(); + } + + @Test + public void testPublishRateLimitedTimes() throws Exception { + cleanup(); + checkPublishRateLimitedTimes(true); + cleanup(); + checkPublishRateLimitedTimes(false); + } + + private void 
checkPublishRateLimitedTimes(boolean preciseRateLimit) throws Exception { + if (preciseRateLimit) { + conf.setBrokerPublisherThrottlingTickTimeMillis(10000000); + conf.setMaxPublishRatePerTopicInMessages(1); + conf.setMaxPublishRatePerTopicInBytes(1); + conf.setBrokerPublisherThrottlingMaxMessageRate(100000); + conf.setBrokerPublisherThrottlingMaxByteRate(10000000); + } else { + conf.setBrokerPublisherThrottlingTickTimeMillis(1); + conf.setBrokerPublisherThrottlingMaxMessageRate(1); + conf.setBrokerPublisherThrottlingMaxByteRate(1); + } + conf.setStatsUpdateFrequencyInSecs(100000000); + conf.setPreciseTopicPublishRateLimiterEnable(preciseRateLimit); + setup(); + String ns1 = "prop/ns-abc1" + UUID.randomUUID(); + admin.namespaces().createNamespace(ns1, 1); + String topicName = "persistent://" + ns1 + "/metrics" + UUID.randomUUID(); + String topicName2 = "persistent://" + ns1 + "/metrics" + UUID.randomUUID(); + String topicName3 = "persistent://" + ns1 + "/metrics" + UUID.randomUUID(); + // Use a second client so that producer3 publishes over a different connection + @Cleanup + PulsarClient client2 = newPulsarClient(lookupUrl.toString(), 0); + + Producer producer = pulsarClient.newProducer().producerName("my-pub").enableBatching(false) + .topic(topicName).create(); + Producer producer2 = pulsarClient.newProducer().producerName("my-pub-2").enableBatching(false) + .topic(topicName2).create(); + Producer producer3 = client2.newProducer().producerName("my-pub-2").enableBatching(false) + .topic(topicName3).create(); + producer.sendAsync(new byte[11]); + + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopic(topicName, false).get().get(); + Field field = AbstractTopic.class.getDeclaredField("publishRateLimitedTimes"); + field.setAccessible(true); + Awaitility.await().untilAsserted(() -> { + long value = (long) field.get(persistentTopic); + assertEquals(value, 1); + }); + @Cleanup + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, 
true, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap metrics = parseMetrics(metricsStr); + assertTrue(metrics.containsKey("pulsar_publish_rate_limit_times")); + metrics.get("pulsar_publish_rate_limit_times").forEach(item -> { + if (ns1.equals(item.tags.get("namespace"))) { + if (item.tags.get("topic").equals(topicName)) { + assertEquals(item.value, 1); + return; + } else if (item.tags.get("topic").equals(topicName2)) { + assertEquals(item.value, 1); + return; + } else if (item.tags.get("topic").equals(topicName3)) { + // With precise rate limiting, only the topic-level rate limit is triggered, + // so a topic that is not on the same connection reports 0 rate-limited times. + // With asynchronous rate limiting, the broker-level rate limit is triggered, + // and all connections are limited at that point. + if (preciseRateLimit) { + assertEquals(item.value, 0); + } else { + assertEquals(item.value, 1); + } + return; + } + fail("should not fail"); + } + }); + // Updating the rates resets the rate-limited counter to 0 + pulsar.getBrokerService().updateRates(); + Awaitility.await().untilAsserted(() -> { + long value = (long) field.get(persistentTopic); + assertEquals(value, 0); + }); + + @Cleanup + ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut2); + String metricsStr2 = statsOut2.toString(); + Multimap metrics2 = parseMetrics(metricsStr2); + assertTrue(metrics2.containsKey("pulsar_publish_rate_limit_times")); + metrics2.get("pulsar_publish_rate_limit_times").forEach(item -> { + if (ns1.equals(item.tags.get("namespace"))) { + assertEquals(item.value, 0); + } + }); + + producer.close(); + producer2.close(); + producer3.close(); } @Test diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index 
f3b3944b5fdc3..de016b0cf8f8a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -76,6 +76,9 @@ public class TopicStatsImpl implements TopicStats { /** Get estimated total unconsumed or backlog size in bytes. */ public long backlogSize; + /** The number of times the publishing rate limit was triggered. */ + public long publishRateLimitedTimes; + /** Space used to store the offloaded messages for the topic/. */ public long offloadedStorageSize; @@ -160,6 +163,7 @@ public void reset() { this.lastOffloadLedgerId = 0; this.lastOffloadFailureTimeStamp = 0; this.lastOffloadSuccessTimeStamp = 0; + this.publishRateLimitedTimes = 0L; this.compaction.reset(); } @@ -182,6 +186,7 @@ public TopicStatsImpl add(TopicStats ts) { this.averageMsgSize = newAverageMsgSize; this.storageSize += stats.storageSize; this.backlogSize += stats.backlogSize; + this.publishRateLimitedTimes += stats.publishRateLimitedTimes; this.offloadedStorageSize += stats.offloadedStorageSize; this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges; this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;