diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java
index 30a36eeba7b4c..f9910b484deb1 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java
@@ -21,6 +21,7 @@
import static org.testng.Assert.fail;
import java.io.IOException;
+import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -30,13 +31,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.admin.BrokerStats;
import com.yahoo.pulsar.client.api.Authentication;
@@ -705,4 +706,27 @@ public void testTlsAuthUseTrustCert() throws Exception {
}
}
+ /**
+ * Verifies: client side throttling.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testLookupThrottlingForClientByClient() throws Exception {
+ final String topicName = "persistent://prop/usw/my-ns/newTopic";
+
+ com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
+ clientConf.setStatsInterval(0, TimeUnit.SECONDS);
+ clientConf.setConcurrentLookupRequest(0);
+ String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+ PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf);
+
+ try {
+ Consumer consumer = pulsarClient.subscribe(topicName, "mysub", new ConsumerConfiguration());
+ fail("It should fail as throttling should not receive any request");
+ } catch (com.yahoo.pulsar.client.api.PulsarClientException.TooManyLookupRequestException e) {
+ // ok as throttling set to 0
+ }
+ }
+
}
diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java
index 9f372e4b1e329..6a2ab81035b4a 100644
--- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java
+++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java
@@ -48,6 +48,7 @@ public class ClientConfiguration implements Serializable {
private boolean useTls = false;
private String tlsTrustCertsFilePath = "";
private boolean tlsAllowInsecureConnection = false;
+ private int concurrentLookupRequest = 5000;
/**
* @return the authentication provider to be used
@@ -309,4 +310,24 @@ public long getStatsIntervalSeconds() {
public void setStatsInterval(long statsInterval, TimeUnit unit) {
this.statsIntervalSeconds = unit.toSeconds(statsInterval);
}
+
+ /**
+ * Get configured total allowed concurrent lookup-request.
+ *
+ * @return
+ */
+ public int getConcurrentLookupRequest() {
+ return concurrentLookupRequest;
+ }
+
+ /**
+ * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
+ * (default: 5000) It should be configured with higher value only in case of it requires to produce/subscribe on
+ * thousands of topic using created {@link PulsarClient}
+ *
+ * @param concurrentLookupRequest
+ */
+ public void setConcurrentLookupRequest(int concurrentLookupRequest) {
+ this.concurrentLookupRequest = concurrentLookupRequest;
+ }
}
diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java
index d8c0a13091153..eb0918b4eeca1 100644
--- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java
@@ -60,6 +60,12 @@ public LookupException(String msg) {
}
}
+ public static class TooManyLookupRequestException extends LookupException {
+ public TooManyLookupRequestException(String msg) {
+ super(msg);
+ }
+ }
+
public static class ConnectException extends PulsarClientException {
public ConnectException(String msg) {
super(msg);
diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java
index cefa10677eeb7..0953161c9b044 100644
--- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java
@@ -19,6 +19,7 @@
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@@ -59,6 +60,7 @@ public class ClientCnx extends PulsarHandler {
private final ConcurrentLongHashMap consumers = new ConcurrentLongHashMap<>(16, 1);
private final CompletableFuture connectionFuture = new CompletableFuture();
+ private final Semaphore pendingLookupRequestSemaphore;
enum State {
None, SentConnectFrame, Ready
@@ -66,6 +68,8 @@ enum State {
public ClientCnx(PulsarClientImpl pulsarClient) {
super(30, TimeUnit.SECONDS);
+ this.pendingLookupRequestSemaphore = new Semaphore(pulsarClient.getConfiguration().getConcurrentLookupRequest(),
+ true);
authentication = pulsarClient.getConfiguration().getAuthentication();
state = State.None;
}
@@ -105,7 +109,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Fail out all the pending ops
pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
- pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e));
+ pendingLookupRequests.forEach((key, future) -> getAndRemovePendingLookupRequest(key).completeExceptionally(e));
// Notify all attached producers/consumers so they have a chance to reconnect
producers.forEach((id, producer) -> producer.connectionClosed(this));
@@ -202,7 +206,7 @@ protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) {
log.info("Received Broker lookup response: {}", lookupResult.getResponse());
long requestId = lookupResult.getRequestId();
- CompletableFuture requestFuture = pendingLookupRequests.remove(requestId);
+ CompletableFuture requestFuture = getAndRemovePendingLookupRequest(requestId);
if (requestFuture != null) {
// Complete future with exception if : Result.response=fail/null
@@ -230,7 +234,7 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l
log.info("Received Broker Partition response: {}", lookupResult.getPartitions());
long requestId = lookupResult.getRequestId();
- CompletableFuture requestFuture = pendingLookupRequests.remove(requestId);
+ CompletableFuture requestFuture = getAndRemovePendingLookupRequest(requestId);
if (requestFuture != null) {
// Complete future with exception if : Result.response=fail/null
@@ -251,6 +255,22 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l
}
}
+ private boolean addPendingLookupRequests(long requestId, CompletableFuture future) {
+ if (pendingLookupRequestSemaphore.tryAcquire()) {
+ pendingLookupRequests.put(requestId, future);
+ return true;
+ }
+ return false;
+ }
+
+ private CompletableFuture getAndRemovePendingLookupRequest(long requestId) {
+ CompletableFuture result = pendingLookupRequests.remove(requestId);
+ if (result != null) {
+ pendingLookupRequestSemaphore.release();
+ }
+ return result;
+ }
+
@Override
protected void handleSendError(CommandSendError sendError) {
log.warn("{} Received send error from server: {}", ctx.channel(), sendError);
@@ -312,13 +332,22 @@ protected boolean isHandshakeCompleted() {
CompletableFuture newLookup(ByteBuf request, long requestId) {
CompletableFuture future = new CompletableFuture<>();
- pendingLookupRequests.put(requestId, future);
- ctx.writeAndFlush(request).addListener(writeFuture -> {
- if (!writeFuture.isSuccess()) {
- log.warn("{} Failed to send request to broker: {}", ctx.channel(), writeFuture.cause().getMessage());
- future.completeExceptionally(writeFuture.cause());
+
+ if (addPendingLookupRequests(requestId, future)) {
+ ctx.writeAndFlush(request).addListener(writeFuture -> {
+ if (!writeFuture.isSuccess()) {
+ log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), requestId,
+ writeFuture.cause().getMessage());
+ future.completeExceptionally(writeFuture.cause());
+ }
+ });
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("{} Failed to add lookup-request into pending queue", requestId);
}
- });
+ future.completeExceptionally(new PulsarClientException.TooManyLookupRequestException(
+ "Failed due to too many pending lookup requests"));
+ }
return future;
}