diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 8701618ee392a..dd882dfdadf01 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -819,56 +819,43 @@ protected void internalSetSubscriptionExpirationTime(Integer expirationTime) { }); } - protected AutoTopicCreationOverride internalGetAutoTopicCreation() { - validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.READ); - Policies policies = getNamespacePolicies(namespaceName); - return policies.autoTopicCreationOverride; + protected CompletableFuture internalGetAutoTopicCreationAsync() { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.AUTO_TOPIC_CREATION, + PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenApply(policies -> policies.autoTopicCreationOverride); } - protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, - AutoTopicCreationOverride autoTopicCreationOverride) { - final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); - validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE); - validatePoliciesReadOnlyAccess(); - if (autoTopicCreationOverride != null) { - ValidateResult validateResult = AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride); - if (!validateResult.isSuccess()) { - throw new RestException(Status.PRECONDITION_FAILED, - "Invalid configuration for autoTopicCreationOverride. the detail is " - + validateResult.getErrorInfo()); - } - if (Objects.equals(autoTopicCreationOverride.getTopicType(), TopicType.PARTITIONED.toString())) { - if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) { - throw new RestException(Status.NOT_ACCEPTABLE, - "Number of partitions should be less than or equal to " + maxPartitions); - } - } - } - // Force to read the data s.t. the watch to the cache content is setup. - namespaceResources().setPoliciesAsync(namespaceName, policies -> { - policies.autoTopicCreationOverride = autoTopicCreationOverride; - return policies; - }).thenApply(r -> { - String autoOverride = (autoTopicCreationOverride != null - && autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled"; - log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), - autoOverride != null ? autoOverride : "removed", namespaceName); - asyncResponse.resume(Response.noContent().build()); - return null; - }).exceptionally(e -> { - log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, - e.getCause()); - if (e.getCause() instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); - return null; - } - asyncResponse.resume(new RestException(e.getCause())); - return null; - }); - } + protected CompletableFuture internalSetAutoTopicCreationAsync( + AutoTopicCreationOverride autoTopicCreationOverride) { + return validateNamespacePolicyOperationAsync(namespaceName, + PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenAccept(__ -> { + int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); + if (autoTopicCreationOverride != null) { + ValidateResult validateResult = + AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride); + if (!validateResult.isSuccess()) { + throw new RestException(Status.PRECONDITION_FAILED, + "Invalid configuration for autoTopicCreationOverride. the detail is " + + validateResult.getErrorInfo()); + } + if (Objects.equals(autoTopicCreationOverride.getTopicType(), + TopicType.PARTITIONED.toString())){ + if (maxPartitions > 0 + && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) { + throw new RestException(Status.NOT_ACCEPTABLE, + "Number of partitions should be less than or equal to " + maxPartitions); + } - protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) { - internalSetAutoTopicCreation(asyncResponse, null); + } + } + }) + .thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { + policies.autoTopicCreationOverride = autoTopicCreationOverride; + return policies; + })); } protected void internalSetAutoSubscriptionCreation( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 6076ba10c5236..fc207efd7a7a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -553,11 +553,18 @@ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar @ApiOperation(value = "Get autoTopicCreation info in a namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) - public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("property") String property, - @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - return internalGetAutoTopicCreation(); + internalGetAutoTopicCreationAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("Failed to get autoTopicCreation info for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -572,14 +579,27 @@ public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, AutoTopicCreationOverride autoTopicCreationOverride) { - try { - validateNamespaceName(property, cluster, namespace); - internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride); - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(property, cluster, namespace); + internalSetAutoTopicCreationAsync(autoTopicCreationOverride) + .thenAccept(__ -> { + String autoOverride = (autoTopicCreationOverride != null + && autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled"; + log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), + autoOverride, namespaceName); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); + log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(), + namespaceName, + ex); + if (ex instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, ex); + } + return null; + }); } @DELETE @@ -590,14 +610,25 @@ public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse, public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { - try { validateNamespaceName(property, cluster, namespace); - internalRemoveAutoTopicCreation(asyncResponse); - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + internalSetAutoTopicCreationAsync(null) + .thenAccept(__ -> { + log.info("[{}] Successfully remove autoTopicCreation on namespace {}", + clientAppId(), namespaceName); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); + log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(), + namespaceName, + ex); + if (ex instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, ex); + } + return null; + }); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 7dcf97020fb09..8ea5c15bd9687 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -473,7 +473,8 @@ public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar validateNamespaceName(tenant, namespace); internalModifyDeduplicationAsync(null) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) - .exceptionally(ex -> { + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); log.error("Failed to remove broker deduplication config for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; @@ -485,10 +486,17 @@ public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar @ApiOperation(value = "Get autoTopicCreation info in a namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")}) - public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("tenant") String tenant, + public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - return internalGetAutoTopicCreation(); + internalGetAutoTopicCreationAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error("Failed to get autoTopicCreation info for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -504,14 +512,27 @@ public void setAutoTopicCreation( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @ApiParam(value = "Settings for automatic topic creation", required = true) AutoTopicCreationOverride autoTopicCreationOverride) { - try { - validateNamespaceName(tenant, namespace); - internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride); - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(tenant, namespace); + internalSetAutoTopicCreationAsync(autoTopicCreationOverride) + .thenAccept(__ -> { + String autoOverride = (autoTopicCreationOverride != null + && autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled"; + log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), + autoOverride, namespaceName); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); + log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(), + namespaceName, + ex); + if (ex instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, ex); + } + return null; + }); } @DELETE @@ -521,14 +542,25 @@ public void setAutoTopicCreation( @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { - try { - validateNamespaceName(tenant, namespace); - internalRemoveAutoTopicCreation(asyncResponse); - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(tenant, namespace); + internalSetAutoTopicCreationAsync(null) + .thenAccept(__ -> { + log.info("[{}] Successfully remove autoTopicCreation on namespace {}", + clientAppId(), namespaceName); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(e -> { + Throwable ex = FutureUtil.unwrapCompletionException(e); + log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(), + namespaceName, + ex); + if (ex instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist")); + } else { + resumeAsyncResponseExceptionally(asyncResponse, ex); + } + return null; + }); } @POST diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index bf83d1702ecd2..7394512e642d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -87,6 +87,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -1675,6 +1676,44 @@ public void testRetentionPolicyValidationAsPartOfAllPolicies() throws Exception assertInvalidRetentionPolicyAsPartOfAllPolicies(policies, 1, -2); } + @Test + public void testOptionsAutoTopicCreation() throws Exception { + String namespace = "auto_topic_namespace"; + AutoTopicCreationOverride autoTopicCreationOverride = + AutoTopicCreationOverride.builder().allowAutoTopicCreation(true).topicType("partitioned") + .defaultNumPartitions(4).build(); + try { + asyncRequests(response -> namespaces.setAutoTopicCreation(response, this.testTenant, this.testLocalCluster, + namespace, autoTopicCreationOverride)); + fail("should have failed"); + } catch (RestException e) { + assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); + } + + asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, this.testLocalCluster, + namespace, BundlesData.builder().build())); + + // 1. set auto topic creation + asyncRequests(response -> namespaces.setAutoTopicCreation(response, this.testTenant, this.testLocalCluster, + namespace, autoTopicCreationOverride)); + + // 2. assert get auto topic creation + AutoTopicCreationOverride autoTopicCreationOverrideRsp = (AutoTopicCreationOverride) asyncRequests( + response -> namespaces.getAutoTopicCreation(response, this.testTenant, this.testLocalCluster, + namespace)); + assertEquals(autoTopicCreationOverride.getTopicType(), autoTopicCreationOverrideRsp.getTopicType()); + assertEquals(autoTopicCreationOverride.getDefaultNumPartitions(), + autoTopicCreationOverrideRsp.getDefaultNumPartitions()); + assertEquals(autoTopicCreationOverride.isAllowAutoTopicCreation(), + autoTopicCreationOverrideRsp.isAllowAutoTopicCreation()); + // 2. remove auto topic creation and assert get null + asyncRequests(response -> namespaces.removeAutoTopicCreation(response, this.testTenant, + this.testLocalCluster, namespace)); + assertNull(asyncRequests( + response -> namespaces.getAutoTopicCreation(response, this.testTenant, this.testLocalCluster, + namespace))); + } + @Test public void testSubscriptionTypesEnabled() throws PulsarAdminException, PulsarClientException { pulsar.getConfiguration().setAuthorizationEnabled(false);