From e3b28743a62d0310964d6a9a67e7f2549d67ff16 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 16 May 2022 22:16:42 +0800 Subject: [PATCH 1/6] Make some operation auto topic creation in Namespaces async. --- .../broker/admin/impl/NamespacesBase.java | 78 +++++++------------ .../pulsar/broker/admin/v1/Namespaces.java | 64 ++++++++++----- .../pulsar/broker/admin/v2/Namespaces.java | 62 ++++++++++----- .../pulsar/broker/admin/NamespacesTest.java | 41 ++++++++++ 4 files changed, 160 insertions(+), 85 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 8701618ee392a..bcb450f2067c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -819,56 +819,38 @@ protected void internalSetSubscriptionExpirationTime(Integer expirationTime) { }); } - protected AutoTopicCreationOverride internalGetAutoTopicCreation() { - validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.READ); - Policies policies = getNamespacePolicies(namespaceName); - return policies.autoTopicCreationOverride; - } - - protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, - AutoTopicCreationOverride autoTopicCreationOverride) { - final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); - validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE); - validatePoliciesReadOnlyAccess(); - if (autoTopicCreationOverride != null) { - ValidateResult validateResult = AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride); - if (!validateResult.isSuccess()) { - throw new RestException(Status.PRECONDITION_FAILED, - "Invalid configuration for autoTopicCreationOverride. the detail is " - + validateResult.getErrorInfo()); - } - if (Objects.equals(autoTopicCreationOverride.getTopicType(), TopicType.PARTITIONED.toString())) { - if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) { - throw new RestException(Status.NOT_ACCEPTABLE, - "Number of partitions should be less than or equal to " + maxPartitions); - } - } - } - // Force to read the data s.t. the watch to the cache content is setup. - namespaceResources().setPoliciesAsync(namespaceName, policies -> { - policies.autoTopicCreationOverride = autoTopicCreationOverride; - return policies; - }).thenApply(r -> { - String autoOverride = (autoTopicCreationOverride != null - && autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled"; - log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), - autoOverride != null ? autoOverride : "removed", namespaceName); - asyncResponse.resume(Response.noContent().build()); - return null; - }).exceptionally(e -> { - log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, - e.getCause()); - if (e.getCause() instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); - return null; - } - asyncResponse.resume(new RestException(e.getCause())); - return null; - }); + protected CompletableFuture internalGetAutoTopicCreationAsync() { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.AUTO_TOPIC_CREATION, + PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenApply(policies -> policies.autoTopicCreationOverride); } - protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) { - internalSetAutoTopicCreation(asyncResponse, null); + protected CompletableFuture internalSetAutoTopicCreationAsync( + AutoTopicCreationOverride autoTopicCreationOverride) { + return validateNamespacePolicyOperationAsync(namespaceName, + PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenAccept(__ -> { + int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); + if (autoTopicCreationOverride != null) { + ValidateResult validateResult = + AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride); + if (!validateResult.isSuccess()) { + throw new RestException(Status.PRECONDITION_FAILED, + "Invalid configuration for autoTopicCreationOverride. the detail is " + + validateResult.getErrorInfo()); + } + if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) { + throw new RestException(Status.NOT_ACCEPTABLE, + "Number of partitions should be less than or equal to " + maxPartitions); + } + } + }) + .thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { + policies.autoTopicCreationOverride = autoTopicCreationOverride; + return policies; + })); } protected void internalSetAutoSubscriptionCreation( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 6076ba10c5236..1b5cb42d05555 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -553,11 +553,18 @@ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar @ApiOperation(value = "Get autoTopicCreation info in a namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) - public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - return internalGetAutoTopicCreation(); + internalGetAutoTopicCreationAsync() + .thenAccept(autoTopicCreationOverride -> asyncResponse.resume(autoTopicCreationOverride)) + .exceptionally(ex -> { + log.error("Failed to get autoTopicCreation info for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -572,14 +579,25 @@ public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, AutoTopicCreationOverride autoTopicCreationOverride) { - try { - validateNamespaceName(property, cluster, namespace); - internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride); - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(property, cluster, namespace); + internalSetAutoTopicCreationAsync(autoTopicCreationOverride) + .thenAccept(__ -> { + String autoOverride = (autoTopicCreationOverride != null + && autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled"; + log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), + autoOverride, namespaceName); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(), + namespaceName, + e.getCause()); + if (e.getCause() instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); + } + resumeAsyncResponseExceptionally(asyncResponse, e); + return null; + }); } @DELETE @@ -590,14 +608,22 @@ public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse, public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { - try { validateNamespaceName(property, cluster, namespace); - internalRemoveAutoTopicCreation(asyncResponse); - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + internalSetAutoTopicCreationAsync(null) + .thenAccept(__ -> { + log.info("[{}] Successfully remove autoTopicCreation on namespace {}", clientAppId()); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(), + namespaceName, + e.getCause()); + if (e.getCause() instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); + } + resumeAsyncResponseExceptionally(asyncResponse, e); + return null; + }); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 7dcf97020fb09..74334c1ca25a0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -485,10 +485,17 @@ public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar @ApiOperation(value = "Get autoTopicCreation info in a namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")}) - public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("tenant") String tenant, + public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - return internalGetAutoTopicCreation(); + internalGetAutoTopicCreationAsync() + .thenAccept(autoTopicCreationOverride -> asyncResponse.resume(autoTopicCreationOverride)) + .exceptionally(ex -> { + log.error("Failed to get autoTopicCreation info for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -504,14 +511,25 @@ public void setAutoTopicCreation( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @ApiParam(value = "Settings for automatic topic creation", required = true) AutoTopicCreationOverride autoTopicCreationOverride) { - try { - validateNamespaceName(tenant, namespace); - internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride); - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(tenant, namespace); + internalSetAutoTopicCreationAsync(autoTopicCreationOverride) + .thenAccept(__ -> { + String autoOverride = (autoTopicCreationOverride != null + && autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled"; + log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), + autoOverride, namespaceName); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(), + namespaceName, + e.getCause()); + if (e.getCause() instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + } + resumeAsyncResponseExceptionally(asyncResponse, e); + return null; + }); } @DELETE @@ -521,14 +539,22 @@ public void setAutoTopicCreation( @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { - try { - validateNamespaceName(tenant, namespace); - internalRemoveAutoTopicCreation(asyncResponse); - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(tenant, namespace); + internalSetAutoTopicCreationAsync(null) + .thenAccept(__ -> { + log.info("[{}] Successfully remove autoTopicCreation on namespace {}", clientAppId()); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(), + namespaceName, + e.getCause()); + if (e.getCause() instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + } + resumeAsyncResponseExceptionally(asyncResponse, e); + return null; + }); } @POST diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index bf83d1702ecd2..5d6b7251397cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -87,6 +87,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -98,6 +99,7 @@ import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; import org.apache.pulsar.metadata.impl.AbstractMetadataStore; import org.apache.zookeeper.KeeperException.Code; @@ -108,6 +110,7 @@ import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -1675,6 +1678,44 @@ public void testRetentionPolicyValidationAsPartOfAllPolicies() throws Exception assertInvalidRetentionPolicyAsPartOfAllPolicies(policies, 1, -2); } + @Test + public void testOptionsAutoTopicCreation() throws Exception { + String namespace = "auto_topic_namespace"; + AutoTopicCreationOverride autoTopicCreationOverride = + AutoTopicCreationOverride.builder().allowAutoTopicCreation(true).topicType("partitioned") + .defaultNumPartitions(4).build(); + try { + asyncRequests(response -> namespaces.setAutoTopicCreation(response, this.testTenant, this.testLocalCluster, + namespace, autoTopicCreationOverride)); + fail("should have failed"); + } catch (RestException e) { + assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); + } + + asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, this.testLocalCluster, + namespace, BundlesData.builder().build())); + + // 1. set auto topic creation + asyncRequests(response -> namespaces.setAutoTopicCreation(response, this.testTenant, this.testLocalCluster, + namespace, autoTopicCreationOverride)); + + // 2. assert get auto topic creation + AutoTopicCreationOverride autoTopicCreationOverrideRsp = (AutoTopicCreationOverride) asyncRequests( + response -> namespaces.getAutoTopicCreation(response, this.testTenant, this.testLocalCluster, + namespace)); + assertEquals(autoTopicCreationOverride.getTopicType(), autoTopicCreationOverrideRsp.getTopicType()); + assertEquals(autoTopicCreationOverride.getDefaultNumPartitions(), + autoTopicCreationOverrideRsp.getDefaultNumPartitions()); + assertEquals(autoTopicCreationOverride.isAllowAutoTopicCreation(), + autoTopicCreationOverrideRsp.isAllowAutoTopicCreation()); + // 2. remove auto topic creation and assert get null + asyncRequests(response -> namespaces.removeAutoTopicCreation(response, this.testTenant, + this.testLocalCluster, namespace)); + assertNull(asyncRequests( + response -> namespaces.getAutoTopicCreation(response, this.testTenant, this.testLocalCluster, + namespace))); + } + @Test public void testSubscriptionTypesEnabled() throws PulsarAdminException, PulsarClientException { pulsar.getConfiguration().setAuthorizationEnabled(false); From 08947288acea2cd38e572735d28f6a8ce605aed4 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 16 May 2022 22:20:33 +0800 Subject: [PATCH 2/6] Remove unused import package --- .../java/org/apache/pulsar/broker/admin/NamespacesTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 5d6b7251397cd..7394512e642d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -99,7 +99,6 @@ import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; import org.apache.pulsar.metadata.impl.AbstractMetadataStore; import org.apache.zookeeper.KeeperException.Code; @@ -110,7 +109,6 @@ import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; From 572fc3ca12d9f1e4927ac2338691fbd55428a3ed Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 18 May 2022 11:28:51 +0800 Subject: [PATCH 3/6] fix some code --- .../apache/pulsar/broker/admin/v1/Namespaces.java | 15 +++++++++------ .../apache/pulsar/broker/admin/v2/Namespaces.java | 15 +++++++++------ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 1b5cb42d05555..b4f0c07cb5664 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -559,7 +559,7 @@ public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); internalGetAutoTopicCreationAsync() - .thenAccept(autoTopicCreationOverride -> asyncResponse.resume(autoTopicCreationOverride)) + .thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("Failed to get autoTopicCreation info for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -592,10 +592,11 @@ public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse, log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e.getCause()); - if (e.getCause() instanceof NotFoundException) { + if (FutureUtil.unwrapCompletionException(e) instanceof NotFoundException) { asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, e); } - resumeAsyncResponseExceptionally(asyncResponse, e); return null; }); } @@ -611,17 +612,19 @@ public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse validateNamespaceName(property, cluster, namespace); internalSetAutoTopicCreationAsync(null) .thenAccept(__ -> { - log.info("[{}] Successfully remove autoTopicCreation on namespace {}", clientAppId()); + log.info("[{}] Successfully remove autoTopicCreation on namespace {}", + clientAppId(), namespaceName); asyncResponse.resume(Response.noContent().build()); }) .exceptionally(e -> { log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e.getCause()); - if (e.getCause() instanceof NotFoundException) { + if (FutureUtil.unwrapCompletionException(e) instanceof NotFoundException) { asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, e); } - resumeAsyncResponseExceptionally(asyncResponse, e); return null; }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 74334c1ca25a0..e23845ec61ccf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -490,7 +490,7 @@ public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); internalGetAutoTopicCreationAsync() - .thenAccept(autoTopicCreationOverride -> asyncResponse.resume(autoTopicCreationOverride)) + .thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("Failed to get autoTopicCreation info for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -524,10 +524,11 @@ public void setAutoTopicCreation( log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e.getCause()); - if (e.getCause() instanceof MetadataStoreException.NotFoundException) { + if (FutureUtil.unwrapCompletionException(e) instanceof MetadataStoreException.NotFoundException) { asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, e); } - resumeAsyncResponseExceptionally(asyncResponse, e); return null; }); } @@ -542,17 +543,19 @@ public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse validateNamespaceName(tenant, namespace); internalSetAutoTopicCreationAsync(null) .thenAccept(__ -> { - log.info("[{}] Successfully remove autoTopicCreation on namespace {}", clientAppId()); + log.info("[{}] Successfully remove autoTopicCreation on namespace {}", + clientAppId(), namespaceName); asyncResponse.resume(Response.noContent().build()); }) .exceptionally(e -> { log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e.getCause()); - if (e.getCause() instanceof MetadataStoreException.NotFoundException) { + if (FutureUtil.unwrapCompletionException(e) instanceof MetadataStoreException.NotFoundException) { asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, e); } - resumeAsyncResponseExceptionally(asyncResponse, e); return null; }); } From 5efb13af43cff790a28b1994e0ca94fe52640698 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 23 May 2022 15:40:15 +0800 Subject: [PATCH 4/6] code format --- .../java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index bcb450f2067c2..93136231ac37f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -97,7 +97,6 @@ import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.TopicHashPositions; -import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.ValidateResult; import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; From e698a3b2c113fbb3718f324d6a020a70c6bb04d2 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 24 May 2022 16:18:17 +0800 Subject: [PATCH 5/6] Fix exception handler and rebase #15653 --- .../pulsar/broker/admin/impl/NamespacesBase.java | 12 +++++++++--- .../apache/pulsar/broker/admin/v1/Namespaces.java | 10 ++++++---- .../apache/pulsar/broker/admin/v2/Namespaces.java | 13 ++++++++----- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 93136231ac37f..dd882dfdadf01 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -97,6 +97,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.TopicHashPositions; +import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.ValidateResult; import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; @@ -840,9 +841,14 @@ protected CompletableFuture internalSetAutoTopicCreationAsync( "Invalid configuration for autoTopicCreationOverride. the detail is " + validateResult.getErrorInfo()); } - if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) { - throw new RestException(Status.NOT_ACCEPTABLE, - "Number of partitions should be less than or equal to " + maxPartitions); + if (Objects.equals(autoTopicCreationOverride.getTopicType(), + TopicType.PARTITIONED.toString())){ + if (maxPartitions > 0 + && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) { + throw new RestException(Status.NOT_ACCEPTABLE, + "Number of partitions should be less than or equal to " + maxPartitions); + } + } } }) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index b4f0c07cb5664..6489a0a4e9ad4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -589,10 +589,11 @@ public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse, asyncResponse.resume(Response.noContent().build()); }) .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(), namespaceName, - e.getCause()); - if (FutureUtil.unwrapCompletionException(e) instanceof NotFoundException) { + ex); + if (ex instanceof NotFoundException) { asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); } else { resumeAsyncResponseExceptionally(asyncResponse, e); @@ -617,10 +618,11 @@ public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse asyncResponse.resume(Response.noContent().build()); }) .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(), namespaceName, - e.getCause()); - if (FutureUtil.unwrapCompletionException(e) instanceof NotFoundException) { + ex); + if (ex instanceof NotFoundException) { asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); } else { resumeAsyncResponseExceptionally(asyncResponse, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index e23845ec61ccf..fb5266b76ff83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -473,7 +473,8 @@ public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar validateNamespaceName(tenant, namespace); internalModifyDeduplicationAsync(null) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); log.error("Failed to remove broker deduplication config for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; @@ -521,10 +522,11 @@ public void setAutoTopicCreation( asyncResponse.resume(Response.noContent().build()); }) .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(), namespaceName, - e.getCause()); - if (FutureUtil.unwrapCompletionException(e) instanceof MetadataStoreException.NotFoundException) { + ex); + if (ex instanceof MetadataStoreException.NotFoundException) { asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); } else { resumeAsyncResponseExceptionally(asyncResponse, e); @@ -548,10 +550,11 @@ public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse asyncResponse.resume(Response.noContent().build()); }) .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(), namespaceName, - e.getCause()); - if (FutureUtil.unwrapCompletionException(e) instanceof MetadataStoreException.NotFoundException) { + ex); + if (ex instanceof MetadataStoreException.NotFoundException) { asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); } else { resumeAsyncResponseExceptionally(asyncResponse, e); From f7e95588e431a56904394bd6cc74d89c932159cc Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 24 May 2022 16:27:11 +0800 Subject: [PATCH 6/6] Unified use ex --- .../java/org/apache/pulsar/broker/admin/v1/Namespaces.java | 4 ++-- .../java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 6489a0a4e9ad4..fc207efd7a7a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -596,7 +596,7 @@ public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse, if (ex instanceof NotFoundException) { asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); } else { - resumeAsyncResponseExceptionally(asyncResponse, e); + resumeAsyncResponseExceptionally(asyncResponse, ex); } return null; }); @@ -625,7 +625,7 @@ public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse if (ex instanceof NotFoundException) { asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); } else { - resumeAsyncResponseExceptionally(asyncResponse, e); + resumeAsyncResponseExceptionally(asyncResponse, ex); } return null; }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index fb5266b76ff83..8ea5c15bd9687 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -529,7 +529,7 @@ public void setAutoTopicCreation( if (ex instanceof MetadataStoreException.NotFoundException) { asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); } else { - resumeAsyncResponseExceptionally(asyncResponse, e); + resumeAsyncResponseExceptionally(asyncResponse, ex); } return null; }); @@ -557,7 +557,7 @@ public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse if (ex instanceof MetadataStoreException.NotFoundException) { asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); } else { - resumeAsyncResponseExceptionally(asyncResponse, e); + resumeAsyncResponseExceptionally(asyncResponse, ex); } return null; });