diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 701c9a8155253..e1ab772d2fe78 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -308,14 +308,10 @@ public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
/**
* Validates that the authenticatedPrincipal and the originalPrincipal are a valid combination.
- * Valid combinations fulfill one of the following two rules:
+ * Valid combinations fulfills the following rule:
*
- * 1. The authenticatedPrincipal is in {@link ServiceConfiguration#getProxyRoles()}, if, and only if,
+ * The authenticatedPrincipal is in {@link ServiceConfiguration#getProxyRoles()}, if, and only if,
* the originalPrincipal is set to a role that is not also in {@link ServiceConfiguration#getProxyRoles()}.
- *
- * 2. The authenticatedPrincipal and the originalPrincipal are the same, but are not a proxyRole, when
- * allowNonProxyPrincipalsToBeEqual is true.
- *
* @return true when roles are a valid combination and false when roles are an invalid combination
*/
public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
@@ -331,7 +327,9 @@ public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
}
} else if (StringUtils.isNotBlank(originalPrincipal)
&& !(allowNonProxyPrincipalsToBeEqual && originalPrincipal.equals(authenticatedPrincipal))) {
- errorMsg = "cannot specify originalPrincipal when connecting without valid proxy role.";
+ log.warn("[{}] Non-proxy role [{}] passed originalPrincipal [{}]. This behavior will not "
+ + "be allowed in a future release. A proxy's role must be in the broker's proxyRoles "
+ + "configuration.", remoteAddress, authenticatedPrincipal, originalPrincipal);
}
if (errorMsg != null) {
log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress,
@@ -376,7 +374,7 @@ public CompletableFuture allowTenantOperationAsync(String tenantName,
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
- if (isProxyRole(role)) {
+ if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture isRoleAuthorizedFuture = allowTenantOperationAsync(
tenantName, operation, role, authData);
CompletableFuture isOriginalAuthorizedFuture = allowTenantOperationAsync(
@@ -434,7 +432,7 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
- if (isProxyRole(role)) {
+ if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture isRoleAuthorizedFuture = allowNamespaceOperationAsync(
namespaceName, operation, role, authData);
CompletableFuture isOriginalAuthorizedFuture = allowNamespaceOperationAsync(
@@ -478,7 +476,7 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
- if (isProxyRole(role)) {
+ if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync(
namespaceName, policy, operation, role, authData);
CompletableFuture isOriginalAuthorizedFuture = allowNamespacePolicyOperationAsync(
@@ -537,7 +535,7 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
- if (isProxyRole(role)) {
+ if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture isRoleAuthorizedFuture = allowTopicPolicyOperationAsync(
topicName, policy, operation, role, authData);
CompletableFuture isOriginalAuthorizedFuture = allowTopicPolicyOperationAsync(
@@ -622,7 +620,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName,
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
- if (isProxyRole(role)) {
+ if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture isRoleAuthorizedFuture = allowTopicOperationAsync(
topicName, operation, role, authData);
CompletableFuture isOriginalAuthorizedFuture = allowTopicOperationAsync(
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
new file mode 100644
index 0000000000000..54747f9d30493
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+import java.util.HashSet;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class AuthorizationServiceTest {
+
+ AuthorizationService authorizationService;
+
+ @BeforeClass
+ void beforeClass() throws PulsarServerException {
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setAuthorizationEnabled(true);
+ // Consider both of these proxy roles to make testing more comprehensive
+ HashSet proxyRoles = new HashSet<>();
+ proxyRoles.add("pass.proxy");
+ proxyRoles.add("fail.proxy");
+ conf.setProxyRoles(proxyRoles);
+ conf.setAuthorizationProvider(MockAuthorizationProvider.class.getName());
+ authorizationService = new AuthorizationService(conf, null);
+ }
+
+ /**
+ * See {@link MockAuthorizationProvider} for the implementation of the mock authorization provider.
+ */
+ @DataProvider(name = "roles")
+ public Object[][] encryptionProvider() {
+ return new Object[][]{
+ // Schema: role, originalRole, whether authorization should pass
+
+ // Client conditions where original role isn't passed or is blank
+ {"pass.client", null, Boolean.TRUE},
+ {"pass.client", " ", Boolean.TRUE},
+ {"fail.client", null, Boolean.FALSE},
+ {"fail.client", " ", Boolean.FALSE},
+
+ // Proxy conditions where original role isn't passed or is blank
+ {"pass.proxy", null, Boolean.FALSE},
+ {"pass.proxy", " ", Boolean.FALSE},
+ {"fail.proxy", null, Boolean.FALSE},
+ {"fail.proxy", " ", Boolean.FALSE},
+
+ // Normal proxy and client conditions
+ {"pass.proxy", "pass.client", Boolean.TRUE},
+ {"pass.proxy", "fail.client", Boolean.FALSE},
+ {"fail.proxy", "pass.client", Boolean.FALSE},
+ {"fail.proxy", "fail.client", Boolean.FALSE},
+
+ // Not proxy with original principal
+ {"pass.not-proxy", "pass.client", Boolean.TRUE},
+ {"pass.not-proxy", "fail.client", Boolean.FALSE},
+ {"fail.not-proxy", "pass.client", Boolean.FALSE},
+ {"fail.not-proxy", "fail.client", Boolean.FALSE},
+
+ // Covers an unlikely scenario, but valid in the context of this test
+ {null, "pass.proxy", Boolean.FALSE},
+ };
+ }
+
+ private void checkResult(boolean expected, boolean actual) {
+ if (expected) {
+ assertTrue(actual);
+ } else {
+ assertFalse(actual);
+ }
+ }
+
+ @Test(dataProvider = "roles")
+ public void testAllowTenantOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+ boolean isAuthorized = authorizationService.allowTenantOperationAsync("tenant",
+ TenantOperation.DELETE_NAMESPACE, originalRole, role, null).get();
+ checkResult(shouldPass, isAuthorized);
+ }
+
+ @Test(dataProvider = "roles")
+ public void testNamespaceOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+ boolean isAuthorized = authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"),
+ NamespaceOperation.PACKAGES, originalRole, role, null).get();
+ checkResult(shouldPass, isAuthorized);
+ }
+
+ @Test(dataProvider = "roles")
+ public void testTopicOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+ boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"),
+ TopicOperation.PRODUCE, originalRole, role, null).get();
+ checkResult(shouldPass, isAuthorized);
+ }
+
+ @Test(dataProvider = "roles")
+ public void testNamespacePolicyOperationAsync(String role, String originalRole, boolean shouldPass)
+ throws Exception {
+ boolean isAuthorized = authorizationService.allowNamespacePolicyOperationAsync(
+ NamespaceName.get("public/default"), PolicyName.ALL, PolicyOperation.READ, originalRole, role, null)
+ .get();
+ checkResult(shouldPass, isAuthorized);
+ }
+
+ @Test(dataProvider = "roles")
+ public void testTopicPolicyOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+ boolean isAuthorized = authorizationService.allowTopicPolicyOperationAsync(TopicName.get("topic"),
+ PolicyName.ALL, PolicyOperation.READ, originalRole, role, null).get();
+ checkResult(shouldPass, isAuthorized);
+ }
+}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java
new file mode 100644
index 0000000000000..beb0b87d22d1a
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Mock implementation of the authorization provider interface used for testing.
+ * A role is authorized if it starts with "pass".
+ */
+public class MockAuthorizationProvider implements AuthorizationProvider {
+
+ private CompletableFuture shouldPass(String role) {
+ return CompletableFuture.completedFuture(role != null && role.startsWith("pass"));
+ }
+
+ @Override
+ public CompletableFuture canProduceAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture canConsumeAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData, String subscription) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture canLookupAsync(TopicName topicName, String role,
+ AuthenticationDataSource authenticationData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture allowSourceOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture allowTenantOperationAsync(String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation, String role, AuthenticationDataSource authData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture allowTopicOperationAsync(TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
+ return shouldPass(role);
+ }
+
+ @Override
+ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic, String role, PolicyName policy, PolicyOperation operation, AuthenticationDataSource authData) {
+ return shouldPass(role);
+ }
+
+
+ @Override
+ public CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+ return null;
+ }
+
+
+ @Override
+ public CompletableFuture grantPermissionAsync(NamespaceName namespace, Set actions, String role,
+ String authDataJson) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+ Set roles, String authDataJson) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+ String role, String authDataJson) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture grantPermissionAsync(TopicName topicName, Set actions, String role,
+ String authDataJson) {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index bf676251d91d8..e87f337e5f7c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -197,7 +197,7 @@ public CompletableFuture validateSuperUserAccessAsync(){
String originalPrincipal = originalPrincipal();
validateOriginalPrincipal(appId, originalPrincipal);
- if (pulsar.getConfiguration().getProxyRoles().contains(appId)) {
+ if (pulsar.getConfiguration().getProxyRoles().contains(appId) || StringUtils.isNotBlank(originalPrincipal())) {
BrokerService brokerService = pulsar.getBrokerService();
return brokerService.getAuthorizationService().isSuperUser(appId, clientAuthData())
.thenCompose(proxyAuthorizationSuccess -> {
@@ -317,7 +317,8 @@ protected CompletableFuture validateAdminAccessForTenantAsync(
throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
}
validateOriginalPrincipal(clientAppId, originalPrincipal);
- if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+ if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)
+ || StringUtils.isNotBlank(originalPrincipal)) {
AuthorizationService authorizationService =
pulsar.getBrokerService().getAuthorizationService();
return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index bd6dfd872c558..4984cc3ce3767 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -250,7 +250,8 @@ public void testOriginalRoleValidation() throws Exception {
// Edge cases that differ because binary protocol and http protocol have different expectations
assertTrue(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, true));
- assertFalse(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, false));
+ // This assert flips to assertFalse in the 3.0 release.
+ assertTrue(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, false));
// Only likely in cases when authentication is disabled, but we still define these to be valid.
assertTrue(auth.isValidOriginalPrincipal(null, null, (SocketAddress) null, false));
@@ -264,9 +265,10 @@ public void testOriginalRoleValidation() throws Exception {
// OriginalPrincipal cannot be proxy role
assertFalse(auth.isValidOriginalPrincipal("proxy", "proxy", (SocketAddress) null, false));
- assertFalse(auth.isValidOriginalPrincipal("client", "proxy", (SocketAddress) null, false));
- assertFalse(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) null, false));
- assertFalse(auth.isValidOriginalPrincipal(null, "proxy", (SocketAddress) null, false));
+ // The next 3 asserts flip to assertFalse in the 3.0 release.
+ assertTrue(auth.isValidOriginalPrincipal("client", "proxy", (SocketAddress) null, false));
+ assertTrue(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) null, false));
+ assertTrue(auth.isValidOriginalPrincipal(null, "proxy", (SocketAddress) null, false));
// Must gracefully handle a missing AuthenticationDataSource
assertTrue(auth.isValidOriginalPrincipal("proxy", "client", (AuthenticationDataSource) null));