Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ tlsAllowInsecureConnection=false
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=20000

### --- Authentication --- ###

# Enable authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class ServiceConfiguration implements PulsarConfiguration{
// messages to consumer once, this limit reaches until consumer starts acknowledging messages back
// Using a value of 0, is disabling unackedMessage-limit check and consumer can receive messages without any restriction
private int maxUnackedMessagesPerConsumer = 50000;
// Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
private int maxConcurrentLookupRequest = 20000;

/***** --- TLS --- ****/
// Enable TLS
Expand Down Expand Up @@ -416,6 +418,14 @@ public void setMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer)
this.maxUnackedMessagesPerConsumer = maxUnackedMessagesPerConsumer;
}

public int getMaxConcurrentLookupRequest() {
return maxConcurrentLookupRequest;
}

public void setMaxConcurrentLookupRequest(int maxConcurrentLookupRequest) {
this.maxConcurrentLookupRequest = maxConcurrentLookupRequest;
}

public boolean isTlsEnabled() {
return tlsEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@
*/
package com.yahoo.pulsar.broker.lookup;

import static com.yahoo.pulsar.broker.service.BrokerService.LOOKUP_THROTTLING_PATH;
import static com.yahoo.pulsar.broker.service.BrokerService.THROTTLING_LOOKUP_REQUEST_KEY;
import static com.yahoo.pulsar.common.api.Commands.newLookupResponse;
import static com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl.ENCODING_SCHEME;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
Expand All @@ -31,26 +38,30 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.beust.jcommander.internal.Maps;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.web.NoSwaggerDocumentation;
import com.yahoo.pulsar.broker.web.PulsarWebResource;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.web.RestException;
import static com.yahoo.pulsar.common.api.Commands.newLookupResponse;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.lookup.data.LookupData;
import com.yahoo.pulsar.common.policies.data.ClusterData;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.util.Codec;

import static com.google.common.base.Preconditions.checkNotNull;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;

import io.netty.buffer.ByteBuf;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;

@Path("/v2/destination/")
@NoSwaggerDocumentation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not throttle http lookup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added http-throttling change as well.

Expand All @@ -65,6 +76,12 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
@Suspended AsyncResponse asyncResponse) {
dest = Codec.decode(dest);
DestinationName topic = DestinationName.get("persistent", property, cluster, namespace, dest);

if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
log.warn("No broker was found available for topic {}", topic);
asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}

try {
validateClusterOwnership(topic.getCluster());
Expand All @@ -73,7 +90,7 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
} catch (Throwable t) {
// Validation checks failed
log.error("Validation check failed: {}", t.getMessage());
asyncResponse.resume(t);
completeLookupResponseExceptionally(asyncResponse, t);
return;
}

Expand All @@ -83,7 +100,7 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
lookupFuture.thenAccept(result -> {
if (result == null) {
log.warn("No broker was found available for topic {}", topic);
asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}

Expand All @@ -98,30 +115,53 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
topic.getLookupName(), newAuthoritative));
} catch (URISyntaxException e) {
log.error("Error in preparing redirect url for {}: {}", topic, e.getMessage(), e);
asyncResponse.resume(e);
completeLookupResponseExceptionally(asyncResponse, e);
return;
}
if (log.isDebugEnabled()) {
log.debug("Redirect lookup for topic {} to {}", topic, redirect);
}
asyncResponse.resume(new WebApplicationException(Response.temporaryRedirect(redirect).build()));
completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.temporaryRedirect(redirect).build()));

} else {
// Found broker owning the topic
if (log.isDebugEnabled()) {
log.debug("Lookup succeeded for topic {} -- broker: {}", topic, result.getLookupData());
}
asyncResponse.resume(result.getLookupData());
completeLookupResponseSuccessfully(asyncResponse, result.getLookupData());
}
}).exceptionally(exception -> {
log.warn("Failed to lookup broker for topic {}: {}", topic, exception.getMessage(), exception);
asyncResponse.resume(exception);
completeLookupResponseExceptionally(asyncResponse, exception);
return null;
});

}


@PUT
@Path("permits/{permits}")
@ApiOperation(value = "Update allowed concurrent lookup permits. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 204, message = "Lookup permits updated successfully"),
@ApiResponse(code = 403, message = "You don't have admin permission to create the cluster") })
public void upsertLookupPermits(@PathParam("permits") int permits) {
validateSuperUserAccess();
try {
Map<String, String> throttlingMap = Maps.newHashMap(THROTTLING_LOOKUP_REQUEST_KEY, Integer.toString(permits));
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(throttlingMap);
if (localZk().exists(LOOKUP_THROTTLING_PATH, false) != null) {
localZk().setData(LOOKUP_THROTTLING_PATH, content, -1);
} else {
ZkUtils.createFullPathOptimistic(localZk(), LOOKUP_THROTTLING_PATH, content,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
log.info("[{}] Updated concurrent lookup permits {}", clientAppId(), permits);
} catch (Exception e) {
log.error("[{}] Failed to update concurrent lookup permits {}", clientAppId(), e.getMessage(), e);
throw new RestException(e);
}
}

/**
*
* Lookup broker-service address for a given namespace-bundle which contains given topic.
Expand Down Expand Up @@ -225,6 +265,20 @@ public static CompletableFuture<ByteBuf> lookupDestinationAsync(PulsarService pu

return lookupfuture;
}

protected ZooKeeper localZk() {
return pulsar().getZkClient();
}

private void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
asyncResponse.resume(t);
}

private void completeLookupResponseSuccessfully(AsyncResponse asyncResponse, LookupData lookupData) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
asyncResponse.resume(lookupData);
}

private static final Logger log = LoggerFactory.getLogger(DestinationLookup.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -27,11 +28,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.apache.bookkeeper.client.BookKeeper.DigestType;
Expand All @@ -41,7 +45,6 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.commons.lang.SystemUtils;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,8 +80,10 @@
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.policies.data.RetentionPolicies;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -119,6 +124,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private AuthorizationManager authorizationManager = null;
private final ScheduledExecutorService statsUpdater;
private final ScheduledExecutorService backlogQuotaChecker;

private final ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache;
protected final AtomicReference<Semaphore> lookupRequestSemaphore;

private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
Expand All @@ -127,6 +135,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies

private final static String producerNameGeneratorPath = "/counters/producer-name";

public static final String LOOKUP_THROTTLING_PATH = "/loadbalance/settings/throttling";
public static final String THROTTLING_LOOKUP_REQUEST_KEY = "maxConcurrentLookupRequest";

private final BacklogQuotaManager backlogQuotaManager;

private final int keepAliveIntervalSeconds;
Expand Down Expand Up @@ -186,6 +197,14 @@ public BrokerService(PulsarService pulsar) throws Exception {
this.backlogQuotaChecker = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
this.dynamicConfigurationCache = new ZooKeeperDataCache<Map<String, String>>(pulsar.getLocalZkCache()) {
@Override
public Map<String, String> deserialize(String key, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class);
}
};
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(new Semaphore(getLookupRequestPermits(), true));
registerListenerToUpdateLookupRequest();

PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize());
}
Expand Down Expand Up @@ -598,6 +617,10 @@ public List<Metrics> getDestinationMetrics() {
public Map<String, NamespaceBundleStats> getBundleStats() {
return pulsarStats.getBundleStats();
}

public Semaphore getLookupRequestSemaphore() {
return lookupRequestSemaphore.get();
}

public void checkGC(int gcIntervalInSeconds) {
topics.forEach((n, t) -> {
Expand Down Expand Up @@ -820,6 +843,39 @@ public Map<String, PersistentTopicStats> getTopicStats() {
public AuthenticationService getAuthenticationService() {
return authenticationService;
}

private int getLookupRequestPermits() {
int pendingLookupRequest = pulsar.getConfiguration().getMaxConcurrentLookupRequest();
try {
Optional<Map<String, String>> data = dynamicConfigurationCache.get(LOOKUP_THROTTLING_PATH);
if (data.isPresent() && data.get().get(THROTTLING_LOOKUP_REQUEST_KEY) != null) {
pendingLookupRequest = Integer.parseInt(data.get().get(THROTTLING_LOOKUP_REQUEST_KEY));
}
} catch (Exception e) {
log.warn("Got exception when reading ZooKeeper path [{}]:", LOOKUP_THROTTLING_PATH, e);
}
if (log.isDebugEnabled()) {
log.debug("Configured maxConcurrentLookupRequest {}", pendingLookupRequest);
}
return pendingLookupRequest;
}

private void registerListenerToUpdateLookupRequest() {
// register listener for future update
dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
@Override
public void onUpdate(String path, Map<String, String> data, Stat stat) {
if (LOOKUP_THROTTLING_PATH.equals(path)) {
if (data.get(THROTTLING_LOOKUP_REQUEST_KEY) != null) {
int pendingLookupRequest = Integer.parseInt(data.get(THROTTLING_LOOKUP_REQUEST_KEY));
lookupRequestSemaphore.set(new Semaphore(pendingLookupRequest, true));
} else {
log.info("{} removed {} from zk", LOOKUP_THROTTLING_PATH, THROTTLING_LOOKUP_REQUEST_KEY);
}
}
}
});
}

public List<PersistentTopic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
return multiLayerTopicsMap.get(namespace).get(bundle).values();
Expand Down
Loading