diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java index cb6541623eb7f..572e4031d3271 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java @@ -47,6 +47,19 @@ default CompletableFuture isSuperUser(String role, ServiceConfiguration return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false); } + /** + * Check if specified role is a super user + * @param role the role to check + * @param authenticationData authentication data related to the role + * @return a CompletableFuture containing a boolean in which true means the role is a super user + * and false if it is not + */ + default CompletableFuture isSuperUser(String role, + AuthenticationDataSource authenticationData, + ServiceConfiguration serviceConfiguration) { + return isSuperUser(role, serviceConfiguration); + } + /** * Check if specified role is an admin of the tenant * @param tenant the tenant to check @@ -137,7 +150,7 @@ CompletableFuture grantPermissionAsync(NamespaceName namespace, Set grantPermissionAsync(NamespaceName namespace, Set grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set roles, String authDataJson); - + /** * Revoke subscription admin-api access for a role * @param namespace @@ -157,7 +170,7 @@ CompletableFuture grantSubscriptionPermissionAsync(NamespaceName namespace */ CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, String role, String authDataJson); - + /** * Grant authorization-action permission on a topic to the given client * diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 381e8cf780e3e..3bf4458a7c1ba 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -70,9 +70,9 @@ public AuthorizationService(ServiceConfiguration conf, ConfigurationCacheService } } - public CompletableFuture isSuperUser(String user) { + public CompletableFuture isSuperUser(String user, AuthenticationDataSource authenticationData) { if (provider != null) { - return provider.isSuperUser(user, conf); + return provider.isSuperUser(user, authenticationData, conf); } return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured")); } @@ -111,7 +111,7 @@ public CompletableFuture grantPermissionAsync(NamespaceName namespace, Set /** * Grant permission to roles that can access subscription-admin api - * + * * @param namespace * @param subscriptionName * @param roles @@ -130,7 +130,7 @@ public CompletableFuture grantSubscriptionPermissionAsync(NamespaceName na /** * Revoke subscription admin-api access for a role - * + * * @param namespace * @param subscriptionName * @param role @@ -143,7 +143,7 @@ public CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName n } return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured")); } - + /** * Grant authorization-action permission on a topic to the given client * @@ -180,7 +180,7 @@ public CompletableFuture canProduceAsync(TopicName topicName, String ro return CompletableFuture.completedFuture(true); } if (provider != null) { - return provider.isSuperUser(role, conf).thenComposeAsync(isSuperUser -> { + return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { if (isSuperUser) { return CompletableFuture.completedFuture(true); } else { @@ -207,7 +207,7 @@ public CompletableFuture canConsumeAsync(TopicName topicName, String ro return CompletableFuture.completedFuture(true); } if (provider != null) { - return provider.isSuperUser(role, conf).thenComposeAsync(isSuperUser -> { + return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { if (isSuperUser) { return CompletableFuture.completedFuture(true); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index c569dc9f4a3b7..2273d1d6751d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -52,7 +52,6 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; -import org.apache.pulsar.broker.authorization.AuthorizationProvider; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.common.naming.Constants; @@ -187,11 +186,11 @@ protected void validateSuperUserAccess() { try { proxyAuthorizedFuture = pulsar.getBrokerService() .getAuthorizationService() - .isSuperUser(appId); + .isSuperUser(appId, clientAuthData()); originalPrincipalAuthorizedFuture = pulsar.getBrokerService() .getAuthorizationService() - .isSuperUser(originalPrincipal); + .isSuperUser(originalPrincipal, clientAuthData()); if (!proxyAuthorizedFuture.get() || !originalPrincipalAuthorizedFuture.get()) { throw new RestException(Status.UNAUTHORIZED, @@ -206,7 +205,7 @@ protected void validateSuperUserAccess() { } else { if (config().isAuthorizationEnabled() && !pulsar.getBrokerService() .getAuthorizationService() - .isSuperUser(appId) + .isSuperUser(appId, clientAuthData()) .join()) { throw new RestException(Status.UNAUTHORIZED, "This operation requires super-user access"); } @@ -266,9 +265,9 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String CompletableFuture isOriginalPrincipalSuperUserFuture; try { AuthorizationService authorizationService = pulsar.getBrokerService().getAuthorizationService(); - isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId); + isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId, authenticationData); - isOriginalPrincipalSuperUserFuture = authorizationService.isSuperUser(originalPrincipal); + isOriginalPrincipalSuperUserFuture = authorizationService.isSuperUser(originalPrincipal, authenticationData); boolean proxyAuthorized = isProxySuperUserFuture.get() || authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get(); boolean originalPrincipalAuthorized @@ -286,7 +285,7 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String } else { if (!pulsar.getBrokerService() .getAuthorizationService() - .isSuperUser(clientAppId) + .isSuperUser(clientAppId, authenticationData) .join()) { if (!pulsar.getBrokerService().getAuthorizationService().isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get()) { throw new RestException(Status.UNAUTHORIZED, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index ba4cf5bee2815..2a64443e074bc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -516,7 +516,7 @@ public void testNonExistentTopic() throws Exception { providerField.setAccessible(true); PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService)); providerField.set(authorizationService, authorizationProvider); - doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any()); + doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any()); // Test producer creation resetChannel(); @@ -546,7 +546,7 @@ public void testClusterAccess() throws Exception { providerField.set(authorizationService, authorizationProvider); doReturn(authorizationService).when(brokerService).getAuthorizationService(); doReturn(true).when(brokerService).isAuthorizationEnabled(); - doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any()); + doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any()); doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).checkPermission(any(TopicName.class), Mockito.anyString(), any(AuthAction.class)); @@ -574,7 +574,7 @@ public void testNonExistentTopicSuperUserAccess() throws Exception { providerField.setAccessible(true); PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService)); providerField.set(authorizationService, authorizationProvider); - doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any()); + doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any()); // Test producer creation resetChannel();