diff --git a/conf/broker.conf b/conf/broker.conf index 5318fea9199e2..9f76d893dc0ff 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -102,6 +102,9 @@ tlsAllowInsecureConnection=false # Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction maxUnackedMessagesPerConsumer=50000 +# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic +maxConcurrentLookupRequest=20000 + ### --- Authentication --- ### # Enable authentication diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java index e1071572f012b..e15fed5259a4e 100644 --- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java @@ -87,6 +87,8 @@ public class ServiceConfiguration implements PulsarConfiguration{ // messages to consumer once, this limit reaches until consumer starts acknowledging messages back // Using a value of 0, is disabling unackedMessage-limit check and consumer can receive messages without any restriction private int maxUnackedMessagesPerConsumer = 50000; + // Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic + private int maxConcurrentLookupRequest = 20000; /***** --- TLS --- ****/ // Enable TLS @@ -416,6 +418,14 @@ public void setMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer) this.maxUnackedMessagesPerConsumer = maxUnackedMessagesPerConsumer; } + public int getMaxConcurrentLookupRequest() { + return maxConcurrentLookupRequest; + } + + public void setMaxConcurrentLookupRequest(int maxConcurrentLookupRequest) { + this.maxConcurrentLookupRequest = maxConcurrentLookupRequest; + } + public boolean isTlsEnabled() { return tlsEnabled; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java index 3bf223e738b2f..4708f2bb16ee7 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java @@ -15,13 +15,20 @@ */ package com.yahoo.pulsar.broker.lookup; +import static com.yahoo.pulsar.broker.service.BrokerService.LOOKUP_THROTTLING_PATH; +import static com.yahoo.pulsar.broker.service.BrokerService.THROTTLING_LOOKUP_REQUEST_KEY; +import static com.yahoo.pulsar.common.api.Commands.newLookupResponse; +import static com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl.ENCODING_SCHEME; + import java.net.URI; import java.net.URISyntaxException; +import java.util.Map; import java.util.concurrent.CompletableFuture; import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; import javax.ws.rs.GET; +import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; @@ -31,26 +38,30 @@ import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.beust.jcommander.internal.Maps; +import com.yahoo.pulsar.broker.PulsarService; import com.yahoo.pulsar.broker.web.NoSwaggerDocumentation; import com.yahoo.pulsar.broker.web.PulsarWebResource; -import com.yahoo.pulsar.common.naming.DestinationName; -import com.yahoo.pulsar.broker.PulsarService; import com.yahoo.pulsar.broker.web.RestException; -import static com.yahoo.pulsar.common.api.Commands.newLookupResponse; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType; import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError; import com.yahoo.pulsar.common.lookup.data.LookupData; -import com.yahoo.pulsar.common.policies.data.ClusterData; +import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.util.Codec; - -import static com.google.common.base.Preconditions.checkNotNull; +import com.yahoo.pulsar.common.util.ObjectMapperFactory; import io.netty.buffer.ByteBuf; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; @Path("/v2/destination/") @NoSwaggerDocumentation @@ -65,6 +76,12 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path @Suspended AsyncResponse asyncResponse) { dest = Codec.decode(dest); DestinationName topic = DestinationName.get("persistent", property, cluster, namespace, dest); + + if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) { + log.warn("No broker was found available for topic {}", topic); + asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE)); + return; + } try { validateClusterOwnership(topic.getCluster()); @@ -73,7 +90,7 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path } catch (Throwable t) { // Validation checks failed log.error("Validation check failed: {}", t.getMessage()); - asyncResponse.resume(t); + completeLookupResponseExceptionally(asyncResponse, t); return; } @@ -83,7 +100,7 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path lookupFuture.thenAccept(result -> { if (result == null) { log.warn("No broker was found available for topic {}", topic); - asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE)); + completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE)); return; } @@ -98,30 +115,53 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path topic.getLookupName(), newAuthoritative)); } catch (URISyntaxException e) { log.error("Error in preparing redirect url for {}: {}", topic, e.getMessage(), e); - asyncResponse.resume(e); + completeLookupResponseExceptionally(asyncResponse, e); return; } if (log.isDebugEnabled()) { log.debug("Redirect lookup for topic {} to {}", topic, redirect); } - asyncResponse.resume(new WebApplicationException(Response.temporaryRedirect(redirect).build())); + completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.temporaryRedirect(redirect).build())); } else { // Found broker owning the topic if (log.isDebugEnabled()) { log.debug("Lookup succeeded for topic {} -- broker: {}", topic, result.getLookupData()); } - asyncResponse.resume(result.getLookupData()); + completeLookupResponseSuccessfully(asyncResponse, result.getLookupData()); } }).exceptionally(exception -> { log.warn("Failed to lookup broker for topic {}: {}", topic, exception.getMessage(), exception); - asyncResponse.resume(exception); + completeLookupResponseExceptionally(asyncResponse, exception); return null; }); } + @PUT + @Path("permits/{permits}") + @ApiOperation(value = "Update allowed concurrent lookup permits. This operation requires Pulsar super-user privileges.") + @ApiResponses(value = { @ApiResponse(code = 204, message = "Lookup permits updated successfully"), + @ApiResponse(code = 403, message = "You don't have admin permission to create the cluster") }) + public void upsertLookupPermits(@PathParam("permits") int permits) { + validateSuperUserAccess(); + try { + Map throttlingMap = Maps.newHashMap(THROTTLING_LOOKUP_REQUEST_KEY, Integer.toString(permits)); + byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(throttlingMap); + if (localZk().exists(LOOKUP_THROTTLING_PATH, false) != null) { + localZk().setData(LOOKUP_THROTTLING_PATH, content, -1); + } else { + ZkUtils.createFullPathOptimistic(localZk(), LOOKUP_THROTTLING_PATH, content, + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + log.info("[{}] Updated concurrent lookup permits {}", clientAppId(), permits); + } catch (Exception e) { + log.error("[{}] Failed to update concurrent lookup permits {}", clientAppId(), e.getMessage(), e); + throw new RestException(e); + } + } + /** * * Lookup broker-service address for a given namespace-bundle which contains given topic. @@ -225,6 +265,20 @@ public static CompletableFuture lookupDestinationAsync(PulsarService pu return lookupfuture; } + + protected ZooKeeper localZk() { + return pulsar().getZkClient(); + } + + private void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) { + pulsar().getBrokerService().getLookupRequestSemaphore().release(); + asyncResponse.resume(t); + } + + private void completeLookupResponseSuccessfully(AsyncResponse asyncResponse, LookupData lookupData) { + pulsar().getBrokerService().getLookupRequestSemaphore().release(); + asyncResponse.resume(lookupData); + } private static final Logger log = LoggerFactory.getLogger(DestinationLookup.class); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java index c999790df3958..b069861254a9b 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.commons.collections.CollectionUtils.isEmpty; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.io.Closeable; import java.io.IOException; @@ -27,11 +28,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -41,7 +45,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.commons.lang.SystemUtils; -import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -77,8 +80,10 @@ import com.yahoo.pulsar.common.policies.data.Policies; import com.yahoo.pulsar.common.policies.data.RetentionPolicies; import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats; +import com.yahoo.pulsar.common.util.ObjectMapperFactory; import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap; import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener; +import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; @@ -119,6 +124,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener> dynamicConfigurationCache; + protected final AtomicReference lookupRequestSemaphore; private final ScheduledExecutorService inactivityMonitor; private final ScheduledExecutorService messageExpiryMonitor; @@ -127,6 +135,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener>(pulsar.getLocalZkCache()) { + @Override + public Map deserialize(String key, byte[] content) throws Exception { + return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class); + } + }; + this.lookupRequestSemaphore = new AtomicReference(new Semaphore(getLookupRequestPermits(), true)); + registerListenerToUpdateLookupRequest(); PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize()); } @@ -598,6 +617,10 @@ public List getDestinationMetrics() { public Map getBundleStats() { return pulsarStats.getBundleStats(); } + + public Semaphore getLookupRequestSemaphore() { + return lookupRequestSemaphore.get(); + } public void checkGC(int gcIntervalInSeconds) { topics.forEach((n, t) -> { @@ -820,6 +843,39 @@ public Map getTopicStats() { public AuthenticationService getAuthenticationService() { return authenticationService; } + + private int getLookupRequestPermits() { + int pendingLookupRequest = pulsar.getConfiguration().getMaxConcurrentLookupRequest(); + try { + Optional> data = dynamicConfigurationCache.get(LOOKUP_THROTTLING_PATH); + if (data.isPresent() && data.get().get(THROTTLING_LOOKUP_REQUEST_KEY) != null) { + pendingLookupRequest = Integer.parseInt(data.get().get(THROTTLING_LOOKUP_REQUEST_KEY)); + } + } catch (Exception e) { + log.warn("Got exception when reading ZooKeeper path [{}]:", LOOKUP_THROTTLING_PATH, e); + } + if (log.isDebugEnabled()) { + log.debug("Configured maxConcurrentLookupRequest {}", pendingLookupRequest); + } + return pendingLookupRequest; + } + + private void registerListenerToUpdateLookupRequest() { + // register listener for future update + dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener>() { + @Override + public void onUpdate(String path, Map data, Stat stat) { + if (LOOKUP_THROTTLING_PATH.equals(path)) { + if (data.get(THROTTLING_LOOKUP_REQUEST_KEY) != null) { + int pendingLookupRequest = Integer.parseInt(data.get(THROTTLING_LOOKUP_REQUEST_KEY)); + lookupRequestSemaphore.set(new Semaphore(pendingLookupRequest, true)); + } else { + log.info("{} removed {} from zk", LOOKUP_THROTTLING_PATH, THROTTLING_LOOKUP_REQUEST_KEY); + } + } + } + }); + } public List getAllTopicsFromNamespaceBundle(String namespace, String bundle) { return multiLayerTopicsMap.get(namespace).get(bundle).values(); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java index eac6dea32905b..7eab4f436f157 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java @@ -153,15 +153,26 @@ protected void handleLookup(CommandLookupTopic lookup) { } final long requestId = lookup.getRequestId(); final String topic = lookup.getTopic(); - lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), - getRole(), lookup.getRequestId()).thenAccept(lookupResponse -> { - ctx.writeAndFlush(lookupResponse); - }).exceptionally(ex -> { - // it should never happen - log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); - ctx.writeAndFlush(newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); - return null; - }); + if (service.getLookupRequestSemaphore().tryAcquire()) { + lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), + getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { + if (ex == null) { + ctx.writeAndFlush(lookupResponse); + } else { + // it should never happen + log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); + ctx.writeAndFlush( + newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); + } + service.getLookupRequestSemaphore().release(); + return null; + }); + } else { + log.warn("[{}] Failed lookup due to too many lookup-requets {}", remoteAddress, topic); + ctx.writeAndFlush(newLookupResponse(ServerError.TooManyRequest, + "Failed due to too many pending lookup requests", requestId)); + } + } @Override @@ -171,24 +182,33 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa } final long requestId = partitionMetadata.getRequestId(); final String topic = partitionMetadata.getTopic(); - getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic)) - .thenAccept(metadata -> { - int partitions = metadata.partitions; - ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId)); - }).exceptionally(ex -> { - if (ex instanceof PulsarClientException) { - log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress, topic, - ex.getMessage()); - ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, - ex.getMessage(), requestId)); - } else { - log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic, - ex.getMessage(), ex); - ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, - ex.getMessage(), requestId)); - } - return null; - }); + if (service.getLookupRequestSemaphore().tryAcquire()) { + getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic)) + .handle((metadata, ex) -> { + if (ex == null) { + int partitions = metadata.partitions; + ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId)); + } else { + if (ex instanceof PulsarClientException) { + log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress, + topic, ex.getMessage()); + ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, + ex.getMessage(), requestId)); + } else { + log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic, + ex.getMessage(), ex); + ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, + ex.getMessage(), requestId)); + } + } + service.getLookupRequestSemaphore().release(); + return null; + }); + } else { + log.warn("[{}] Failed Partition-Metadata lookup due to too many lookup-requets {}", remoteAddress, topic); + ctx.writeAndFlush(newLookupResponse(ServerError.TooManyRequest, + "Failed due to too many pending lookup requests", requestId)); + } } @Override diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java index f909dcc21d6f3..b3c2adcfc428c 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java @@ -56,6 +56,7 @@ import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest; import com.yahoo.pulsar.broker.namespace.NamespaceEphemeralData; import com.yahoo.pulsar.broker.namespace.NamespaceService; +import com.yahoo.pulsar.broker.service.BrokerService; import com.yahoo.pulsar.client.admin.PulsarAdmin; import com.yahoo.pulsar.client.admin.PulsarAdminException; import com.yahoo.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -101,6 +102,7 @@ import com.yahoo.pulsar.common.policies.data.RetentionPolicies; import com.yahoo.pulsar.common.util.Codec; import com.yahoo.pulsar.common.util.ObjectMapperFactory; +import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer; public class AdminApiTest extends MockedPulsarServiceBaseTest { @@ -379,6 +381,19 @@ public void brokers() throws Exception { assertEquals(admin.clusters().getClusters(), Lists.newArrayList()); } + @Test + public void lookup() throws Exception { + admin.lookups().updateLookupPermits(10); + int retrievedPermit = Integer.parseInt((String) pulsar.getLocalZkCache() + .getData(BrokerService.LOOKUP_THROTTLING_PATH, new Deserializer() { + @Override + public Map deserialize(String key, byte[] content) throws Exception { + return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class); + } + }).get().get(BrokerService.THROTTLING_LOOKUP_REQUEST_KEY)); + assertEquals(retrievedPermit, 10); + } + @Test(enabled = true) public void properties() throws PulsarAdminException { Set allowedClusters = Sets.newHashSet("use"); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java index 0b10a86f5bbbf..cc980976ac67b 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java @@ -27,6 +27,8 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReference; import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; @@ -108,6 +110,7 @@ public void setUp() throws Exception { BrokerService brokerService = mock(BrokerService.class); doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(auth).when(brokerService).getAuthorizationManager(); + doReturn(new Semaphore(1000)).when(brokerService).getLookupRequestSemaphore(); } @Test @@ -134,6 +137,35 @@ public void crossColoLookup() throws Exception { WebApplicationException wae = (WebApplicationException) arg.getValue(); assertEquals(wae.getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode()); } + + + @Test + public void testNotEnoughLookupPermits() throws Exception { + + BrokerService brokerService = pulsar.getBrokerService(); + doReturn(new Semaphore(0)).when(brokerService).getLookupRequestSemaphore(); + + DestinationLookup destLookup = spy(new DestinationLookup()); + doReturn(false).when(destLookup).isRequestHttps(); + destLookup.setPulsar(pulsar); + doReturn("null").when(destLookup).clientAppId(); + Field uriField = PulsarWebResource.class.getDeclaredField("uri"); + uriField.setAccessible(true); + UriInfo uriInfo = mock(UriInfo.class); + uriField.set(destLookup, uriInfo); + URI uri = URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1"); + doReturn(uri).when(uriInfo).getRequestUri(); + doReturn(true).when(config).isAuthorizationEnabled(); + + AsyncResponse asyncResponse1 = mock(AsyncResponse.class); + destLookup.lookupDestinationAsync("myprop", "usc", "ns2", "topic1", false, asyncResponse1); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Throwable.class); + verify(asyncResponse1).resume(arg.capture()); + assertEquals(arg.getValue().getClass(), WebApplicationException.class); + WebApplicationException wae = (WebApplicationException) arg.getValue(); + assertEquals(wae.getResponse().getStatus(), Status.SERVICE_UNAVAILABLE.getStatusCode()); + } @Test public void testValidateReplicationSettingsOnNamespace() throws Exception { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java index 30a36eeba7b4c..50ecf3962a896 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java @@ -16,27 +16,32 @@ package com.yahoo.pulsar.broker.service; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URI; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.google.common.collect.Lists; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; import com.yahoo.pulsar.client.admin.BrokerStats; import com.yahoo.pulsar.client.api.Authentication; @@ -46,13 +51,16 @@ import com.yahoo.pulsar.client.api.Message; import com.yahoo.pulsar.client.api.Producer; import com.yahoo.pulsar.client.api.PulsarClient; +import com.yahoo.pulsar.client.api.PulsarClientException; import com.yahoo.pulsar.client.api.SubscriptionType; +import com.yahoo.pulsar.client.impl.ConsumerImpl; +import com.yahoo.pulsar.client.impl.ProducerImpl; import com.yahoo.pulsar.client.impl.auth.AuthenticationTls; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.naming.NamespaceBundle; import com.yahoo.pulsar.common.policies.data.PersistentSubscriptionStats; import com.yahoo.pulsar.common.policies.data.PersistentTopicStats; - +import com.yahoo.pulsar.common.util.ObjectMapperFactory; /** */ public class BrokerServiceTest extends BrokerTestBase { @@ -705,4 +713,233 @@ public void testTlsAuthUseTrustCert() throws Exception { } } + /** + * Verifies: admin-rest api creates throttling zk-node if it's not already present. + * + * @throws Exception + */ + @Test + public void testThrottlingLookupPermitZKCreate() throws Exception { + final int newPermit = 50; + admin.lookups().updateLookupPermits(newPermit); + Thread.sleep(500); + byte[] content = mockZookKeeper.getData(BrokerService.LOOKUP_THROTTLING_PATH, null, null); + int zkPermit = Integer.parseInt((String) ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class) + .get(BrokerService.THROTTLING_LOOKUP_REQUEST_KEY)); + assertEquals(newPermit, zkPermit); + } + + /** + * Verifies: updating zk-thottling node reflects broker-maxConcurrentLookupRequest and updates semaphore. + * + * @throws Exception + */ + @Test + public void testThrottlingLookupRequestSemaphore() throws Exception { + BrokerService service = pulsar.getBrokerService(); + // create a znode for permits + admin.lookups().updateLookupPermits(10); + Thread.sleep(500); + // get zk node in cache and listner is already registered, so next + // zk-update will update permits value in brokerService + Method getPermitZkNodeMethod = BrokerService.class.getDeclaredMethod("getLookupRequestPermits"); + getPermitZkNodeMethod.setAccessible(true); + getPermitZkNodeMethod.invoke(service); + // update zknode with permit value + admin.lookups().updateLookupPermits(0); + Thread.sleep(500); + assertEquals(service.lookupRequestSemaphore.get().availablePermits(), 0); + } + + /** + * Broker has maxConcurrentLookupRequest = 0 so, it rejects incoming lookup request and it cause consumer creation + * failure. + * + * @throws Exception + */ + @Test + public void testLookupThrottlingForClientByBroker0Permit() throws Exception { + + BrokerService service = pulsar.getBrokerService(); + + final String topicName = "persistent://prop/usw/my-ns/newTopic"; + + com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); + clientConf.setStatsInterval(0, TimeUnit.SECONDS); + String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); + PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf); + + ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); + Consumer consumer = pulsarClient.subscribe(topicName, "mysub", consumerConfig); + consumer.close(); + + // update throttling-permit into zk + // create a znode for permits + admin.lookups().updateLookupPermits(10); + Thread.sleep(500); + // get zk node in cache and listner is already registered, so next + // zk-update will update permits value in brokerService + Method getPermitZkNodeMethod = BrokerService.class.getDeclaredMethod("getLookupRequestPermits"); + getPermitZkNodeMethod.setAccessible(true); + getPermitZkNodeMethod.invoke(service); + // update zknode with permit value + admin.lookups().updateLookupPermits(0); + + try { + consumer = pulsarClient.subscribe(topicName, "mysub", consumerConfig); + consumer.close(); + fail("It should fail as throttling should not receive any request"); + } catch (com.yahoo.pulsar.client.api.PulsarClientException.TooManyLookupRequestException e) { + // ok as throttling set to 0 + } + } + + /** + * Verifies: Broker side throttling: + * 1. concurrent_consumer_creation > maxConcurrentLookupRequest at broker + * 2. few of the consumer creation must fail with TooManyLookupRequestException. + * + * @throws Exception + */ + @Test + public void testLookupThrottlingForClientByBroker() throws Exception { + + BrokerService service = pulsar.getBrokerService(); + + final String topicName = "persistent://prop/usw/my-ns/newTopic"; + + com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); + clientConf.setStatsInterval(0, TimeUnit.SECONDS); + clientConf.setIoThreads(20); + clientConf.setConnectionsPerBroker(20); + String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); + PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf); + + ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); + consumerConfig.setSubscriptionType(SubscriptionType.Shared); + + // create a throttling-znode for permits + admin.lookups().updateLookupPermits(10); + Thread.sleep(500); + // get zk node in cache and listner is already registered, so next + // zk-update will update permits value in brokerService + Method getPermitZkNodeMethod = BrokerService.class.getDeclaredMethod("getLookupRequestPermits"); + getPermitZkNodeMethod.setAccessible(true); + getPermitZkNodeMethod.invoke(service); + // update zknode with permit value: 1 (only 1 concurrent request allows) + admin.lookups().updateLookupPermits(1); + Thread.sleep(500); + + List successfulConsumers = Lists.newArrayList(); + ExecutorService executor = Executors.newFixedThreadPool(10); + final int totalConsumers = 20; + CountDownLatch latch = new CountDownLatch(totalConsumers); + for (int i = 0; i < totalConsumers; i++) { + executor.execute(() -> { + try { + successfulConsumers.add(pulsarClient.subscribe(topicName, "mysub", consumerConfig)); + } catch (PulsarClientException.TooManyLookupRequestException e) { + // ok + } catch (Exception e) { + fail("it shouldn't failed"); + } + latch.countDown(); + }); + } + latch.await(); + + for (int i = 0; i < successfulConsumers.size(); i++) { + successfulConsumers.get(i).close(); + } + pulsarClient.close(); + assertNotEquals(successfulConsumers.size(), totalConsumers); + } + + /** + * This testcase make sure that once consumer lost connection with broker, it always reconnects with broker by retrying on + * throttling-error exception also. + * 1. all consumers get connected + * 2. broker restarts with maxConcurrentLookupRequest = 1 + * 3. consumers reconnect and some get TooManyRequestException and again retries + * 4. eventually all consumers will successfully connect to broker + * + * @throws Exception + */ + @Test + public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exception { + + final String topicName = "persistent://prop/usw/my-ns/newTopic"; + + com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); + clientConf.setStatsInterval(0, TimeUnit.SECONDS); + clientConf.setIoThreads(20); + clientConf.setConnectionsPerBroker(20); + String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); + PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf); + // update permit to allow all consumers to get connect first time + admin.lookups().updateLookupPermits(100); + ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); + consumerConfig.setSubscriptionType(SubscriptionType.Shared); + List consumers = Lists.newArrayList(); + ExecutorService executor = Executors.newFixedThreadPool(10); + final int totalConsumers = 8; + CountDownLatch latch = new CountDownLatch(totalConsumers); + for (int i = 0; i < totalConsumers; i++) { + executor.execute(() -> { + try { + consumers.add(pulsarClient.subscribe(topicName, "mysub", consumerConfig)); + } catch (PulsarClientException.TooManyLookupRequestException e) { + // ok + } catch (Exception e) { + fail("it shouldn't failed"); + } + latch.countDown(); + }); + } + latch.await(); + + stopBroker(); + int actualValue = conf.getMaxConcurrentLookupRequest(); + conf.setMaxConcurrentLookupRequest(1); + startBroker(); + // wait for consumer to reconnect + Thread.sleep(3000); + + int totalConnectedConsumers = 0; + for (int i = 0; i < consumers.size(); i++) { + if (((ConsumerImpl) consumers.get(i)).isConnected()) { + totalConnectedConsumers++; + } + consumers.get(i).close(); + + } + assertEquals(totalConnectedConsumers, totalConsumers); + + pulsarClient.close(); + conf.setMaxConcurrentLookupRequest(actualValue); + } + + /** + * Verifies: client side throttling. + * + * @throws Exception + */ + @Test + public void testLookupThrottlingForClientByClient() throws Exception { + final String topicName = "persistent://prop/usw/my-ns/newTopic"; + + com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); + clientConf.setStatsInterval(0, TimeUnit.SECONDS); + clientConf.setConcurrentLookupRequest(0); + String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); + PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf); + + try { + Consumer consumer = pulsarClient.subscribe(topicName, "mysub", new ConsumerConfiguration()); + fail("It should fail as throttling should not receive any request"); + } catch (com.yahoo.pulsar.client.api.PulsarClientException.TooManyLookupRequestException e) { + // ok as throttling set to 0 + } + } + } diff --git a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Lookup.java b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Lookup.java index 5eae812f9b1d3..960a0a8b25d9d 100644 --- a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Lookup.java +++ b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Lookup.java @@ -28,4 +28,12 @@ public interface Lookup { * @return the broker URL that serves the destination */ public String lookupDestination(String destination) throws PulsarAdminException; + + /** + * Update number of lookup permits allowed simultaneously for throttling + * + * @param permits + * @throws PulsarAdminException + */ + public void updateLookupPermits(int permits) throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/LookupImpl.java b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/LookupImpl.java index 87730f54c1c97..cf0d60ff33a8e 100644 --- a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/LookupImpl.java +++ b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/LookupImpl.java @@ -16,10 +16,11 @@ package com.yahoo.pulsar.client.admin.internal; import javax.ws.rs.ClientErrorException; +import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; -import com.yahoo.pulsar.client.admin.PulsarAdminException; import com.yahoo.pulsar.client.admin.Lookup; +import com.yahoo.pulsar.client.admin.PulsarAdminException; import com.yahoo.pulsar.client.api.Authentication; import com.yahoo.pulsar.common.lookup.data.LookupData; import com.yahoo.pulsar.common.naming.DestinationName; @@ -53,6 +54,15 @@ public String lookupDestination(String destination) throws PulsarAdminException } } + @Override + public void updateLookupPermits(int permits) throws PulsarAdminException { + try { + request(v2lookup.path("/destination/permits/").path(Integer.toString(permits))).put(Entity.json("")); + } catch (Exception e) { + throw getLookupApiException(e); + } + } + private String doDestinationLookup(WebTarget lookupResource, DestinationName destName) throws PulsarAdminException { LookupData lookupData = request(lookupResource.path(destName.getLookupName())).get(LookupData.class); if (useTls) { diff --git a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java index 82890d7ce3389..3f10dfcb6c6a9 100644 --- a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java +++ b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java @@ -18,6 +18,7 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import com.yahoo.pulsar.client.admin.PulsarAdmin; +import com.yahoo.pulsar.client.admin.PulsarAdminException; @Parameters(commandDescription = "Operations about brokers") public class CmdBrokers extends CmdBase { @@ -48,9 +49,21 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Update number of lookup permits allowed simultaneously for throttling ") + private class UpdateLookupPermitsCmd extends CliCommand { + @Parameter(description = "number of lookup permits", required = true) + private int permits; + + @Override + void run() throws PulsarAdminException { + admin.lookups().updateLookupPermits(permits); + } + } + CmdBrokers(PulsarAdmin admin) { super("brokers", admin); jcommander.addCommand("list", new List()); jcommander.addCommand("namespaces", new Namespaces()); + jcommander.addCommand("updateLookupPermits", new UpdateLookupPermitsCmd()); } } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java index 9f372e4b1e329..7372875806b98 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java @@ -48,6 +48,7 @@ public class ClientConfiguration implements Serializable { private boolean useTls = false; private String tlsTrustCertsFilePath = ""; private boolean tlsAllowInsecureConnection = false; + private int concurrentLookupRequest = 5000; /** * @return the authentication provider to be used @@ -309,4 +310,22 @@ public long getStatsIntervalSeconds() { public void setStatsInterval(long statsInterval, TimeUnit unit) { this.statsIntervalSeconds = unit.toSeconds(statsInterval); } + + /** + * Get configured total allowed concurrent lookup-request. + * + * @return + */ + public int getConcurrentLookupRequest() { + return concurrentLookupRequest; + } + + /** + * Configure total allowed concurrent lookup-request to prevent overload on broker. + * + * @param concurrentLookupRequest + */ + public void setConcurrentLookupRequest(int concurrentLookupRequest) { + this.concurrentLookupRequest = concurrentLookupRequest; + } } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java index d8c0a13091153..eb0918b4eeca1 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java @@ -60,6 +60,12 @@ public LookupException(String msg) { } } + public static class TooManyLookupRequestException extends LookupException { + public TooManyLookupRequestException(String msg) { + super(msg); + } + } + public static class ConnectException extends PulsarClientException { public ConnectException(String msg) { super(msg); diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java index cefa10677eeb7..deff3f008413e 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java @@ -19,6 +19,7 @@ import java.net.SocketAddress; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -59,6 +60,7 @@ public class ClientCnx extends PulsarHandler { private final ConcurrentLongHashMap consumers = new ConcurrentLongHashMap<>(16, 1); private final CompletableFuture connectionFuture = new CompletableFuture(); + private final Semaphore pendingLookupRequestSemaphore; enum State { None, SentConnectFrame, Ready @@ -66,6 +68,8 @@ enum State { public ClientCnx(PulsarClientImpl pulsarClient) { super(30, TimeUnit.SECONDS); + this.pendingLookupRequestSemaphore = new Semaphore(pulsarClient.getConfiguration().getConcurrentLookupRequest(), + true); authentication = pulsarClient.getConfiguration().getAuthentication(); state = State.None; } @@ -105,7 +109,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { // Fail out all the pending ops pendingRequests.forEach((key, future) -> future.completeExceptionally(e)); - pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e)); + pendingLookupRequests.forEach((key, future) -> getAndRemovePendingLookupRequest(key).completeExceptionally(e)); // Notify all attached producers/consumers so they have a chance to reconnect producers.forEach((id, producer) -> producer.connectionClosed(this)); @@ -202,7 +206,7 @@ protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) { log.info("Received Broker lookup response: {}", lookupResult.getResponse()); long requestId = lookupResult.getRequestId(); - CompletableFuture requestFuture = pendingLookupRequests.remove(requestId); + CompletableFuture requestFuture = getAndRemovePendingLookupRequest(requestId); if (requestFuture != null) { // Complete future with exception if : Result.response=fail/null @@ -230,7 +234,7 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l log.info("Received Broker Partition response: {}", lookupResult.getPartitions()); long requestId = lookupResult.getRequestId(); - CompletableFuture requestFuture = pendingLookupRequests.remove(requestId); + CompletableFuture requestFuture = getAndRemovePendingLookupRequest(requestId); if (requestFuture != null) { // Complete future with exception if : Result.response=fail/null @@ -251,6 +255,22 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l } } + private boolean addPendingLookupRequests(long requestId, CompletableFuture future) { + if (pendingLookupRequestSemaphore.tryAcquire()) { + pendingLookupRequests.put(requestId, future); + return true; + } + return false; + } + + private CompletableFuture getAndRemovePendingLookupRequest(long requestId) { + CompletableFuture result = pendingLookupRequests.remove(requestId); + if (result != null) { + pendingLookupRequestSemaphore.release(); + } + return result; + } + @Override protected void handleSendError(CommandSendError sendError) { log.warn("{} Received send error from server: {}", ctx.channel(), sendError); @@ -312,13 +332,20 @@ protected boolean isHandshakeCompleted() { CompletableFuture newLookup(ByteBuf request, long requestId) { CompletableFuture future = new CompletableFuture<>(); - pendingLookupRequests.put(requestId, future); - ctx.writeAndFlush(request).addListener(writeFuture -> { - if (!writeFuture.isSuccess()) { - log.warn("{} Failed to send request to broker: {}", ctx.channel(), writeFuture.cause().getMessage()); - future.completeExceptionally(writeFuture.cause()); - } - }); + + if (addPendingLookupRequests(requestId, future)) { + ctx.writeAndFlush(request).addListener(writeFuture -> { + if (!writeFuture.isSuccess()) { + log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), requestId, + writeFuture.cause().getMessage()); + future.completeExceptionally(writeFuture.cause()); + } + }); + } else { + log.warn("{} Failed to add lookup-request into pending queue", requestId); + future.completeExceptionally(new PulsarClientException.TooManyLookupRequestException( + "Failed due to too many pending lookup requests")); + } return future; } @@ -384,6 +411,8 @@ private PulsarClientException getPulsarClientException(ServerError error, String return new PulsarClientException.BrokerPersistenceException(errorMsg); case ServiceNotReady: return new PulsarClientException.LookupException(errorMsg); + case TooManyRequest: + return new PulsarClientException.TooManyLookupRequestException(errorMsg); case ProducerBlockedQuotaExceededError: return new PulsarClientException.ProducerBlockedQuotaExceededError(errorMsg); case ProducerBlockedQuotaExceededException: diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java index 0e6147836df08..fae84ccf94dd5 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java @@ -64,6 +64,7 @@ public enum ServerError ProducerBlockedQuotaExceededError(7, 7), ProducerBlockedQuotaExceededException(8, 8), ChecksumError(9, 9), + TooManyRequest(10, 10), ; public static final int UnknownError_VALUE = 0; @@ -76,6 +77,7 @@ public enum ServerError public static final int ProducerBlockedQuotaExceededError_VALUE = 7; public static final int ProducerBlockedQuotaExceededException_VALUE = 8; public static final int ChecksumError_VALUE = 9; + public static final int TooManyRequest_VALUE = 10; public final int getNumber() { return value; } @@ -92,6 +94,7 @@ public static ServerError valueOf(int value) { case 7: return ProducerBlockedQuotaExceededError; case 8: return ProducerBlockedQuotaExceededException; case 9: return ChecksumError; + case 10: return TooManyRequest; default: return null; } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 6118b7ae01501..9061a1c73c625 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -77,6 +77,7 @@ enum ServerError { ProducerBlockedQuotaExceededError = 7; // Unable to create producer because backlog quota exceeded ProducerBlockedQuotaExceededException = 8; // Exception while creating producer because quota exceeded ChecksumError = 9; // Error while verifying message checksum + TooManyRequest = 10; // Error with too many simultaneously request } enum AuthMethod {