Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -819,56 +819,43 @@ protected void internalSetSubscriptionExpirationTime(Integer expirationTime) {
});
}

protected AutoTopicCreationOverride internalGetAutoTopicCreation() {
validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.autoTopicCreationOverride;
protected CompletableFuture<AutoTopicCreationOverride> internalGetAutoTopicCreationAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.AUTO_TOPIC_CREATION,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.autoTopicCreationOverride);
}

protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
AutoTopicCreationOverride autoTopicCreationOverride) {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (autoTopicCreationOverride != null) {
ValidateResult validateResult = AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride);
if (!validateResult.isSuccess()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Invalid configuration for autoTopicCreationOverride. the detail is "
+ validateResult.getErrorInfo());
}
if (Objects.equals(autoTopicCreationOverride.getTopicType(), TopicType.PARTITIONED.toString())) {
if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions);
}
}
}
// Force to read the data s.t. the watch to the cache content is setup.
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
policies.autoTopicCreationOverride = autoTopicCreationOverride;
return policies;
}).thenApply(r -> {
String autoOverride = (autoTopicCreationOverride != null
&& autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled";
log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(),
autoOverride != null ? autoOverride : "removed", namespaceName);
asyncResponse.resume(Response.noContent().build());
return null;
}).exceptionally(e -> {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName,
e.getCause());
if (e.getCause() instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
return null;
}
asyncResponse.resume(new RestException(e.getCause()));
return null;
});
}
protected CompletableFuture<Void> internalSetAutoTopicCreationAsync(
AutoTopicCreationOverride autoTopicCreationOverride) {
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> {
int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
if (autoTopicCreationOverride != null) {
ValidateResult validateResult =
AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride);
if (!validateResult.isSuccess()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Invalid configuration for autoTopicCreationOverride. the detail is "
+ validateResult.getErrorInfo());
}
if (Objects.equals(autoTopicCreationOverride.getTopicType(),
TopicType.PARTITIONED.toString())){
if (maxPartitions > 0
&& autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions);
}

protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
internalSetAutoTopicCreation(asyncResponse, null);
}
}
})
.thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
policies.autoTopicCreationOverride = autoTopicCreationOverride;
return policies;
}));
}

protected void internalSetAutoSubscriptionCreation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,11 +553,18 @@ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar
@ApiOperation(value = "Get autoTopicCreation info in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetAutoTopicCreation();
internalGetAutoTopicCreationAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get autoTopicCreation info for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -572,14 +579,27 @@ public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
AutoTopicCreationOverride autoTopicCreationOverride) {
try {
validateNamespaceName(property, cluster, namespace);
internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(property, cluster, namespace);
internalSetAutoTopicCreationAsync(autoTopicCreationOverride)
.thenAccept(__ -> {
String autoOverride = (autoTopicCreationOverride != null
&& autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled";
log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(),
autoOverride, namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(),
namespaceName,
ex);
if (ex instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@DELETE
Expand All @@ -590,14 +610,25 @@ public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
try {
validateNamespaceName(property, cluster, namespace);
internalRemoveAutoTopicCreation(asyncResponse);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
internalSetAutoTopicCreationAsync(null)
.thenAccept(__ -> {
log.info("[{}] Successfully remove autoTopicCreation on namespace {}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(),
namespaceName,
ex);
if (ex instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar
validateNamespaceName(tenant, namespace);
internalModifyDeduplicationAsync(null)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("Failed to remove broker deduplication config for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
Expand All @@ -485,10 +486,17 @@ public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar
@ApiOperation(value = "Get autoTopicCreation info in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")})
public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("tenant") String tenant,
public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetAutoTopicCreation();
internalGetAutoTopicCreationAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get autoTopicCreation info for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -504,14 +512,27 @@ public void setAutoTopicCreation(
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@ApiParam(value = "Settings for automatic topic creation", required = true)
AutoTopicCreationOverride autoTopicCreationOverride) {
try {
validateNamespaceName(tenant, namespace);
internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(tenant, namespace);
internalSetAutoTopicCreationAsync(autoTopicCreationOverride)
.thenAccept(__ -> {
String autoOverride = (autoTopicCreationOverride != null
&& autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled";
log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(),
autoOverride, namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(),
namespaceName,
ex);
if (ex instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@DELETE
Expand All @@ -521,14 +542,25 @@ public void setAutoTopicCreation(
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
try {
validateNamespaceName(tenant, namespace);
internalRemoveAutoTopicCreation(asyncResponse);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(tenant, namespace);
internalSetAutoTopicCreationAsync(null)
.thenAccept(__ -> {
log.info("[{}] Successfully remove autoTopicCreation on namespace {}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.info("[{}] Successfully remove autoTopicCreation on namespace {}", clientAppId()) ->
log.info("[{}] Successfully remove autoTopicCreation on namespace {}", clientAppId(), namespaceName);

})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(),
namespaceName,
ex);
if (ex instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
Expand Down Expand Up @@ -1675,6 +1676,44 @@ public void testRetentionPolicyValidationAsPartOfAllPolicies() throws Exception
assertInvalidRetentionPolicyAsPartOfAllPolicies(policies, 1, -2);
}

@Test
public void testOptionsAutoTopicCreation() throws Exception {
String namespace = "auto_topic_namespace";
AutoTopicCreationOverride autoTopicCreationOverride =
AutoTopicCreationOverride.builder().allowAutoTopicCreation(true).topicType("partitioned")
.defaultNumPartitions(4).build();
try {
asyncRequests(response -> namespaces.setAutoTopicCreation(response, this.testTenant, this.testLocalCluster,
namespace, autoTopicCreationOverride));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}

asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, this.testLocalCluster,
namespace, BundlesData.builder().build()));

// 1. set auto topic creation
asyncRequests(response -> namespaces.setAutoTopicCreation(response, this.testTenant, this.testLocalCluster,
namespace, autoTopicCreationOverride));

// 2. assert get auto topic creation
AutoTopicCreationOverride autoTopicCreationOverrideRsp = (AutoTopicCreationOverride) asyncRequests(
response -> namespaces.getAutoTopicCreation(response, this.testTenant, this.testLocalCluster,
namespace));
assertEquals(autoTopicCreationOverride.getTopicType(), autoTopicCreationOverrideRsp.getTopicType());
assertEquals(autoTopicCreationOverride.getDefaultNumPartitions(),
autoTopicCreationOverrideRsp.getDefaultNumPartitions());
assertEquals(autoTopicCreationOverride.isAllowAutoTopicCreation(),
autoTopicCreationOverrideRsp.isAllowAutoTopicCreation());
// 2. remove auto topic creation and assert get null
asyncRequests(response -> namespaces.removeAutoTopicCreation(response, this.testTenant,
this.testLocalCluster, namespace));
assertNull(asyncRequests(
response -> namespaces.getAutoTopicCreation(response, this.testTenant, this.testLocalCluster,
namespace)));
}

@Test
public void testSubscriptionTypesEnabled() throws PulsarAdminException, PulsarClientException {
pulsar.getConfiguration().setAuthorizationEnabled(false);
Expand Down