Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2666,17 +2666,23 @@ protected CompletableFuture<Boolean> authorize(AclOperation operation, Resource
isAuthorizedFuture = authorizer.canLookupAsync(session.getPrincipal(), resource);
break;
case CREATE:
isAuthorizedFuture = authorizer.canCreateTopicAsync(session.getPrincipal(), resource);
break;
case DELETE:
isAuthorizedFuture = authorizer.canDeleteTopicAsync(session.getPrincipal(), resource);
break;
case ALTER:
isAuthorizedFuture = authorizer.canAlterTopicAsync(session.getPrincipal(), resource);
break;
case DESCRIBE_CONFIGS:
case ALTER_CONFIGS:
isAuthorizedFuture = authorizer.canManageTenantAsync(session.getPrincipal(), resource);
break;
case ANY:
if (resource.getResourceType() == ResourceType.TENANT) {
isAuthorizedFuture = authorizer.canAccessTenantAsync(session.getPrincipal(), resource);
}
break;
case ALTER_CONFIGS:
case CLUSTER_ACTION:
case UNKNOWN:
case ALL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;


/**
Expand All @@ -42,4 +43,6 @@ public class KafkaPrincipal implements Principal {
* It can be "tenant" or "tenant/namespace"
*/
private final String tenantSpec;

private final AuthenticationDataSource authenticationData;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop.security;

import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.AUTH_DATA_SOURCE_PROP;
import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.USER_NAME_PROP;

import io.streamnative.pulsar.handlers.kop.SaslAuth;
Expand All @@ -26,6 +27,7 @@
import javax.security.sasl.SaslServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
Expand All @@ -46,6 +48,7 @@ public class PlainSaslServer implements SaslServer {
private boolean complete;
private String authorizationId;
private String username;
private AuthenticationDataSource authDataSource;
private Set<String> proxyRoles;

public PlainSaslServer(AuthenticationService authenticationService, PulsarAdmin admin, Set<String> proxyRoles) {
Expand Down Expand Up @@ -85,6 +88,7 @@ public byte[] evaluateResponse(byte[] response) throws SaslException {
if (proxyRoles != null && proxyRoles.contains(authState.getAuthRole())) {
// the Proxy passes the OriginalPrincipal as "username"
authorizationId = saslAuth.getUsername();
authDataSource = authState.getAuthDataSource();
username = null; // PULSAR TENANT
if (authorizationId.contains("/")) {
// the proxy uses username/originalPrincipal as "username"
Expand All @@ -100,7 +104,8 @@ public byte[] evaluateResponse(byte[] response) throws SaslException {
}
} else {
authorizationId = authState.getAuthRole();
log.info("Authenticated User {}", authorizationId);
authDataSource = authState.getAuthDataSource();
log.info("Authenticated User {}, AuthDataSource {}", authorizationId, authDataSource);
}
complete = true;
return new byte[0];
Expand Down Expand Up @@ -142,6 +147,9 @@ public Object getNegotiatedProperty(String propName) {
if (USER_NAME_PROP.equals(propName)) {
return username;
}
if (AUTH_DATA_SOURCE_PROP.equals(propName)) {
return authDataSource;
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdmin;

Expand All @@ -67,6 +68,7 @@
public class SaslAuthenticator {

public static final String USER_NAME_PROP = "username";
public static final String AUTH_DATA_SOURCE_PROP = "authDataSource";

private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

Expand Down Expand Up @@ -420,7 +422,8 @@ private void handleSaslToken(ChannelHandlerContext ctx,
if (response != null) {
final Session newSession = new Session(
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID(),
(String) saslServer.getNegotiatedProperty(USER_NAME_PROP)),
(String) saslServer.getNegotiatedProperty(USER_NAME_PROP),
(AuthenticationDataSource) saslServer.getNegotiatedProperty(AUTH_DATA_SOURCE_PROP)),
"old-clientId");
if (!tenantAccessValidationFunction.apply(newSession)) {
throw new AuthenticationException("User is not allowed to access this tenant");
Expand Down Expand Up @@ -476,7 +479,8 @@ private void handleSaslToken(ChannelHandlerContext ctx,
String pulsarRole = saslServer.getAuthorizationID();
this.session = new Session(
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, pulsarRole,
(String) saslServer.getNegotiatedProperty(USER_NAME_PROP)),
(String) saslServer.getNegotiatedProperty(USER_NAME_PROP),
(AuthenticationDataSource) saslServer.getNegotiatedProperty(AUTH_DATA_SOURCE_PROP)),
header.clientId());
registerRequestLatency.accept(apiKey.name, startProcessTime);
if (!tenantAccessValidationFunction.apply(session)) {
Expand All @@ -492,9 +496,11 @@ private void handleSaslToken(ChannelHandlerContext ctx,
KafkaResponseUtils.newSaslAuthenticate(responseBuf),
null);
if (log.isDebugEnabled()) {
log.debug("Authenticate successfully for client, header {}, request {}, session {} username {}",
log.debug("Authenticate successfully for client, header {}, request {}, session {} username {},"
+ " authDataSource {}",
header, saslAuthenticateRequest, session,
saslServer.getNegotiatedProperty(USER_NAME_PROP));
saslServer.getNegotiatedProperty(USER_NAME_PROP),
saslServer.getNegotiatedProperty(AUTH_DATA_SOURCE_PROP));
}
} catch (SaslException e) {
registerRequestLatency.accept(apiKey.name, startProcessTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,36 @@ public interface Authorizer {
*/
CompletableFuture<Boolean> canAccessTenantAsync(KafkaPrincipal principal, Resource resource);

/**
* Check whether the specified role can create topic.
* This permission mapping to pulsar is Tenant Admin or Super Admin.
*
* @param principal login info
* @param resource resources to be authorized
* @return a boolean to determine whether authorized or not
*/
CompletableFuture<Boolean> canCreateTopicAsync(KafkaPrincipal principal, Resource resource);

/**
* Check whether the specified role can delete topic.
* This permission mapping to pulsar is Tenant Admin or Super Admin.
*
* @param principal login info
* @param resource resources to be authorized
* @return a boolean to determine whether authorized or not
*/
CompletableFuture<Boolean> canDeleteTopicAsync(KafkaPrincipal principal, Resource resource);

/**
* Check whether the specified role can alter topic.
* This permission mapping to pulsar is Tenant Admin or Super Admin.
*
* @param principal login info
* @param resource resources to be authorized
* @return a boolean to determine whether authorized or not
*/
CompletableFuture<Boolean> canAlterTopicAsync(KafkaPrincipal principal, Resource resource);

/**
* Check whether the specified role can manage Pulsar tenant.
* This permission mapping to pulsar is Tenant Admin or Super Admin.
Expand Down
Loading