Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.fail;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -30,13 +31,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.admin.BrokerStats;
import com.yahoo.pulsar.client.api.Authentication;
Expand Down Expand Up @@ -705,4 +706,27 @@ public void testTlsAuthUseTrustCert() throws Exception {
}
}

/**
* Verifies: client side throttling.
*
* @throws Exception
*/
@Test
public void testLookupThrottlingForClientByClient() throws Exception {
final String topicName = "persistent://prop/usw/my-ns/newTopic";

com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
clientConf.setConcurrentLookupRequest(0);
String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf);

try {
Consumer consumer = pulsarClient.subscribe(topicName, "mysub", new ConsumerConfiguration());
fail("It should fail as throttling should not receive any request");
} catch (com.yahoo.pulsar.client.api.PulsarClientException.TooManyLookupRequestException e) {
// ok as throttling set to 0
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ClientConfiguration implements Serializable {
private boolean useTls = false;
private String tlsTrustCertsFilePath = "";
private boolean tlsAllowInsecureConnection = false;
private int concurrentLookupRequest = 5000;

/**
* @return the authentication provider to be used
Expand Down Expand Up @@ -309,4 +310,24 @@ public long getStatsIntervalSeconds() {
public void setStatsInterval(long statsInterval, TimeUnit unit) {
this.statsIntervalSeconds = unit.toSeconds(statsInterval);
}

/**
* Get configured total allowed concurrent lookup-request.
*
* @return
*/
public int getConcurrentLookupRequest() {
return concurrentLookupRequest;
}

/**
* Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
* <i>(default: 5000)</i> It should be configured with higher value only in case of it requires to produce/subscribe on
* thousands of topic using created {@link PulsarClient}
*
* @param concurrentLookupRequest
*/
public void setConcurrentLookupRequest(int concurrentLookupRequest) {
this.concurrentLookupRequest = concurrentLookupRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public LookupException(String msg) {
}
}

public static class TooManyLookupRequestException extends LookupException {
public TooManyLookupRequestException(String msg) {
super(msg);
}
}

public static class ConnectException extends PulsarClientException {
public ConnectException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand Down Expand Up @@ -59,13 +60,16 @@ public class ClientCnx extends PulsarHandler {
private final ConcurrentLongHashMap<ConsumerImpl> consumers = new ConcurrentLongHashMap<>(16, 1);

private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
private final Semaphore pendingLookupRequestSemaphore;

enum State {
None, SentConnectFrame, Ready
}

public ClientCnx(PulsarClientImpl pulsarClient) {
super(30, TimeUnit.SECONDS);
this.pendingLookupRequestSemaphore = new Semaphore(pulsarClient.getConfiguration().getConcurrentLookupRequest(),
true);
authentication = pulsarClient.getConfiguration().getAuthentication();
state = State.None;
}
Expand Down Expand Up @@ -105,7 +109,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

// Fail out all the pending ops
pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e));
pendingLookupRequests.forEach((key, future) -> getAndRemovePendingLookupRequest(key).completeExceptionally(e));

// Notify all attached producers/consumers so they have a chance to reconnect
producers.forEach((id, producer) -> producer.connectionClosed(this));
Expand Down Expand Up @@ -202,7 +206,7 @@ protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) {
log.info("Received Broker lookup response: {}", lookupResult.getResponse());

long requestId = lookupResult.getRequestId();
CompletableFuture<LookupDataResult> requestFuture = pendingLookupRequests.remove(requestId);
CompletableFuture<LookupDataResult> requestFuture = getAndRemovePendingLookupRequest(requestId);

if (requestFuture != null) {
// Complete future with exception if : Result.response=fail/null
Expand Down Expand Up @@ -230,7 +234,7 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l
log.info("Received Broker Partition response: {}", lookupResult.getPartitions());

long requestId = lookupResult.getRequestId();
CompletableFuture<LookupDataResult> requestFuture = pendingLookupRequests.remove(requestId);
CompletableFuture<LookupDataResult> requestFuture = getAndRemovePendingLookupRequest(requestId);

if (requestFuture != null) {
// Complete future with exception if : Result.response=fail/null
Expand All @@ -251,6 +255,22 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l
}
}

private boolean addPendingLookupRequests(long requestId, CompletableFuture<LookupDataResult> future) {
if (pendingLookupRequestSemaphore.tryAcquire()) {
pendingLookupRequests.put(requestId, future);
return true;
}
return false;
}

private CompletableFuture<LookupDataResult> getAndRemovePendingLookupRequest(long requestId) {
CompletableFuture<LookupDataResult> result = pendingLookupRequests.remove(requestId);
if (result != null) {
pendingLookupRequestSemaphore.release();
}
return result;
}

@Override
protected void handleSendError(CommandSendError sendError) {
log.warn("{} Received send error from server: {}", ctx.channel(), sendError);
Expand Down Expand Up @@ -312,13 +332,22 @@ protected boolean isHandshakeCompleted() {

CompletableFuture<LookupDataResult> newLookup(ByteBuf request, long requestId) {
CompletableFuture<LookupDataResult> future = new CompletableFuture<>();
pendingLookupRequests.put(requestId, future);
ctx.writeAndFlush(request).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
log.warn("{} Failed to send request to broker: {}", ctx.channel(), writeFuture.cause().getMessage());
future.completeExceptionally(writeFuture.cause());

if (addPendingLookupRequests(requestId, future)) {
ctx.writeAndFlush(request).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), requestId,
writeFuture.cause().getMessage());
future.completeExceptionally(writeFuture.cause());
}
});
} else {
if (log.isDebugEnabled()) {
log.debug("{} Failed to add lookup-request into pending queue", requestId);
}
});
future.completeExceptionally(new PulsarClientException.TooManyLookupRequestException(
"Failed due to too many pending lookup requests"));
}
return future;
}

Expand Down