Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ default CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration
return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false);
}

/**
* Check if specified role is a super user
* @param role the role to check
* @param authenticationData authentication data related to the role
* @return a CompletableFuture containing a boolean in which true means the role is a super user
* and false if it is not
*/
default CompletableFuture<Boolean> isSuperUser(String role,
AuthenticationDataSource authenticationData,
ServiceConfiguration serviceConfiguration) {
return isSuperUser(role, serviceConfiguration);
}

/**
* Check if specified role is an admin of the tenant
* @param tenant the tenant to check
Expand Down Expand Up @@ -137,7 +150,7 @@ CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAc

/**
* Grant permission to roles that can access subscription-admin api
*
*
* @param namespace
* @param subscriptionName
* @param roles
Expand All @@ -147,7 +160,7 @@ CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAc
*/
CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles,
String authDataJson);

/**
* Revoke subscription admin-api access for a role
* @param namespace
Expand All @@ -157,7 +170,7 @@ CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace
*/
CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
String role, String authDataJson);

/**
* Grant authorization-action permission on a topic to the given client
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ public AuthorizationService(ServiceConfiguration conf, ConfigurationCacheService
}
}

public CompletableFuture<Boolean> isSuperUser(String user) {
public CompletableFuture<Boolean> isSuperUser(String user, AuthenticationDataSource authenticationData) {
if (provider != null) {
return provider.isSuperUser(user, conf);
return provider.isSuperUser(user, authenticationData, conf);
}
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
}
Expand Down Expand Up @@ -111,7 +111,7 @@ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set

/**
* Grant permission to roles that can access subscription-admin api
*
*
* @param namespace
* @param subscriptionName
* @param roles
Expand All @@ -130,7 +130,7 @@ public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName na

/**
* Revoke subscription admin-api access for a role
*
*
* @param namespace
* @param subscriptionName
* @param role
Expand All @@ -143,7 +143,7 @@ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName n
}
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
}

/**
* Grant authorization-action permission on a topic to the given client
*
Expand Down Expand Up @@ -180,7 +180,7 @@ public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String ro
return CompletableFuture.completedFuture(true);
}
if (provider != null) {
return provider.isSuperUser(role, conf).thenComposeAsync(isSuperUser -> {
return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> {
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
Expand All @@ -207,7 +207,7 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
return CompletableFuture.completedFuture(true);
}
if (provider != null) {
return provider.isSuperUser(role, conf).thenComposeAsync(isSuperUser -> {
return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> {
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.Constants;
Expand Down Expand Up @@ -187,11 +186,11 @@ protected void validateSuperUserAccess() {
try {
proxyAuthorizedFuture = pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(appId);
.isSuperUser(appId, clientAuthData());

originalPrincipalAuthorizedFuture = pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(originalPrincipal);
.isSuperUser(originalPrincipal, clientAuthData());

if (!proxyAuthorizedFuture.get() || !originalPrincipalAuthorizedFuture.get()) {
throw new RestException(Status.UNAUTHORIZED,
Expand All @@ -206,7 +205,7 @@ protected void validateSuperUserAccess() {
} else {
if (config().isAuthorizationEnabled() && !pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(appId)
.isSuperUser(appId, clientAuthData())
.join()) {
throw new RestException(Status.UNAUTHORIZED, "This operation requires super-user access");
}
Expand Down Expand Up @@ -266,9 +265,9 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String
CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
try {
AuthorizationService authorizationService = pulsar.getBrokerService().getAuthorizationService();
isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId);
isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId, authenticationData);

isOriginalPrincipalSuperUserFuture = authorizationService.isSuperUser(originalPrincipal);
isOriginalPrincipalSuperUserFuture = authorizationService.isSuperUser(originalPrincipal, authenticationData);

boolean proxyAuthorized = isProxySuperUserFuture.get() || authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get();
boolean originalPrincipalAuthorized
Expand All @@ -286,7 +285,7 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String
} else {
if (!pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(clientAppId)
.isSuperUser(clientAppId, authenticationData)
.join()) {
if (!pulsar.getBrokerService().getAuthorizationService().isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get()) {
throw new RestException(Status.UNAUTHORIZED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ public void testNonExistentTopic() throws Exception {
providerField.setAccessible(true);
PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
providerField.set(authorizationService, authorizationProvider);
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any());
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());

// Test producer creation
resetChannel();
Expand Down Expand Up @@ -546,7 +546,7 @@ public void testClusterAccess() throws Exception {
providerField.set(authorizationService, authorizationProvider);
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any());
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).checkPermission(any(TopicName.class), Mockito.anyString(),
any(AuthAction.class));

Expand Down Expand Up @@ -574,7 +574,7 @@ public void testNonExistentTopicSuperUserAccess() throws Exception {
providerField.setAccessible(true);
PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
providerField.set(authorizationService, authorizationProvider);
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any());
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());

// Test producer creation
resetChannel();
Expand Down