Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,10 @@ public boolean isValidOriginalPrincipal(String authenticatedPrincipal,

/**
* Validates that the authenticatedPrincipal and the originalPrincipal are a valid combination.
* Valid combinations fulfill one of the following two rules:
* Valid combinations fulfills the following rule:
* <p>
* 1. The authenticatedPrincipal is in {@link ServiceConfiguration#getProxyRoles()}, if, and only if,
* The authenticatedPrincipal is in {@link ServiceConfiguration#getProxyRoles()}, if, and only if,
* the originalPrincipal is set to a role that is not also in {@link ServiceConfiguration#getProxyRoles()}.
* <p>
* 2. The authenticatedPrincipal and the originalPrincipal are the same, but are not a proxyRole, when
* allowNonProxyPrincipalsToBeEqual is true.
*
* @return true when roles are a valid combination and false when roles are an invalid combination
*/
public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
Expand All @@ -331,7 +327,9 @@ public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
}
} else if (StringUtils.isNotBlank(originalPrincipal)
&& !(allowNonProxyPrincipalsToBeEqual && originalPrincipal.equals(authenticatedPrincipal))) {
errorMsg = "cannot specify originalPrincipal when connecting without valid proxy role.";
log.warn("[{}] Non-proxy role [{}] passed originalPrincipal [{}]. This behavior will not "
+ "be allowed in a future release. A proxy's role must be in the broker's proxyRoles "
+ "configuration.", remoteAddress, authenticatedPrincipal, originalPrincipal);
}
if (errorMsg != null) {
log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress,
Expand Down Expand Up @@ -376,7 +374,7 @@ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTenantOperationAsync(
tenantName, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTenantOperationAsync(
Expand Down Expand Up @@ -434,7 +432,7 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespaceOperationAsync(
namespaceName, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespaceOperationAsync(
Expand Down Expand Up @@ -478,7 +476,7 @@ public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceNa
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync(
namespaceName, policy, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespacePolicyOperationAsync(
Expand Down Expand Up @@ -537,7 +535,7 @@ public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicPolicyOperationAsync(
topicName, policy, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTopicPolicyOperationAsync(
Expand Down Expand Up @@ -622,7 +620,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicOperationAsync(
topicName, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTopicOperationAsync(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.authorization;

import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import java.util.HashSet;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class AuthorizationServiceTest {

AuthorizationService authorizationService;

@BeforeClass
void beforeClass() throws PulsarServerException {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setAuthorizationEnabled(true);
// Consider both of these proxy roles to make testing more comprehensive
HashSet<String> proxyRoles = new HashSet<>();
proxyRoles.add("pass.proxy");
proxyRoles.add("fail.proxy");
conf.setProxyRoles(proxyRoles);
conf.setAuthorizationProvider(MockAuthorizationProvider.class.getName());
authorizationService = new AuthorizationService(conf, null);
}

/**
* See {@link MockAuthorizationProvider} for the implementation of the mock authorization provider.
*/
@DataProvider(name = "roles")
public Object[][] encryptionProvider() {
return new Object[][]{
// Schema: role, originalRole, whether authorization should pass

// Client conditions where original role isn't passed or is blank
{"pass.client", null, Boolean.TRUE},
{"pass.client", " ", Boolean.TRUE},
{"fail.client", null, Boolean.FALSE},
{"fail.client", " ", Boolean.FALSE},

// Proxy conditions where original role isn't passed or is blank
{"pass.proxy", null, Boolean.FALSE},
{"pass.proxy", " ", Boolean.FALSE},
{"fail.proxy", null, Boolean.FALSE},
{"fail.proxy", " ", Boolean.FALSE},

// Normal proxy and client conditions
{"pass.proxy", "pass.client", Boolean.TRUE},
{"pass.proxy", "fail.client", Boolean.FALSE},
{"fail.proxy", "pass.client", Boolean.FALSE},
{"fail.proxy", "fail.client", Boolean.FALSE},

// Not proxy with original principal
{"pass.not-proxy", "pass.client", Boolean.TRUE},
{"pass.not-proxy", "fail.client", Boolean.FALSE},
{"fail.not-proxy", "pass.client", Boolean.FALSE},
{"fail.not-proxy", "fail.client", Boolean.FALSE},

// Covers an unlikely scenario, but valid in the context of this test
{null, "pass.proxy", Boolean.FALSE},
};
}

private void checkResult(boolean expected, boolean actual) {
if (expected) {
assertTrue(actual);
} else {
assertFalse(actual);
}
}

@Test(dataProvider = "roles")
public void testAllowTenantOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
boolean isAuthorized = authorizationService.allowTenantOperationAsync("tenant",
TenantOperation.DELETE_NAMESPACE, originalRole, role, null).get();
checkResult(shouldPass, isAuthorized);
}

@Test(dataProvider = "roles")
public void testNamespaceOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
boolean isAuthorized = authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"),
NamespaceOperation.PACKAGES, originalRole, role, null).get();
checkResult(shouldPass, isAuthorized);
}

@Test(dataProvider = "roles")
public void testTopicOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"),
TopicOperation.PRODUCE, originalRole, role, null).get();
checkResult(shouldPass, isAuthorized);
}

@Test(dataProvider = "roles")
public void testNamespacePolicyOperationAsync(String role, String originalRole, boolean shouldPass)
throws Exception {
boolean isAuthorized = authorizationService.allowNamespacePolicyOperationAsync(
NamespaceName.get("public/default"), PolicyName.ALL, PolicyOperation.READ, originalRole, role, null)
.get();
checkResult(shouldPass, isAuthorized);
}

@Test(dataProvider = "roles")
public void testTopicPolicyOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
boolean isAuthorized = authorizationService.allowTopicPolicyOperationAsync(TopicName.get("topic"),
PolicyName.ALL, PolicyOperation.READ, originalRole, role, null).get();
checkResult(shouldPass, isAuthorized);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.authorization;

import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
* Mock implementation of the authorization provider interface used for testing.
* A role is authorized if it starts with "pass".
*/
public class MockAuthorizationProvider implements AuthorizationProvider {

private CompletableFuture<Boolean> shouldPass(String role) {
return CompletableFuture.completedFuture(role != null && role.startsWith("pass"));
}

@Override
public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData) {
return shouldPass(role);
}

@Override
public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData, String subscription) {
return shouldPass(role);
}

@Override
public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData) {
return shouldPass(role);
}

@Override
public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData) {
return shouldPass(role);
}

@Override
public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData) {
return shouldPass(role);
}

@Override
public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
return shouldPass(role);
}

@Override
public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
return shouldPass(role);
}

@Override
public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation, String role, AuthenticationDataSource authData) {
return shouldPass(role);
}

@Override
public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
return shouldPass(role);
}

@Override
public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic, String role, PolicyName policy, PolicyOperation operation, AuthenticationDataSource authData) {
return shouldPass(role);
}


@Override
public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return null;
}


@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
String authDataJson) {
return null;
}

@Override
public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
Set<String> roles, String authDataJson) {
return null;
}

@Override
public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
String role, String authDataJson) {
return null;
}

@Override
public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
String authDataJson) {
return null;
}

@Override
public void close() throws IOException {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public CompletableFuture<Void> validateSuperUserAccessAsync(){
String originalPrincipal = originalPrincipal();
validateOriginalPrincipal(appId, originalPrincipal);

if (pulsar.getConfiguration().getProxyRoles().contains(appId)) {
if (pulsar.getConfiguration().getProxyRoles().contains(appId) || StringUtils.isNotBlank(originalPrincipal())) {
BrokerService brokerService = pulsar.getBrokerService();
return brokerService.getAuthorizationService().isSuperUser(appId, clientAuthData())
.thenCompose(proxyAuthorizationSuccess -> {
Expand Down Expand Up @@ -317,7 +317,8 @@ protected CompletableFuture<Void> validateAdminAccessForTenantAsync(
throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
}
validateOriginalPrincipal(clientAppId, originalPrincipal);
if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)
|| StringUtils.isNotBlank(originalPrincipal)) {
AuthorizationService authorizationService =
pulsar.getBrokerService().getAuthorizationService();
return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ public void testOriginalRoleValidation() throws Exception {

// Edge cases that differ because binary protocol and http protocol have different expectations
assertTrue(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, true));
assertFalse(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, false));
// This assert flips to assertFalse in the 3.0 release.
assertTrue(auth.isValidOriginalPrincipal("client", "client", (SocketAddress) null, false));

// Only likely in cases when authentication is disabled, but we still define these to be valid.
assertTrue(auth.isValidOriginalPrincipal(null, null, (SocketAddress) null, false));
Expand All @@ -264,9 +265,10 @@ public void testOriginalRoleValidation() throws Exception {

// OriginalPrincipal cannot be proxy role
assertFalse(auth.isValidOriginalPrincipal("proxy", "proxy", (SocketAddress) null, false));
assertFalse(auth.isValidOriginalPrincipal("client", "proxy", (SocketAddress) null, false));
assertFalse(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) null, false));
assertFalse(auth.isValidOriginalPrincipal(null, "proxy", (SocketAddress) null, false));
// The next 3 asserts flip to assertFalse in the 3.0 release.
assertTrue(auth.isValidOriginalPrincipal("client", "proxy", (SocketAddress) null, false));
assertTrue(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) null, false));
assertTrue(auth.isValidOriginalPrincipal(null, "proxy", (SocketAddress) null, false));

// Must gracefully handle a missing AuthenticationDataSource
assertTrue(auth.isValidOriginalPrincipal("proxy", "client", (AuthenticationDataSource) null));
Expand Down