Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ dispatchThrottlingRatePerTopicInMsg=0
# default message-byte dispatch-throttling
dispatchThrottlingRatePerTopicInByte=0

# Default dispatch-throttling is disabled for consumers which already caught-up with published messages and
# don't have backlog. This enables dispatch-throttling for non-backlog consumers as well.
dispatchThrottlingOnNonBacklogConsumerEnabled=false

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=10000

Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ dispatchThrottlingRatePerTopicInMsg=0
# default message-byte dispatch-throttling
dispatchThrottlingRatePerTopicInByte=0

# Default dispatch-throttling is disabled for consumers which already caught-up with published messages and
# don't have backlog. This enables dispatch-throttling for non-backlog consumers as well.
dispatchThrottlingOnNonBacklogConsumerEnabled=false

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=10000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
// default message-byte dispatch-throttling
@FieldContext(dynamic = true)
private long dispatchThrottlingRatePerTopicInByte = 0;
// Default dispatch-throttling is disabled for consumers which already caught-up with published messages and
// don't have backlog. This enables dispatch-throttling for non-backlog consumers as well.
@FieldContext(dynamic = true)
private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false;
// Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
@FieldContext(dynamic = true)
private int maxConcurrentLookupRequest = 10000;
Expand Down Expand Up @@ -522,6 +526,14 @@ public void setDispatchThrottlingRatePerTopicInByte(long dispatchThrottlingRateP
this.dispatchThrottlingRatePerTopicInByte = dispatchThrottlingRatePerTopicInByte;
}

public boolean isDispatchThrottlingOnNonBacklogConsumerEnabled() {
return dispatchThrottlingOnNonBacklogConsumerEnabled;
}

public void setDispatchThrottlingOnNonBacklogConsumerEnabled(boolean dispatchThrottlingOnNonBacklogConsumerEnabled) {
this.dispatchThrottlingOnNonBacklogConsumerEnabled = dispatchThrottlingOnNonBacklogConsumerEnabled;
}

public int getMaxConcurrentLookupRequest() {
return maxConcurrentLookupRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs");
private final ServiceConfiguration serviceConfig;

enum ReadType {
Normal, Replay
Expand All @@ -92,6 +94,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
this.readBatchSize = MaxReadBatchSize;
this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
.getMaxUnackedMessagesPerSubscription();
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
}

@Override
Expand Down Expand Up @@ -173,10 +176,10 @@ public void readMoreEntries() {
if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);

// throttle only if: (1) cursor is not active bcz active-cursor reads message from cache rather from
// bookkeeper (2) if topic has reached message-rate threshold: then schedule the read after
// MESSAGE_RATE_BACKOFF_MS
if (!cursor.isActive()) {
// throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
// active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
// threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
DispatchRateLimiter rateLimiter = topic.getDispatchRateLimiter();
if (rateLimiter.isDispatchRateLimitingEnabled()) {
if (!rateLimiter.hasMessageDispatchPermit()) {
Expand Down Expand Up @@ -361,8 +364,11 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
}
}

topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent);

// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent);
}

if (entriesToDispatch > 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
Expand All @@ -52,6 +53,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
private static final int MaxReadBatchSize = 100;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private final ServiceConfiguration serviceConfig;

public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic) {
Expand All @@ -61,6 +63,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
: ""/* NonDurableCursor doesn't have name */);
this.cursor = cursor;
this.readBatchSize = MaxReadBatchSize;
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
}

protected void scheduleReadOnActiveConsumer() {
Expand Down Expand Up @@ -121,10 +124,12 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o
final long totalBytesSent = sentMsgInfo.getTotalSentMessageBytes();
sentMsgInfo.getChannelPromse().addListener(future -> {
if (future.isSuccess()) {
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent);
}
// Schedule a new read batch operation only after the previous batch has been written to the socket
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
// acquire message-dispatch permits for already delivered messages
topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent);
Consumer newConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (newConsumer != null && !havePendingRead) {
readMoreEntries(newConsumer);
Expand Down Expand Up @@ -206,16 +211,16 @@ protected void readMoreEntries(Consumer consumer) {

int messagesToRead = Math.min(availablePermits, readBatchSize);

// throttle only if: (1) cursor is not active bcz active-cursor reads message from cache rather from
// bookkeeper (2) if topic has reached message-rate threshold: then schedule the read after
// MESSAGE_RATE_BACKOFF_MS
if (!cursor.isActive()) {
// throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
// active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
// threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
DispatchRateLimiter rateLimiter = topic.getDispatchRateLimiter();
if (rateLimiter.isDispatchRateLimitingEnabled()) {
if (!rateLimiter.hasMessageDispatchPermit()) {
if (log.isDebugEnabled()) {
log.debug("[{}] message-read exceeded message-rate {}/{}, schedule after a {}",
name, rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(),
log.debug("[{}] message-read exceeded message-rate {}/{}, schedule after a {}", name,
rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
topic.getBrokerService().executor().schedule(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,76 @@ public void testGlobalNamespaceThrottling() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

/**
* It verifies that broker throttles already caught-up consumer which doesn't have backlog if the flag is enabled
*
* @param subscription
* @throws Exception
*/
@Test(dataProvider = "subscriptions", timeOut = 5000)
public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);

final String namespace = "my-property/use/throttling_ns";
final String topicName = "persistent://" + namespace + "/throttlingBlock";

final int messageRate = 10;
DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 1);

admin.namespaces().createNamespace(namespace);
admin.namespaces().setDispatchRate(namespace, dispatchRate);
admin.brokers().updateDynamicConfiguration("dispatchThrottlingOnNonBacklogConsumerEnabled",
Boolean.TRUE.toString());
// create producer and topic
Producer producer = pulsarClient.createProducer(topicName);
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get();
boolean isUpdated = false;
int retry = 5;
for (int i = 0; i < retry; i++) {
if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() > 0) {
isUpdated = true;
break;
} else {
if (i != retry - 1) {
Thread.sleep(100);
}
}
}
Assert.assertTrue(isUpdated);
Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

// enable throttling for nonBacklog consumers
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);

int numMessages = 500;

final AtomicInteger totalReceived = new AtomicInteger(0);

ConsumerConfiguration consumerConf = new ConsumerConfiguration();
consumerConf.setSubscriptionType(subscription);
consumerConf.setMessageListener((consumer, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
totalReceived.incrementAndGet();
});
Consumer consumer = pulsarClient.subscribe(topicName, "my-subscriber-name", consumerConf);

// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
producer.send(new byte[80]);
}

// consumer should not have received all publihsed message due to message-rate throttling
Assert.assertTrue(totalReceived.get() < messageRate * 2);

consumer.close();
producer.close();
// revert default value
this.conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false);
log.info("-- Exiting {} test --", methodName);
}

private void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater");
statsUpdaterField.setAccessible(true);
Expand Down