diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index d8df4f475d..96bfca1734 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -2666,10 +2666,15 @@ protected CompletableFuture authorize(AclOperation operation, Resource isAuthorizedFuture = authorizer.canLookupAsync(session.getPrincipal(), resource); break; case CREATE: + isAuthorizedFuture = authorizer.canCreateTopicAsync(session.getPrincipal(), resource); + break; case DELETE: + isAuthorizedFuture = authorizer.canDeleteTopicAsync(session.getPrincipal(), resource); + break; case ALTER: + isAuthorizedFuture = authorizer.canAlterTopicAsync(session.getPrincipal(), resource); + break; case DESCRIBE_CONFIGS: - case ALTER_CONFIGS: isAuthorizedFuture = authorizer.canManageTenantAsync(session.getPrincipal(), resource); break; case ANY: @@ -2677,6 +2682,7 @@ protected CompletableFuture authorize(AclOperation operation, Resource isAuthorizedFuture = authorizer.canAccessTenantAsync(session.getPrincipal(), resource); } break; + case ALTER_CONFIGS: case CLUSTER_ACTION: case UNKNOWN: case ALL: diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/KafkaPrincipal.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/KafkaPrincipal.java index 8d803ee354..379f7b1b56 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/KafkaPrincipal.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/KafkaPrincipal.java @@ -18,6 +18,7 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.ToString; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; /** @@ -42,4 +43,6 @@ public class KafkaPrincipal implements Principal { * It can be "tenant" or "tenant/namespace" */ private final String tenantSpec; + + private final AuthenticationDataSource authenticationData; } \ No newline at end of file diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/PlainSaslServer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/PlainSaslServer.java index 428fb8043f..0ac44210c4 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/PlainSaslServer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/PlainSaslServer.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.kop.security; +import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.AUTH_DATA_SOURCE_PROP; import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.USER_NAME_PROP; import io.streamnative.pulsar.handlers.kop.SaslAuth; @@ -26,6 +27,7 @@ import javax.security.sasl.SaslServer; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authentication.AuthenticationState; @@ -46,6 +48,7 @@ public class PlainSaslServer implements SaslServer { private boolean complete; private String authorizationId; private String username; + private AuthenticationDataSource authDataSource; private Set proxyRoles; public PlainSaslServer(AuthenticationService authenticationService, PulsarAdmin admin, Set proxyRoles) { @@ -85,6 +88,7 @@ public byte[] evaluateResponse(byte[] response) throws SaslException { if (proxyRoles != null && proxyRoles.contains(authState.getAuthRole())) { // the Proxy passes the OriginalPrincipal as "username" authorizationId = saslAuth.getUsername(); + authDataSource = authState.getAuthDataSource(); username = null; // PULSAR TENANT if (authorizationId.contains("/")) { // the proxy uses username/originalPrincipal as "username" @@ -100,7 +104,8 @@ public byte[] evaluateResponse(byte[] response) throws SaslException { } } else { authorizationId = authState.getAuthRole(); - log.info("Authenticated User {}", authorizationId); + authDataSource = authState.getAuthDataSource(); + log.info("Authenticated User {}, AuthDataSource {}", authorizationId, authDataSource); } complete = true; return new byte[0]; @@ -142,6 +147,9 @@ public Object getNegotiatedProperty(String propName) { if (USER_NAME_PROP.equals(propName)) { return username; } + if (AUTH_DATA_SOURCE_PROP.equals(propName)) { + return authDataSource; + } return null; } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java index c3ee30fb32..3d4e6cba6f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java @@ -57,6 +57,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -67,6 +68,7 @@ public class SaslAuthenticator { public static final String USER_NAME_PROP = "username"; + public static final String AUTH_DATA_SOURCE_PROP = "authDataSource"; private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); @@ -420,7 +422,8 @@ private void handleSaslToken(ChannelHandlerContext ctx, if (response != null) { final Session newSession = new Session( new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID(), - (String) saslServer.getNegotiatedProperty(USER_NAME_PROP)), + (String) saslServer.getNegotiatedProperty(USER_NAME_PROP), + (AuthenticationDataSource) saslServer.getNegotiatedProperty(AUTH_DATA_SOURCE_PROP)), "old-clientId"); if (!tenantAccessValidationFunction.apply(newSession)) { throw new AuthenticationException("User is not allowed to access this tenant"); @@ -476,7 +479,8 @@ private void handleSaslToken(ChannelHandlerContext ctx, String pulsarRole = saslServer.getAuthorizationID(); this.session = new Session( new KafkaPrincipal(KafkaPrincipal.USER_TYPE, pulsarRole, - (String) saslServer.getNegotiatedProperty(USER_NAME_PROP)), + (String) saslServer.getNegotiatedProperty(USER_NAME_PROP), + (AuthenticationDataSource) saslServer.getNegotiatedProperty(AUTH_DATA_SOURCE_PROP)), header.clientId()); registerRequestLatency.accept(apiKey.name, startProcessTime); if (!tenantAccessValidationFunction.apply(session)) { @@ -492,9 +496,11 @@ private void handleSaslToken(ChannelHandlerContext ctx, KafkaResponseUtils.newSaslAuthenticate(responseBuf), null); if (log.isDebugEnabled()) { - log.debug("Authenticate successfully for client, header {}, request {}, session {} username {}", + log.debug("Authenticate successfully for client, header {}, request {}, session {} username {}," + + " authDataSource {}", header, saslAuthenticateRequest, session, - saslServer.getNegotiatedProperty(USER_NAME_PROP)); + saslServer.getNegotiatedProperty(USER_NAME_PROP), + saslServer.getNegotiatedProperty(AUTH_DATA_SOURCE_PROP)); } } catch (SaslException e) { registerRequestLatency.accept(apiKey.name, startProcessTime); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/Authorizer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/Authorizer.java index 679f924306..8274d98252 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/Authorizer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/Authorizer.java @@ -41,6 +41,36 @@ public interface Authorizer { */ CompletableFuture canAccessTenantAsync(KafkaPrincipal principal, Resource resource); + /** + * Check whether the specified role can create topic. + * This permission mapping to pulsar is Tenant Admin or Super Admin. + * + * @param principal login info + * @param resource resources to be authorized + * @return a boolean to determine whether authorized or not + */ + CompletableFuture canCreateTopicAsync(KafkaPrincipal principal, Resource resource); + + /** + * Check whether the specified role can delete topic. + * This permission mapping to pulsar is Tenant Admin or Super Admin. + * + * @param principal login info + * @param resource resources to be authorized + * @return a boolean to determine whether authorized or not + */ + CompletableFuture canDeleteTopicAsync(KafkaPrincipal principal, Resource resource); + + /** + * Check whether the specified role can alter topic. + * This permission mapping to pulsar is Tenant Admin or Super Admin. + * + * @param principal login info + * @param resource resources to be authorized + * @return a boolean to determine whether authorized or not + */ + CompletableFuture canAlterTopicAsync(KafkaPrincipal principal, Resource resource); + /** * Check whether the specified role can manage Pulsar tenant. * This permission mapping to pulsar is Tenant Admin or Super Admin. diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java index 69e90e123e..042ef6e85d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java @@ -17,17 +17,15 @@ import static com.google.common.base.Preconditions.checkArgument; import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal; -import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.AuthAction; -import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; +import org.apache.pulsar.common.policies.data.TopicOperation; /** * Simple acl authorizer. @@ -37,135 +35,17 @@ public class SimpleAclAuthorizer implements Authorizer { private final PulsarService pulsarService; - private final ServiceConfiguration conf; + private final AuthorizationService authorizationService; public SimpleAclAuthorizer(PulsarService pulsarService) { this.pulsarService = pulsarService; - this.conf = pulsarService.getConfiguration(); + this.authorizationService = pulsarService.getBrokerService().getAuthorizationService(); } protected PulsarService getPulsarService() { return this.pulsarService; } - private CompletableFuture authorize(KafkaPrincipal principal, AuthAction action, Resource resource) { - - switch (resource.getResourceType()) { - case TOPIC: - return authorizeTopicPermission(principal, action, resource); - case TENANT: - return authorizeTenantPermission(principal, resource); - default: - return CompletableFuture.completedFuture(false); - } - } - - private CompletableFuture authorizeTopicPermission(KafkaPrincipal principal, AuthAction action, - Resource resource) { - CompletableFuture permissionFuture = new CompletableFuture<>(); - TopicName topicName = TopicName.get(resource.getName()); - NamespaceName namespace = topicName.getNamespaceObject(); - if (namespace == null) { - permissionFuture.completeExceptionally( - new IllegalArgumentException("Resource name must contains namespace.")); - return permissionFuture; - } - String tenantName = namespace.getTenant(); - isSuperUserOrTenantAdmin(tenantName, principal.getName()).whenComplete((isSuperUserOrAdmin, exception) -> { - if (exception != null) { - if (log.isDebugEnabled()) { - log.debug("Verify if role {} is allowed to {} to resource {}: isSuperUserOrAdmin={}", - principal.getName(), action, resource.getName(), isSuperUserOrAdmin); - } - isSuperUserOrAdmin = false; - } - if (isSuperUserOrAdmin) { - permissionFuture.complete(true); - return; - } - getPulsarService() - .getPulsarResources() - .getNamespaceResources() - .getPoliciesAsync(namespace) - .thenAccept(policies -> { - if (!policies.isPresent()) { - if (log.isDebugEnabled()) { - log.debug("Policies node couldn't be found for namespace : {}", principal); - } - permissionFuture.complete(false); - return; - } - authoriseTopicOverNamespacePolicies(principal, action, permissionFuture, topicName, - policies.get()); - }).exceptionally(ex -> { - if (log.isDebugEnabled()) { - log.debug("Client with Principal - {} failed to get permissions for resource - {}. {}", - principal, resource, ex.getMessage()); - } - permissionFuture.completeExceptionally(ex); - return null; - }); - - }); - return permissionFuture; - } - - private void authoriseTopicOverNamespacePolicies(KafkaPrincipal principal, AuthAction action, - CompletableFuture permissionFuture, - TopicName topicName, Policies policies) { - String role = principal.getName(); - - // Check Namespace level policies - Map> namespaceRoles = policies.auth_policies - .getNamespaceAuthentication(); - Set namespaceActions = namespaceRoles.get(role); - if (namespaceActions != null && namespaceActions.contains(action)) { - permissionFuture.complete(true); - return; - } - - // Check Topic level policies - Map> topicRoles = policies - .auth_policies - .getTopicAuthentication() - .get(topicName.toString()); - if (topicRoles != null && role != null) { - // Topic has custom policy - Set topicActions = topicRoles.get(role); - if (topicActions != null && topicActions.contains(action)) { - permissionFuture.complete(true); - return; - } - } - - // Check wildcard policies - if (conf.isAuthorizationAllowWildcardsMatching() - && checkWildcardPermission(role, action, namespaceRoles)) { - // The role has namespace level permission by wildcard match - permissionFuture.complete(true); - return; - } - - // If the partition number of the partitioned topic having topic level policy is updated, - // the new sub partitions may not inherit the policy of the partition topic. - // We can also check the permission of partitioned topic. - // For https://github.com/apache/pulsar/issues/10300 - if (topicName.isPartitioned()) { - topicRoles = policies.auth_policies - .getTopicAuthentication().get(topicName.getPartitionedTopicName()); - if (topicRoles != null) { - // Topic has custom policy - Set topicActions = topicRoles.get(role); - if (topicActions != null && topicActions.contains(action)) { - // The role has topic level permission - permissionFuture.complete(true); - return; - } - } - } - permissionFuture.complete(false); - } - private CompletableFuture authorizeTenantPermission(KafkaPrincipal principal, Resource resource) { CompletableFuture permissionFuture = new CompletableFuture<>(); // we can only check if the tenant exists @@ -187,75 +67,13 @@ private CompletableFuture authorizeTenantPermission(KafkaPrincipal prin return permissionFuture; } - private boolean checkWildcardPermission(String checkedRole, AuthAction checkedAction, - Map> permissionMap) { - for (Map.Entry> permissionData : permissionMap.entrySet()) { - String permittedRole = permissionData.getKey(); - Set permittedActions = permissionData.getValue(); - - // Prefix match - if (checkedRole != null) { - if (permittedRole.charAt(permittedRole.length() - 1) == '*' - && checkedRole.startsWith(permittedRole.substring(0, permittedRole.length() - 1)) - && permittedActions.contains(checkedAction)) { - return true; - } - - // Suffix match - if (permittedRole.charAt(0) == '*' && checkedRole.endsWith(permittedRole.substring(1)) - && permittedActions.contains(checkedAction)) { - return true; - } - } - } - return false; - } - - private CompletableFuture isSuperUser(String role) { - Set superUserRoles = conf.getSuperUserRoles(); - return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role)); - } - - /** - * Check if specified role is an admin of the tenant or superuser. - * - * @param tenant the tenant to check - * @param role the role to check - * @return a CompletableFuture containing a boolean in which true means the role is an admin user - * and false if it is not - */ - private CompletableFuture isSuperUserOrTenantAdmin(String tenant, String role) { - CompletableFuture future = new CompletableFuture<>(); - isSuperUser(role).whenComplete((isSuperUser, ex) -> { - if (ex != null || !isSuperUser) { - pulsarService.getPulsarResources() - .getTenantResources() - .getTenantAsync(tenant) - .thenAccept(tenantInfo -> { - if (!tenantInfo.isPresent()) { - future.complete(false); - return; - } - TenantInfo info = tenantInfo.get(); - future.complete(role != null - && info.getAdminRoles() != null - && info.getAdminRoles().contains(role)); - }); - return; - } - future.complete(true); - }); - return future; - } - - @Override public CompletableFuture canAccessTenantAsync(KafkaPrincipal principal, Resource resource) { checkArgument(resource.getResourceType() == ResourceType.TENANT, String.format("Expected resource type is TENANT, but have [%s]", resource.getResourceType())); CompletableFuture canAccessFuture = new CompletableFuture<>(); - authorize(principal, null, resource).whenComplete((hasPermission, ex) -> { + authorizeTenantPermission(principal, resource).whenComplete((hasPermission, ex) -> { if (ex != null) { if (log.isDebugEnabled()) { log.debug( @@ -271,65 +89,76 @@ public CompletableFuture canAccessTenantAsync(KafkaPrincipal principal, return canAccessFuture; } + @Override + public CompletableFuture canCreateTopicAsync(KafkaPrincipal principal, Resource resource) { + checkArgument(resource.getResourceType() == ResourceType.TOPIC, + String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); + + TopicName topicName = TopicName.get(resource.getName()); + return authorizationService.allowNamespaceOperationAsync( + topicName.getNamespaceObject(), + NamespaceOperation.CREATE_TOPIC, + principal.getName(), + principal.getAuthenticationData()); + } + + @Override + public CompletableFuture canDeleteTopicAsync(KafkaPrincipal principal, Resource resource) { + checkArgument(resource.getResourceType() == ResourceType.TOPIC, + String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); + + TopicName topicName = TopicName.get(resource.getName()); + return authorizationService.allowNamespaceOperationAsync( + topicName.getNamespaceObject(), + NamespaceOperation.DELETE_TOPIC, + principal.getName(), + principal.getAuthenticationData()); + } + + @Override + public CompletableFuture canAlterTopicAsync(KafkaPrincipal principal, Resource resource) { + checkArgument(resource.getResourceType() == ResourceType.TOPIC, + String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); + + TopicName topicName = TopicName.get(resource.getName()); + return authorizationService.allowTopicPolicyOperationAsync( + topicName, PolicyName.PARTITION, PolicyOperation.WRITE, + principal.getName(), principal.getAuthenticationData()); + } + @Override public CompletableFuture canManageTenantAsync(KafkaPrincipal principal, Resource resource) { checkArgument(resource.getResourceType() == ResourceType.TOPIC, String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); TopicName topicName = TopicName.get(resource.getName()); - NamespaceName namespace = topicName.getNamespaceObject(); - return isSuperUserOrTenantAdmin(namespace.getTenant(), principal.getName()); + return authorizationService.allowTopicOperationAsync( + topicName, TopicOperation.LOOKUP, principal.getName(), principal.getAuthenticationData()); } @Override public CompletableFuture canLookupAsync(KafkaPrincipal principal, Resource resource) { checkArgument(resource.getResourceType() == ResourceType.TOPIC, String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); - - CompletableFuture canLookupFuture = new CompletableFuture<>(); - authorize(principal, AuthAction.consume, resource).whenComplete((hasProducePermission, ex) -> { - if (ex != null) { - if (log.isDebugEnabled()) { - log.debug( - "Resource [{}] Principal [{}] exception occurred while trying to " - + "check Consume permissions. {}", - resource, principal, ex.getMessage()); - } - hasProducePermission = false; - } - if (hasProducePermission) { - canLookupFuture.complete(true); - return; - } - authorize(principal, AuthAction.produce, resource).whenComplete((hasConsumerPermission, e) -> { - if (e != null) { - if (log.isDebugEnabled()) { - log.debug( - "Resource [{}] Principal [{}] exception occurred while trying to " - + "check Produce permissions. {}", - resource, principal, e.getMessage()); - } - canLookupFuture.completeExceptionally(e); - return; - } - canLookupFuture.complete(hasConsumerPermission); - }); - }); - return canLookupFuture; + TopicName topicName = TopicName.get(resource.getName()); + return authorizationService.canLookupAsync(topicName, principal.getName(), principal.getAuthenticationData()); } @Override public CompletableFuture canProduceAsync(KafkaPrincipal principal, Resource resource) { checkArgument(resource.getResourceType() == ResourceType.TOPIC, String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); - return authorize(principal, AuthAction.produce, resource); + TopicName topicName = TopicName.get(resource.getName()); + return authorizationService.canProduceAsync(topicName, principal.getName(), principal.getAuthenticationData()); } @Override public CompletableFuture canConsumeAsync(KafkaPrincipal principal, Resource resource) { checkArgument(resource.getResourceType() == ResourceType.TOPIC, String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); - return authorize(principal, AuthAction.consume, resource); + TopicName topicName = TopicName.get(resource.getName()); + return authorizationService.canConsumeAsync( + topicName, principal.getName(), principal.getAuthenticationData(), ""); } } \ No newline at end of file diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationMockTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationMockTest.java new file mode 100644 index 0000000000..3a3a735f87 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationMockTest.java @@ -0,0 +1,142 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Sets; +import io.jsonwebtoken.SignatureAlgorithm; +import io.streamnative.pulsar.handlers.kop.security.auth.KafkaMockAuthorizationProvider; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import javax.crypto.SecretKey; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.PartitionInfo; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Unit test for Authorization with `entryFormat=pulsar`. + */ +public class KafkaAuthorizationMockTest extends KopProtocolHandlerTestBase { + + protected static final String TENANT = "KafkaAuthorizationTest"; + protected static final String NAMESPACE = "ns1"; + private static final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + protected static final String ADMIN_USER = "pass.pass"; + + @BeforeClass + @Override + protected void setup() throws Exception { + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey)); + + String adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_USER, Optional.empty()); + + conf.setSaslAllowedMechanisms(Sets.newHashSet("PLAIN")); + conf.setKafkaMetadataTenant("internal"); + conf.setKafkaMetadataNamespace("__kafka"); + conf.setKafkaTenant(TENANT); + conf.setKafkaNamespace(NAMESPACE); + + conf.setClusterName(super.configClusterName); + conf.setAuthorizationEnabled(true); + conf.setAuthenticationEnabled(true); + conf.setAuthorizationAllowWildcardsMatching(true); + conf.setAuthorizationProvider(KafkaMockAuthorizationProvider.class.getName()); + conf.setAuthenticationProviders( + Sets.newHashSet(AuthenticationProviderToken.class.getName())); + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters("token:" + adminToken); + conf.setProperties(properties); + + super.internalSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(this.conf.getBrokerClientAuthenticationPlugin(), + this.conf.getBrokerClientAuthenticationParameters()).build()); + } + + @Override + protected void createAdmin() throws Exception { + super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(this.conf.getBrokerClientAuthenticationPlugin(), + this.conf.getBrokerClientAuthenticationParameters()).build()); + } + + + @Test(timeOut = 30 * 1000) + public void testSuperUserProduceAndConsume() throws PulsarAdminException { + String superUserToken = AuthTokenUtils.createToken(secretKey, "pass.pass", Optional.empty()); + String topic = "testSuperUserProduceAndConsumeTopic"; + String fullNewTopicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + topic; + KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(), + TENANT + "/" + NAMESPACE, "token:" + superUserToken); + int totalMsgs = 10; + String messageStrPrefix = topic + "_message_"; + + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; + kProducer.getProducer().send(new ProducerRecord<>(topic, i, messageStr)); + } + KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false, + TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer"); + kConsumer.getConsumer().subscribe(Collections.singleton(topic)); + + int i = 0; + while (i < totalMsgs) { + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + Integer key = record.key(); + assertEquals(messageStrPrefix + key.toString(), record.value()); + i++; + } + } + assertEquals(i, totalMsgs); + + // no more records + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofMillis(200)); + assertTrue(records.isEmpty()); + + // ensure that we can list the topic + Map> result = kConsumer.getConsumer().listTopics(Duration.ofSeconds(1)); + assertEquals(result.size(), 1); + assertTrue(result.containsKey(topic), + "list of topics " + result.keySet() + " does not contains " + topic); + + // Cleanup + kProducer.close(); + kConsumer.close(); + admin.topics().deletePartitionedTopic(fullNewTopicName); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java index 82d8db5a50..7c3bd3f115 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java @@ -47,7 +47,6 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -69,6 +68,7 @@ public abstract class KafkaAuthorizationTestBase extends KopProtocolHandlerTestB protected static final String NAMESPACE = "ns1"; private static final String SHORT_TOPIC = "topic1"; private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/" + SHORT_TOPIC; + private static final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); protected static final String SIMPLE_USER = "muggle_user"; protected static final String ANOTHER_USER = "death_eater_user"; @@ -85,15 +85,9 @@ public KafkaAuthorizationTestBase(final String entryFormat) { @BeforeClass @Override protected void setup() throws Exception { - SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); - - AuthenticationProviderToken provider = new AuthenticationProviderToken(); Properties properties = new Properties(); properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey)); - ServiceConfiguration authConf = new ServiceConfiguration(); - authConf.setProperties(properties); - provider.initialize(authConf); userToken = AuthTokenUtils.createToken(secretKey, SIMPLE_USER, Optional.empty()); adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_USER, Optional.empty()); @@ -614,6 +608,7 @@ public void testDeleteTopicFailed() throws PulsarAdminException, InterruptedExce deleteTopicsResult.all().get(); fail("Should delete failed!"); } catch (ExecutionException ex) { + log.info("Test delete topic failed", ex); assertTrue(ex.getMessage().contains("TopicAuthorizationException")); } try { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaMockAuthorizationProvider.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaMockAuthorizationProvider.java new file mode 100644 index 0000000000..62acef3abe --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaMockAuthorizationProvider.java @@ -0,0 +1,80 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.security.auth; + +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.broker.auth.MockAuthorizationProvider; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; +import org.apache.pulsar.common.policies.data.TopicOperation; + + +public class KafkaMockAuthorizationProvider extends MockAuthorizationProvider { + @Override + public CompletableFuture allowTopicOperationAsync( + TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) { + return CompletableFuture.completedFuture(true); + } + + @Override + public Boolean allowTopicOperation( + TopicName topicName, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { + return true; + } + + @Override + public CompletableFuture allowTopicOperationAsync( + TopicName topic, + String originalRole, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { + return CompletableFuture.completedFuture(true); + } + + @Override + public Boolean allowTopicOperation( + TopicName topicName, + String originalRole, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { + return true; + } + + @Override + public CompletableFuture allowTopicPolicyOperationAsync( + TopicName topic, + String role, + PolicyName policy, + PolicyOperation operation, + AuthenticationDataSource authData) { + return CompletableFuture.completedFuture(true); + } + + @Override + public Boolean allowTopicPolicyOperation( + TopicName topicName, + String role, + PolicyName policy, + PolicyOperation operation, + AuthenticationDataSource authData) { + return true; + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java index 43a5c1c942..593e1e0596 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java @@ -29,6 +29,7 @@ import javax.crypto.SecretKey; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -36,8 +37,8 @@ import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; /** @@ -61,7 +62,7 @@ public class SimpleAclAuthorizerTest extends KopProtocolHandlerTestBase { private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/topic1"; private static final String NOT_EXISTS_TENANT_TOPIC = "persistent://not_exists/" + NAMESPACE + "/topic1"; - @BeforeMethod + @BeforeClass @Override protected void setup() throws Exception { log.info("success internal setup"); @@ -121,7 +122,7 @@ protected void createAdmin() throws Exception { this.conf.getBrokerClientAuthenticationParameters()).build()); } - @AfterMethod + @AfterClass @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -130,32 +131,38 @@ protected void cleanup() throws Exception { @Test public void testAuthorizeProduce() throws ExecutionException, InterruptedException { Boolean isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null, + new AuthenticationDataCommand(PRODUCE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null, + new AuthenticationDataCommand(CONSUMER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null, + new AuthenticationDataCommand(ANOTHER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, NOT_EXISTS_TENANT_TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ADMIN_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ADMIN_USER, null, + new AuthenticationDataCommand(ADMIN_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); } @@ -163,27 +170,32 @@ public void testAuthorizeProduce() throws ExecutionException, InterruptedExcepti @Test public void testAuthorizeConsume() throws ExecutionException, InterruptedException { Boolean isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null, + new AuthenticationDataCommand(PRODUCE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null, + new AuthenticationDataCommand(CONSUMER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null, + new AuthenticationDataCommand(ANOTHER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, NOT_EXISTS_TENANT_TOPIC)).get(); assertFalse(isAuthorized); } @@ -191,27 +203,32 @@ public void testAuthorizeConsume() throws ExecutionException, InterruptedExcepti @Test public void testAuthorizeLookup() throws ExecutionException, InterruptedException { Boolean isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null, + new AuthenticationDataCommand(PRODUCE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null, + new AuthenticationDataCommand(CONSUMER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null, + new AuthenticationDataCommand(ANOTHER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, NOT_EXISTS_TENANT_TOPIC)).get(); assertFalse(isAuthorized); } @@ -222,25 +239,29 @@ public void testAuthorizeTenantAdmin() throws ExecutionException, InterruptedExc // TENANT_ADMIN_USER can't produce don't exist tenant's topic, // because tenant admin depend on exist tenant. Boolean isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null, + new AuthenticationDataCommand(TENANT_ADMIN_USER)), Resource.of(ResourceType.TOPIC, NOT_EXISTS_TENANT_TOPIC)).get(); assertFalse(isAuthorized); // ADMIN_USER can produce don't exist tenant's topic, because is superuser. isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ADMIN_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ADMIN_USER, null, + new AuthenticationDataCommand(ADMIN_USER)), Resource.of(ResourceType.TOPIC, NOT_EXISTS_TENANT_TOPIC)).get(); assertTrue(isAuthorized); // TENANT_ADMIN_USER can produce. isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null, + new AuthenticationDataCommand(TENANT_ADMIN_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); - assertTrue(isAuthorized); + assertFalse(isAuthorized); // TENANT_ADMIN_USER can create or delete Topic isAuthorized = simpleAclAuthorizer.canManageTenantAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null, + new AuthenticationDataCommand(TENANT_ADMIN_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); } @@ -252,22 +273,26 @@ public void testTopicLevelPermissions() throws PulsarAdminException, ExecutionEx admin.topics().grantPermission(topic, TOPIC_LEVEL_PERMISSIONS_USER, Sets.newHashSet(AuthAction.produce)); Boolean isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, + new AuthenticationDataCommand(TOPIC_LEVEL_PERMISSIONS_USER)), Resource.of(ResourceType.TOPIC, topic)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, + new AuthenticationDataCommand(TOPIC_LEVEL_PERMISSIONS_USER)), Resource.of(ResourceType.TOPIC, topic)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, + new AuthenticationDataCommand(TOPIC_LEVEL_PERMISSIONS_USER)), Resource.of(ResourceType.TOPIC, topic)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null), + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, + new AuthenticationDataCommand(TOPIC_LEVEL_PERMISSIONS_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); }