Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2387,10 +2387,15 @@ protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrateg
"schemaCompatibilityStrategy");
}

protected boolean internalGetSchemaValidationEnforced() {
protected boolean internalGetSchemaValidationEnforced(boolean applied) {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
return getNamespacePolicies(namespaceName).schema_validation_enforced;
boolean schemaValidationEnforced = getNamespacePolicies(namespaceName).schema_validation_enforced;
if (!schemaValidationEnforced && applied) {
return pulsar().getConfiguration().isSchemaValidationEnforced();
} else {
return schemaValidationEnforced;
}
}

protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1626,9 +1626,10 @@ public void setSubscriptionTypesEnabled(
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenants or Namespace doesn't exist") })
public boolean getSchemaValidtionEnforced(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
@PathParam("namespace") String namespace,
@QueryParam("applied") @DefaultValue("false") boolean applied) {
validateNamespaceName(tenant, namespace);
return internalGetSchemaValidationEnforced();
return internalGetSchemaValidationEnforced(applied);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testGetSchemaValidationEnforcedApplied() throws Exception {
String namespace = "schema-validation-enforced/testApplied";
admin.namespaces().createNamespace(namespace);
this.conf.setSchemaValidationEnforced(true);
assertTrue(admin.namespaces().getSchemaValidationEnforced(namespace, true));
assertFalse(admin.namespaces().getSchemaValidationEnforced(namespace, false));
}

@Test
public void testDisableSchemaValidationEnforcedNoSchema() throws Exception {
admin.namespaces().createNamespace("schema-validation-enforced/default-no-schema");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3539,6 +3539,7 @@ void setSchemaAutoUpdateCompatibilityStrategy(String namespace,

/**
* Get schema validation enforced for namespace.
* @param namespace namespace for this command.
* @return the schema validation enforced flag
* @throws NotAuthorizedException
* Don't have admin permission
Expand All @@ -3547,16 +3548,39 @@ void setSchemaAutoUpdateCompatibilityStrategy(String namespace,
* @throws PulsarAdminException
* Unexpected error
*/
boolean getSchemaValidationEnforced(String namespace)
throws PulsarAdminException;
boolean getSchemaValidationEnforced(String namespace) throws PulsarAdminException;

/**
* Get schema validation enforced for namespace asynchronously.
* @param namespace namespace for this command.
*
* @return the schema validation enforced flag
*/
CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace);

/**
* Get schema validation enforced for namespace.
* @param namespace namespace for this command.
* @param applied applied for this command.
* @return the schema validation enforced flag
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Tenant or Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
boolean getSchemaValidationEnforced(String namespace, boolean applied) throws PulsarAdminException;

/**
* Get schema validation enforced for namespace asynchronously.
* @param namespace namespace for this command.
* @param applied applied for this command.
*
* @return the schema validation enforced flag
*/
CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace, boolean applied);

/**
* Set schema validation enforced for namespace.
* if a producer without a schema attempts to produce to a topic with schema in this the namespace, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3060,11 +3060,21 @@ public void setSchemaAutoUpdateCompatibilityStrategy(String namespace,
}

@Override
public boolean getSchemaValidationEnforced(String namespace)
public boolean getSchemaValidationEnforced(String namespace) throws PulsarAdminException {
return getSchemaValidationEnforced(namespace, false);
}

@Override
public CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace) {
return getSchemaValidationEnforcedAsync(namespace, false);
}

@Override
public boolean getSchemaValidationEnforced(String namespace, boolean applied)
throws PulsarAdminException {
try {
return getSchemaValidationEnforcedAsync(namespace).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
return getSchemaValidationEnforcedAsync(namespace, applied)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Expand All @@ -3076,9 +3086,10 @@ public boolean getSchemaValidationEnforced(String namespace)
}

@Override
public CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace) {
public CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace, boolean applied) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "schemaValidationEnforced");
path = path.queryParam("applied", applied);
final CompletableFuture<Boolean> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Boolean>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ public void namespaces() throws Exception {
namespaces.run(split("get-subscription-types-enabled myprop/clust/ns1"));
verify(mockNamespaces).getSubscriptionTypesEnabled("myprop/clust/ns1");

namespaces.run(split("get-schema-validation-enforce myprop/clust/ns1 -ap"));
verify(mockNamespaces).getSchemaValidationEnforced("myprop/clust/ns1", true);

namespaces
.run(split("set-bookie-affinity-group myprop/clust/ns1 --primary-group test1 --secondary-group test2"));
verify(mockNamespaces).setBookieAffinityGroup("myprop/clust/ns1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1877,11 +1877,14 @@ private class GetSchemaValidationEnforced extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the namespace")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);

System.out.println(getAdmin().namespaces().getSchemaValidationEnforced(namespace));
System.out.println(getAdmin().namespaces().getSchemaValidationEnforced(namespace, applied));
}
}

Expand Down