From 8b4d41abb8858c73dc6e3e22fcef1c96f53e6596 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Mon, 11 Sep 2017 14:55:18 -0700 Subject: [PATCH] Add configuration to enable throttling for non-backlog consumers --- conf/broker.conf | 4 ++ conf/standalone.conf | 4 ++ .../pulsar/broker/ServiceConfiguration.java | 12 ++++ ...PersistentDispatcherMultipleConsumers.java | 18 +++-- ...sistentDispatcherSingleActiveConsumer.java | 21 +++--- .../api/MessageDispatchThrottlingTest.java | 70 +++++++++++++++++++ 6 files changed, 115 insertions(+), 14 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 486d8f1561619..fef42be6dc981 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -114,6 +114,10 @@ dispatchThrottlingRatePerTopicInMsg=0 # default message-byte dispatch-throttling dispatchThrottlingRatePerTopicInByte=0 +# Default dispatch-throttling is disabled for consumers which already caught-up with published messages and +# don't have backlog. This enables dispatch-throttling for non-backlog consumers as well. +dispatchThrottlingOnNonBacklogConsumerEnabled=false + # Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic maxConcurrentLookupRequest=10000 diff --git a/conf/standalone.conf b/conf/standalone.conf index f1067f6172c83..2e44e9ac8cec9 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -103,6 +103,10 @@ dispatchThrottlingRatePerTopicInMsg=0 # default message-byte dispatch-throttling dispatchThrottlingRatePerTopicInByte=0 +# Default dispatch-throttling is disabled for consumers which already caught-up with published messages and +# don't have backlog. This enables dispatch-throttling for non-backlog consumers as well. +dispatchThrottlingOnNonBacklogConsumerEnabled=false + # Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic maxConcurrentLookupRequest=10000 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 923557aceaa4b..a938b566726fd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -112,6 +112,10 @@ public class ServiceConfiguration implements PulsarConfiguration { // default message-byte dispatch-throttling @FieldContext(dynamic = true) private long dispatchThrottlingRatePerTopicInByte = 0; + // Default dispatch-throttling is disabled for consumers which already caught-up with published messages and + // don't have backlog. This enables dispatch-throttling for non-backlog consumers as well. + @FieldContext(dynamic = true) + private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false; // Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic @FieldContext(dynamic = true) private int maxConcurrentLookupRequest = 10000; @@ -522,6 +526,14 @@ public void setDispatchThrottlingRatePerTopicInByte(long dispatchThrottlingRateP this.dispatchThrottlingRatePerTopicInByte = dispatchThrottlingRatePerTopicInByte; } + public boolean isDispatchThrottlingOnNonBacklogConsumerEnabled() { + return dispatchThrottlingOnNonBacklogConsumerEnabled; + } + + public void setDispatchThrottlingOnNonBacklogConsumerEnabled(boolean dispatchThrottlingOnNonBacklogConsumerEnabled) { + this.dispatchThrottlingOnNonBacklogConsumerEnabled = dispatchThrottlingOnNonBacklogConsumerEnabled; + } + public int getMaxConcurrentLookupRequest() { return maxConcurrentLookupRequest; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 4501d8792e4bf..5e70036f1546c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -35,6 +35,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Consumer; @@ -79,6 +80,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu private volatile int blockedDispatcherOnUnackedMsgs = FALSE; private static final AtomicIntegerFieldUpdater BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs"); + private final ServiceConfiguration serviceConfig; enum ReadType { Normal, Replay @@ -92,6 +94,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso this.readBatchSize = MaxReadBatchSize; this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration() .getMaxUnackedMessagesPerSubscription(); + this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); } @Override @@ -173,10 +176,10 @@ public void readMoreEntries() { if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) { int messagesToRead = Math.min(totalAvailablePermits, readBatchSize); - // throttle only if: (1) cursor is not active bcz active-cursor reads message from cache rather from - // bookkeeper (2) if topic has reached message-rate threshold: then schedule the read after - // MESSAGE_RATE_BACKOFF_MS - if (!cursor.isActive()) { + // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz + // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate + // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS + if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { DispatchRateLimiter rateLimiter = topic.getDispatchRateLimiter(); if (rateLimiter.isDispatchRateLimitingEnabled()) { if (!rateLimiter.hasMessageDispatchPermit()) { @@ -361,8 +364,11 @@ public synchronized void readEntriesComplete(List entries, Object ctx) { } } - topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent); - + // acquire message-dispatch permits for already delivered messages + if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { + topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent); + } + if (entriesToDispatch > 0) { if (log.isDebugEnabled()) { log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", name, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 9a1277d0f6dd0..1a6b65af7c78c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -31,6 +31,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Consumer.SendMessageInfo; @@ -52,6 +53,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp private static final int MaxReadBatchSize = 100; private int readBatchSize; private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); + private final ServiceConfiguration serviceConfig; public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex, PersistentTopic topic) { @@ -61,6 +63,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su : ""/* NonDurableCursor doesn't have name */); this.cursor = cursor; this.readBatchSize = MaxReadBatchSize; + this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); } protected void scheduleReadOnActiveConsumer() { @@ -121,10 +124,12 @@ public synchronized void readEntriesComplete(final List entries, Object o final long totalBytesSent = sentMsgInfo.getTotalSentMessageBytes(); sentMsgInfo.getChannelPromse().addListener(future -> { if (future.isSuccess()) { + // acquire message-dispatch permits for already delivered messages + if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { + topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent); + } // Schedule a new read batch operation only after the previous batch has been written to the socket synchronized (PersistentDispatcherSingleActiveConsumer.this) { - // acquire message-dispatch permits for already delivered messages - topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent); Consumer newConsumer = ACTIVE_CONSUMER_UPDATER.get(this); if (newConsumer != null && !havePendingRead) { readMoreEntries(newConsumer); @@ -206,16 +211,16 @@ protected void readMoreEntries(Consumer consumer) { int messagesToRead = Math.min(availablePermits, readBatchSize); - // throttle only if: (1) cursor is not active bcz active-cursor reads message from cache rather from - // bookkeeper (2) if topic has reached message-rate threshold: then schedule the read after - // MESSAGE_RATE_BACKOFF_MS - if (!cursor.isActive()) { + // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz + // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate + // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS + if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { DispatchRateLimiter rateLimiter = topic.getDispatchRateLimiter(); if (rateLimiter.isDispatchRateLimitingEnabled()) { if (!rateLimiter.hasMessageDispatchPermit()) { if (log.isDebugEnabled()) { - log.debug("[{}] message-read exceeded message-rate {}/{}, schedule after a {}", - name, rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(), + log.debug("[{}] message-read exceeded message-rate {}/{}, schedule after a {}", name, + rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(), MESSAGE_RATE_BACKOFF_MS); } topic.getBrokerService().executor().schedule(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index 2807a5c05d777..a41800d9b0d5f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -703,6 +703,76 @@ public void testGlobalNamespaceThrottling() throws Exception { log.info("-- Exiting {} test --", methodName); } + /** + * It verifies that broker throttles already caught-up consumer which doesn't have backlog if the flag is enabled + * + * @param subscription + * @throws Exception + */ + @Test(dataProvider = "subscriptions", timeOut = 5000) + public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType subscription) throws Exception { + log.info("-- Starting {} test --", methodName); + + final String namespace = "my-property/use/throttling_ns"; + final String topicName = "persistent://" + namespace + "/throttlingBlock"; + + final int messageRate = 10; + DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 1); + + admin.namespaces().createNamespace(namespace); + admin.namespaces().setDispatchRate(namespace, dispatchRate); + admin.brokers().updateDynamicConfiguration("dispatchThrottlingOnNonBacklogConsumerEnabled", + Boolean.TRUE.toString()); + // create producer and topic + Producer producer = pulsarClient.createProducer(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + boolean isUpdated = false; + int retry = 5; + for (int i = 0; i < retry; i++) { + if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() > 0) { + isUpdated = true; + break; + } else { + if (i != retry - 1) { + Thread.sleep(100); + } + } + } + Assert.assertTrue(isUpdated); + Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate); + + // enable throttling for nonBacklog consumers + conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true); + + int numMessages = 500; + + final AtomicInteger totalReceived = new AtomicInteger(0); + + ConsumerConfiguration consumerConf = new ConsumerConfiguration(); + consumerConf.setSubscriptionType(subscription); + consumerConf.setMessageListener((consumer, msg) -> { + Assert.assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + log.debug("Received message [{}] in the listener", receivedMessage); + totalReceived.incrementAndGet(); + }); + Consumer consumer = pulsarClient.subscribe(topicName, "my-subscriber-name", consumerConf); + + // Asynchronously produce messages + for (int i = 0; i < numMessages; i++) { + producer.send(new byte[80]); + } + + // consumer should not have received all publihsed message due to message-rate throttling + Assert.assertTrue(totalReceived.get() < messageRate * 2); + + consumer.close(); + producer.close(); + // revert default value + this.conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false); + log.info("-- Exiting {} test --", methodName); + } + private void deactiveCursors(ManagedLedgerImpl ledger) throws Exception { Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater"); statsUpdaterField.setAccessible(true);