diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4fc79a124acd8..973dec8980ea0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -44,6 +44,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.IdentityHashMap; @@ -2534,32 +2535,39 @@ protected void handleEndTxn(CommandEndTxn command) { }); } - private CompletableFuture verifyTxnOwnershipForTCToBrokerCommands() { + private CompletableFuture isSuperUser() { + assert ctx.executor().inEventLoop(); if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) { - return getBrokerService() - .getAuthorizationService() - .isSuperUser(getPrincipal(), getAuthenticationData()); + CompletableFuture isAuthRoleAuthorized = service.getAuthorizationService().isSuperUser( + authRole, authenticationData); + if (originalPrincipal != null) { + CompletableFuture isOriginalPrincipalAuthorized = service.getAuthorizationService() + .isSuperUser(originalPrincipal, + originalAuthData != null ? originalAuthData : authenticationData); + return isOriginalPrincipalAuthorized.thenCombine(isAuthRoleAuthorized, + (originalPrincipal, authRole) -> originalPrincipal && authRole); + } else { + return isAuthRoleAuthorized; + } } else { return CompletableFuture.completedFuture(true); } } private CompletableFuture verifyTxnOwnership(TxnID txnID) { - final String checkOwner = getPrincipal(); + assert ctx.executor().inEventLoop(); return service.pulsar().getTransactionMetadataStoreService() - .verifyTxnOwnership(txnID, checkOwner) - .thenCompose(isOwner -> { + .verifyTxnOwnership(txnID, getPrincipal()) + .thenComposeAsync(isOwner -> { if (isOwner) { return CompletableFuture.completedFuture(true); } if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) { - return getBrokerService() - .getAuthorizationService() - .isSuperUser(checkOwner, getAuthenticationData()); + return isSuperUser(); } else { return CompletableFuture.completedFuture(false); } - }); + }, ctx.executor()); } @Override @@ -2576,10 +2584,10 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) { txnID, txnAction); } CompletableFuture> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString()); - topicFuture.thenAccept(optionalTopic -> { + topicFuture.thenAcceptAsync(optionalTopic -> { if (optionalTopic.isPresent()) { - // we only accept super user becase this endpoint is reserved for tc to broker communication - verifyTxnOwnershipForTCToBrokerCommands() + // we only accept superuser because this endpoint is reserved for tc to broker communication + isSuperUser() .thenCompose(isOwner -> { if (!isOwner) { return failedFutureTxnTcNotAllowed(txnID); @@ -2629,7 +2637,7 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) { return null; }); } - }).exceptionally(e -> { + }, ctx.executor()).exceptionally(e -> { log.error("handleEndTxnOnPartition fail ! topic {}, " + "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), e.getCause()); @@ -2658,7 +2666,7 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) { } CompletableFuture> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString()); - topicFuture.thenAccept(optionalTopic -> { + topicFuture.thenAcceptAsync(optionalTopic -> { if (optionalTopic.isPresent()) { Subscription subscription = optionalTopic.get().getSubscription(subName); if (subscription == null) { @@ -2670,7 +2678,7 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) { return; } // we only accept super user becase this endpoint is reserved for tc to broker communication - verifyTxnOwnershipForTCToBrokerCommands() + isSuperUser() .thenCompose(isOwner -> { if (!isOwner) { return failedFutureTxnTcNotAllowed(txnID); @@ -2720,7 +2728,7 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) { return null; }); } - }).exceptionally(e -> { + }, ctx.executor()).exceptionally(e -> { log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}" + "txnId: [{}], txnAction: [{}]", topic, subName, txnID, TxnAction.valueOf(txnAction), e.getCause()); @@ -2757,7 +2765,10 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) { checkArgument(state == State.Connected); final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); final long requestId = command.getRequestId(); - final List subscriptionsList = command.getSubscriptionsList(); + final List subscriptionsList = new ArrayList<>(); + for (org.apache.pulsar.common.api.proto.Subscription sub : command.getSubscriptionsList()) { + subscriptionsList.add(new org.apache.pulsar.common.api.proto.Subscription().copyFrom(sub)); + } if (log.isDebugEnabled()) { log.debug("Receive add published partition to txn request {} from {} with txnId {}", requestId, remoteAddress, txnID); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 4580f028de2b0..874d43896223c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -33,6 +33,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.matches; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -56,6 +57,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -75,6 +77,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider; +import org.apache.pulsar.broker.auth.MockAuthorizationProvider; import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.testcontext.PulsarTestContext; @@ -3171,6 +3174,46 @@ public void sendAddPartitionToTxnResponseFailed() throws Exception { channel.finish(); } + @Test(timeOut = 30000) + public void sendAddPartitionToTxnResponseFailedAuth() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setProxyRoles(Set.of("pass.fail")); + + svcConfig.setAuthorizationProvider(MockAuthorizationProvider.class.getName()); + AuthorizationService authorizationService = + spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, svcConfig, + pulsarTestContext.getPulsarResources()); + when(brokerService.getAuthorizationService()).thenReturn(authorizationService); + svcConfig.setAuthorizationEnabled(true); + + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(false)); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + + ByteBuf connect = Commands.newConnect(authMethodName, "pass.fail", "test", "localhost", + "pass.pass", "pass.pass", authMethodName); + channel.writeInbound(connect); + Object connectResponse = getResponse(); + assertTrue(connectResponse instanceof CommandConnected); + + ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L, + List.of("tenant/ns/topic1")); + channel.writeInbound(clientCommand); + CommandAddPartitionToTxnResponse response = (CommandAddPartitionToTxnResponse) getResponse(); + + assertEquals(response.getError(), ServerError.TransactionNotFound); + verify(txnStore, never()).addProducedPartitionToTxn(any(TxnID.class), any()); + + channel.finish(); + } + @Test(timeOut = 30000) public void sendAddSubscriptionToTxnResponse() throws Exception { final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);