@@ -116,6 +116,10 @@ public abstract class AbstractTopic implements Topic {
private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();

private static final AtomicLongFieldUpdater<AbstractTopic> RATE_LIMITED_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "publishRateLimitedTimes");
protected volatile long publishRateLimitedTimes = 0;

protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;
// pointer to the exclusive producer
@@ -634,6 +638,11 @@ protected void checkTopicFenced() throws BrokerServiceException {
}
}

@Override
public long increasePublishLimitedTimes() {
return RATE_LIMITED_UPDATER.incrementAndGet(this);
}

protected void internalAddProducer(Producer producer) throws BrokerServiceException {
if (isProducersExceeded()) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
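The counter added above uses the standard `AtomicLongFieldUpdater` pattern: one static updater shared by every topic instance performs atomic updates on a plain `volatile long` field, so each topic pays for a single primitive field instead of a separate `AtomicLong` object. A minimal, self-contained sketch of the same pattern, with illustrative class and field names that are not from this PR:

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Illustrative stand-in for the AbstractTopic change (names are hypothetical):
// a single static updater mutates a volatile primitive on each instance.
public class ThrottleCounter {

    private static final AtomicLongFieldUpdater<ThrottleCounter> UPDATER =
            AtomicLongFieldUpdater.newUpdater(ThrottleCounter.class, "limitedTimes");

    private volatile long limitedTimes = 0;

    // Atomically bump the counter and return the new value, mirroring
    // increasePublishLimitedTimes() above.
    public long increase() {
        return UPDATER.incrementAndGet(this);
    }

    // A plain volatile read is enough when the value is only reported in stats.
    public long current() {
        return limitedTimes;
    }

    public static void main(String[] args) {
        ThrottleCounter counter = new ThrottleCounter();
        counter.increase();
        counter.increase();
        System.out.println(counter.current()); // prints 2
    }
}
```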
@@ -2512,6 +2512,7 @@ public void startSendOperation(Producer producer, int msgSize, int numMessages)
// When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
// client connection, possibly shared between multiple producers
ctx.channel().config().setAutoRead(false);
recordRateLimitMetrics(producers);
autoReadDisabledRateLimiting = isPublishRateExceeded;
throttledConnections.inc();
}
@@ -2533,6 +2534,17 @@ public void startSendOperation(Producer producer, int msgSize, int numMessages)
}
}

private void recordRateLimitMetrics(ConcurrentLongHashMap<CompletableFuture<Producer>> producers) {
producers.forEach((key, producerFuture) -> {
if (producerFuture != null && producerFuture.isDone()) {
Producer p = producerFuture.getNow(null);
if (p != null && p.getTopic() != null) {
p.getTopic().increasePublishLimitedTimes();
}
}
});
}

@Override
public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
if (pendingBytesPerThread.get().addAndGet(-msgSize) < resumeThresholdPendingBytesPerThread
@@ -2577,6 +2589,7 @@ public void enableCnxAutoRead() {
public void disableCnxAutoRead() {
if (ctx != null && ctx.channel().config().isAutoRead()) {
ctx.channel().config().setAutoRead(false);
recordRateLimitMetrics(producers);
}
}

@@ -129,6 +129,11 @@ default boolean isMarkerMessage() {
*/
void recordAddLatency(long latency, TimeUnit unit);

/**
* Increase the counter of how many times publishing has been rate limited on this topic and return the updated value.
*/
long increasePublishLimitedTimes();

CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable,
MessageId startMessageId,
@@ -1685,7 +1685,7 @@ public ManagedLedger getManagedLedger() {
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {

this.publishRateLimitedTimes = 0;
TopicStatsHelper topicStatsHelper = threadLocalTopicStats.get();
topicStatsHelper.reset();

@@ -1934,6 +1934,7 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
stats.waitingPublishers = getWaitingProducersCount();
stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
stats.publishRateLimitedTimes = publishRateLimitedTimes;

subscriptions.forEach((name, subscription) -> {
SubscriptionStatsImpl subStats = subscription.getStats(getPreciseBacklog, subscriptionBacklogSize);
@@ -148,6 +148,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.msgOutCounter = tStatus.msgOutCounter;
stats.bytesOutCounter = tStatus.bytesOutCounter;
stats.averageMsgSize = tStatus.averageMsgSize;
stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;

stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
@@ -44,6 +44,8 @@ class TopicStats {

public long msgBacklog;

long publishRateLimitedTimes;

long backlogQuotaLimit;
long backlogQuotaLimitTime;

@@ -82,6 +84,7 @@ public void reset() {

managedLedgerStats.reset();
msgBacklog = 0;
publishRateLimitedTimes = 0L;
backlogQuotaLimit = 0;
backlogQuotaLimitTime = -1;

@@ -133,6 +136,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats
.offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
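Once exported, the new per-topic gauge appears in the broker's Prometheus text output next to the other storage metrics, labelled by cluster, namespace, and topic. A small, self-contained sketch of checking for it over HTTP; the localhost address and port are assumptions, and any Prometheus scrape of the broker works the same way:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Fetch the broker's /metrics endpoint and print the per-topic
// pulsar_publish_rate_limit_times samples. The broker address is an assumption.
public class PublishRateLimitMetricCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/metrics")) // assumed broker web service address
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        response.body().lines()
                .filter(line -> line.startsWith("pulsar_publish_rate_limit_times"))
                // Each sample carries cluster/namespace/topic labels and holds the count
                // accumulated since the last stats update, e.g.:
                // pulsar_publish_rate_limit_times{cluster="...",namespace="...",topic="..."} 1
                .forEach(System.out::println);
    }
}
```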
@@ -55,12 +55,12 @@
import javax.crypto.SecretKey;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
@@ -74,7 +74,6 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -95,6 +94,108 @@ protected void setup() throws Exception {
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
resetConfig();
}

@Test
public void testPublishRateLimitedTimes() throws Exception {
cleanup();
checkPublishRateLimitedTimes(true);
cleanup();
checkPublishRateLimitedTimes(false);
}

private void checkPublishRateLimitedTimes(boolean preciseRateLimit) throws Exception {
if (preciseRateLimit) {
conf.setBrokerPublisherThrottlingTickTimeMillis(10000000);
conf.setMaxPublishRatePerTopicInMessages(1);
conf.setMaxPublishRatePerTopicInBytes(1);
conf.setBrokerPublisherThrottlingMaxMessageRate(100000);
conf.setBrokerPublisherThrottlingMaxByteRate(10000000);
} else {
conf.setBrokerPublisherThrottlingTickTimeMillis(1);
conf.setBrokerPublisherThrottlingMaxMessageRate(1);
conf.setBrokerPublisherThrottlingMaxByteRate(1);
}
conf.setStatsUpdateFrequencyInSecs(100000000);
conf.setPreciseTopicPublishRateLimiterEnable(preciseRateLimit);
setup();
String ns1 = "prop/ns-abc1" + UUID.randomUUID();
admin.namespaces().createNamespace(ns1, 1);
String topicName = "persistent://" + ns1 + "/metrics" + UUID.randomUUID();
String topicName2 = "persistent://" + ns1 + "/metrics" + UUID.randomUUID();
String topicName3 = "persistent://" + ns1 + "/metrics" + UUID.randomUUID();
// Use another connection
@Cleanup
PulsarClient client2 = newPulsarClient(lookupUrl.toString(), 0);

Producer<byte[]> producer = pulsarClient.newProducer().producerName("my-pub").enableBatching(false)
.topic(topicName).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().producerName("my-pub-2").enableBatching(false)
.topic(topicName2).create();
Producer<byte[]> producer3 = client2.newProducer().producerName("my-pub-2").enableBatching(false)
.topic(topicName3).create();
producer.sendAsync(new byte[11]);

PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
.getTopic(topicName, false).get().get();
Field field = AbstractTopic.class.getDeclaredField("publishRateLimitedTimes");
field.setAccessible(true);
Awaitility.await().untilAsserted(() -> {
long value = (long) field.get(persistentTopic);
assertEquals(value, 1);
});
@Cleanup
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
String metricsStr = statsOut.toString();
Multimap<String, Metric> metrics = parseMetrics(metricsStr);
assertTrue(metrics.containsKey("pulsar_publish_rate_limit_times"));
metrics.get("pulsar_publish_rate_limit_times").forEach(item -> {
if (ns1.equals(item.tags.get("namespace"))) {
if (item.tags.get("topic").equals(topicName)) {
assertEquals(item.value, 1);
return;
} else if (item.tags.get("topic").equals(topicName2)) {
assertEquals(item.value, 1);
return;
} else if (item.tags.get("topic").equals(topicName3)) {
// With precise (per-topic) rate limiting, only the connection that published to the
// throttled topic is affected, so a topic produced over a different connection records 0.
// With asynchronous broker-level rate limiting, all connections are throttled, so every
// topic records the limit event.
if (preciseRateLimit) {
assertEquals(item.value, 0);
} else {
assertEquals(item.value, 1);
}
return;
}
fail("should not fail");
}
});
// Stats updater will reset the stats
pulsar.getBrokerService().updateRates();
Awaitility.await().untilAsserted(() -> {
long value = (long) field.get(persistentTopic);
assertEquals(value, 0);
});

@Cleanup
ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut2);
String metricsStr2 = statsOut2.toString();
Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
assertTrue(metrics2.containsKey("pulsar_publish_rate_limit_times"));
metrics2.get("pulsar_publish_rate_limit_times").forEach(item -> {
if (ns1.equals(item.tags.get("namespace"))) {
assertEquals(item.value, 0);
}
});

producer.close();
producer2.close();
producer3.close();
}

@Test
@@ -76,6 +76,9 @@ public class TopicStatsImpl implements TopicStats {
/** Get estimated total unconsumed or backlog size in bytes. */
public long backlogSize;

/** The number of times the publishing rate limit was triggered. */
public long publishRateLimitedTimes;

/** Space used to store the offloaded messages for the topic/. */
public long offloadedStorageSize;

@@ -160,6 +163,7 @@ public void reset() {
this.lastOffloadLedgerId = 0;
this.lastOffloadFailureTimeStamp = 0;
this.lastOffloadSuccessTimeStamp = 0;
this.publishRateLimitedTimes = 0L;
this.compaction.reset();
}

@@ -182,6 +186,7 @@ public TopicStatsImpl add(TopicStats ts) {
this.averageMsgSize = newAverageMsgSize;
this.storageSize += stats.storageSize;
this.backlogSize += stats.backlogSize;
this.publishRateLimitedTimes += stats.publishRateLimitedTimes;
this.offloadedStorageSize += stats.offloadedStorageSize;
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;