From e1133b89a45c89d4b1f4870c5061c33f3dfcfd38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Thu, 9 Feb 2023 02:08:19 +0100 Subject: [PATCH 1/3] [improve][txn] Allow superusers to abort transactions (#19467) Super users must be always allowed to abort a transaction even if they're not the original owner. * Check that only owner or superusers are allowed to perform txn operations (end, add partition and add subscription) (cherry picked from commit 459a7a57c1b67cfe161cdc40c007a1c2e403b7cd) --- .../apache/pulsar/broker/PulsarService.java | 4 + .../TransactionMetadataStoreService.java | 22 +- .../broker/admin/impl/TransactionsBase.java | 1 + .../pulsar/broker/service/ServerCnx.java | 138 ++++-- .../pulsar/broker/service/ServerCnxTest.java | 423 +++++++++++++++++- .../TransactionMetadataStoreServiceTest.java | 36 +- .../broker/stats/TransactionMetricsTest.java | 8 +- ...icatedTransactionProducerConsumerTest.java | 329 ++++++++++++++ .../transaction/TransactionTestBase.java | 21 +- .../client/impl/TransactionEndToEndTest.java | 2 +- .../policies/data/TransactionMetadata.java | 3 + .../coordinator/TransactionMetadataStore.java | 3 +- .../transaction/coordinator/TxnMeta.java | 7 + .../impl/InMemTransactionMetadataStore.java | 11 +- .../impl/MLTransactionMetadataStore.java | 25 +- .../coordinator/impl/TxnMetaImpl.java | 8 +- .../proto/PulsarTransactionMetadata.proto | 1 + .../MLTransactionMetadataStoreTest.java | 31 +- .../TransactionMetadataStoreProviderTest.java | 12 +- 19 files changed, 992 insertions(+), 93 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 5c239e73bf5b1..a36a7f5c70bb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1262,6 +1262,10 @@ public LedgerOffloader getManagedLedgerOffloader(NamespaceName namespaceName, Of }); } + public boolean isRunning() { + return this.state == State.Started || this.state == State.Init; + } + public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies) throws PulsarServerException { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 902546958c54e..d65c448d5711b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -289,12 +289,13 @@ public CompletableFuture removeTransactionMetadataStore(TransactionCoordin } } - public CompletableFuture newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) { + public CompletableFuture newTransaction(TransactionCoordinatorID tcId, long timeoutInMills, + String owner) { TransactionMetadataStore store = stores.get(tcId); if (store == null) { return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId)); } - return store.newTransaction(timeoutInMills); + return store.newTransaction(timeoutInMills, owner); } public CompletableFuture addProducedPartitionToTxn(TxnID txnId, List partitions) { @@ -516,7 +517,22 @@ public Map getStores() { return Collections.unmodifiableMap(stores); } - public synchronized void close () { + public CompletableFuture verifyTxnOwnership(TxnID txnID, String checkOwner) { + return getTxnMeta(txnID) + .thenCompose(meta -> { + // owner was null in the old versions or no auth enabled + if (meta.getOwner() == null) { + return CompletableFuture.completedFuture(true); + } + if (meta.getOwner().equals(checkOwner)) { + return CompletableFuture.completedFuture(true); + } + return CompletableFuture.completedFuture(false); + }); + } + + + public void close () { this.internalPinnedExecutor.shutdown(); stores.forEach((tcId, metadataStore) -> { metadataStore.closeAsync().whenComplete((v, ex) -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java index 8eff6815404cd..6af1919ebbe06 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java @@ -182,6 +182,7 @@ private void getTransactionMetadata(TxnMeta txnMeta, transactionMetadata.status = txnMeta.status().name(); transactionMetadata.openTimestamp = txnMeta.getOpenTimestamp(); transactionMetadata.timeoutAt = txnMeta.getTimeoutAt(); + transactionMetadata.owner = txnMeta.getOwner(); List> ackedPartitionsFutures = new ArrayList<>(); Map>> ackFutures = new HashMap<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4d32d1d6d352f..1d80c02f3567f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -44,6 +44,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.IdentityHashMap; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; @@ -2226,7 +2227,8 @@ protected void handleNewTxn(CommandNewTxn command) { TransactionMetadataStoreService transactionMetadataStoreService = service.pulsar().getTransactionMetadataStoreService(); - transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds()) + final String owner = getPrincipal(); + transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds(), owner) .whenComplete(((txnID, ex) -> { if (ex == null) { if (log.isDebugEnabled()) { @@ -2261,9 +2263,15 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) { TransactionMetadataStoreService transactionMetadataStoreService = service.pulsar().getTransactionMetadataStoreService(); - service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID, - command.getPartitionsList()) - .whenComplete(((v, ex) -> { + verifyTxnOwnership(txnID) + .thenCompose(isOwner -> { + if (!isOwner) { + return failedFutureTxnNotOwned(txnID); + } + return transactionMetadataStoreService + .addProducedPartitionToTxn(txnID, command.getPartitionsList()); + }) + .whenComplete((v, ex) -> { if (ex == null) { if (log.isDebugEnabled()) { log.debug("Send response success for add published partition to txn request {}", requestId); @@ -2278,7 +2286,25 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) { ex.getMessage())); transactionMetadataStoreService.handleOpFail(ex, tcId); } - })); + }); + } + + private CompletableFuture failedFutureTxnNotOwned(TxnID txnID) { + String msg = String.format( + "Client (%s) is neither the owner of the transaction %s nor a super user", + getPrincipal(), txnID + ); + log.warn("[{}] {}", remoteAddress, msg); + return FutureUtil.failedFuture(new CoordinatorException.TransactionNotFoundException(msg)); + } + + private CompletableFuture failedFutureTxnTcNotAllowed(TxnID txnID) { + String msg = String.format( + "TC client (%s) is not a super user, and is not allowed to operate on transaction %s", + getPrincipal(), txnID + ); + log.warn("[{}] {}", remoteAddress, msg); + return FutureUtil.failedFuture(new CoordinatorException.TransactionNotFoundException(msg)); } @Override @@ -2295,8 +2321,13 @@ protected void handleEndTxn(CommandEndTxn command) { TransactionMetadataStoreService transactionMetadataStoreService = service.pulsar().getTransactionMetadataStoreService(); - transactionMetadataStoreService - .endTransaction(txnID, txnAction, false) + verifyTxnOwnership(txnID) + .thenCompose(isOwner -> { + if (!isOwner) { + return failedFutureTxnNotOwned(txnID); + } + return transactionMetadataStoreService.endTransaction(txnID, txnAction, false); + }) .whenComplete((v, ex) -> { if (ex == null) { ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, @@ -2311,6 +2342,34 @@ protected void handleEndTxn(CommandEndTxn command) { }); } + private CompletableFuture verifyTxnOwnershipForTCToBrokerCommands() { + if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) { + return getBrokerService() + .getAuthorizationService() + .isSuperUser(getPrincipal(), getAuthenticationData()); + } else { + return CompletableFuture.completedFuture(true); + } + } + + private CompletableFuture verifyTxnOwnership(TxnID txnID) { + final String checkOwner = getPrincipal(); + return service.pulsar().getTransactionMetadataStoreService() + .verifyTxnOwnership(txnID, checkOwner) + .thenCompose(isOwner -> { + if (isOwner) { + return CompletableFuture.completedFuture(true); + } + if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) { + return getBrokerService() + .getAuthorizationService() + .isSuperUser(checkOwner, getAuthenticationData()); + } else { + return CompletableFuture.completedFuture(false); + } + }); + } + @Override protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) { final long requestId = command.getRequestId(); @@ -2326,9 +2385,17 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) { CompletableFuture> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString()); topicFuture.thenAccept(optionalTopic -> { if (optionalTopic.isPresent()) { - optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark) + // we only accept super user becase this endpoint is reserved for tc to broker communication + verifyTxnOwnershipForTCToBrokerCommands() + .thenCompose(isOwner -> { + if (!isOwner) { + return failedFutureTxnTcNotAllowed(txnID); + } + return optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark); + }) .whenComplete((ignored, throwable) -> { if (throwable != null) { + throwable = FutureUtil.unwrapCompletionException(throwable); log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], " + "txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), throwable); ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( @@ -2340,7 +2407,6 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) { ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits())); }); - } else { getBrokerService().getManagedLedgerFactory() .asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()) @@ -2409,23 +2475,28 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) { Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits)); return; } - - CompletableFuture completableFuture = - subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark); - completableFuture.whenComplete((ignored, e) -> { - if (e != null) { - log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}" - + "txnId: [{}], txnAction: [{}]", topic, subName, - txnID, TxnAction.valueOf(txnAction), e.getCause()); - ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( - requestId, txnidLeastBits, txnidMostBits, - BrokerServiceException.getClientErrorCode(e), - "Handle end txn on subscription failed.")); - return; - } - ctx.writeAndFlush( - Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits)); - }); + // we only accept super user becase this endpoint is reserved for tc to broker communication + verifyTxnOwnershipForTCToBrokerCommands() + .thenCompose(isOwner -> { + if (!isOwner) { + return failedFutureTxnTcNotAllowed(txnID); + } + return subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark); + }).whenComplete((ignored, e) -> { + if (e != null) { + e = FutureUtil.unwrapCompletionException(e); + log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}" + + "txnId: [{}], txnAction: [{}]", topic, subName, + txnID, TxnAction.valueOf(txnAction), e.getCause()); + ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( + requestId, txnidLeastBits, txnidMostBits, + BrokerServiceException.getClientErrorCode(e), + "Handle end txn on subscription failed: " + e.getMessage())); + return; + } + ctx.writeAndFlush( + Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits)); + }); } else { getBrokerService().getManagedLedgerFactory() .asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()) @@ -2490,6 +2561,7 @@ private CompletableFuture tryAddSchema(Topic topic, SchemaData sc protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) { final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); final long requestId = command.getRequestId(); + final List subscriptionsList = command.getSubscriptionsList(); if (log.isDebugEnabled()) { log.debug("Receive add published partition to txn request {} from {} with txnId {}", requestId, remoteAddress, txnID); @@ -2504,9 +2576,15 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) { TransactionMetadataStoreService transactionMetadataStoreService = service.pulsar().getTransactionMetadataStoreService(); - transactionMetadataStoreService.addAckedPartitionToTxn(txnID, - MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList())) - .whenComplete(((v, ex) -> { + verifyTxnOwnership(txnID) + .thenCompose(isOwner -> { + if (!isOwner) { + return failedFutureTxnNotOwned(txnID); + } + return transactionMetadataStoreService.addAckedPartitionToTxn(txnID, + MLTransactionMetadataStore.subscriptionToTxnSubscription(subscriptionsList)); + }) + .whenComplete((v, ex) -> { if (ex == null) { if (log.isDebugEnabled()) { log.debug("Send response success for add published partition to txn request {}", @@ -2522,7 +2600,7 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) { ex.getMessage())); transactionMetadataStoreService.handleOpFail(ex, tcId); } - })); + }); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index ee1789b322f11..c3a34dbadbb9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -22,6 +22,9 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.doAnswer; @@ -37,6 +40,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -50,6 +54,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -72,6 +77,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.TransactionMetadataStoreService; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -88,16 +94,23 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService; import org.apache.pulsar.broker.service.utils.ClientChannelHelper; +import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.AuthMethod; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.BaseCommand.Type; import org.apache.pulsar.common.api.proto.CommandAck.AckType; +import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse; +import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse; import org.apache.pulsar.common.api.proto.CommandAuthResponse; import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.api.proto.CommandConnected; +import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; +import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse; +import org.apache.pulsar.common.api.proto.CommandEndTxnResponse; import org.apache.pulsar.common.api.proto.CommandError; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; +import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; import org.apache.pulsar.common.api.proto.CommandProducerSuccess; import org.apache.pulsar.common.api.proto.CommandSendError; import org.apache.pulsar.common.api.proto.CommandSendReceipt; @@ -107,6 +120,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.Policies; @@ -119,6 +133,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.zookeeper.ZooKeeper; +import org.apache.pulsar.transaction.coordinator.TxnMeta; import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; @@ -1962,8 +1977,8 @@ public void testTopicIsNotReady() throws Exception { public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{ // Mock ServerCnx.field: consumers ConcurrentLongHashMap.Builder mapBuilder = Mockito.mock(ConcurrentLongHashMap.Builder.class); - Mockito.when(mapBuilder.expectedItems(Mockito.anyInt())).thenReturn(mapBuilder); - Mockito.when(mapBuilder.concurrencyLevel(Mockito.anyInt())).thenReturn(mapBuilder); + Mockito.when(mapBuilder.expectedItems(anyInt())).thenReturn(mapBuilder); + Mockito.when(mapBuilder.concurrencyLevel(anyInt())).thenReturn(mapBuilder); ConcurrentLongHashMap consumers = Mockito.mock(ConcurrentLongHashMap.class); Mockito.when(mapBuilder.build()).thenReturn(consumers); ArgumentCaptor ignoreArgumentCaptor = ArgumentCaptor.forClass(Long.class); @@ -2019,7 +2034,7 @@ public boolean isCompletedExceptionally(){ return false; } }; - Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture); + Mockito.when(consumers.putIfAbsent(anyLong(), Mockito.any())).thenReturn(existingConsumerFuture); // do test: delay complete after execute 'isDone()' many times // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051 try (MockedStatic theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) { @@ -2058,12 +2073,12 @@ public boolean isCompletedExceptionally(){ } // case3: exists existingConsumerFuture, already complete and exception CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class); - Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture); + Mockito.when(consumers.putIfAbsent(anyLong(), Mockito.any())).thenReturn(existingConsumerFuture); // make consumerFuture delay finish Mockito.when(existingConsumerFuture.isDone()).thenReturn(true); // when sync get return, future will return success value. Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException()); - Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())). + Mockito.when(existingConsumerFuture.get(anyLong(), Mockito.any())). thenThrow(new NullPointerException()); Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true); Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException()); @@ -2117,4 +2132,402 @@ public void testHandleAuthResponseWithoutClientVersion() { verify(authResponse, times(1)).hasClientVersion(); verify(authResponse, times(0)).getClientVersion(); } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void shouldFailHandleGetTopicsOfNamespace() throws Exception { + ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); + Field stateUpdater = ServerCnx.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(serverCnx, ServerCnx.State.Failed); + serverCnx.handleGetTopicsOfNamespace(any()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void shouldFailHandleGetSchema() throws Exception { + ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); + Field stateUpdater = ServerCnx.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(serverCnx, ServerCnx.State.Failed); + serverCnx.handleGetSchema(any()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void shouldFailHandleGetOrCreateSchema() throws Exception { + ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); + Field stateUpdater = ServerCnx.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(serverCnx, ServerCnx.State.Failed); + serverCnx.handleGetOrCreateSchema(any()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void shouldFailHandleTcClientConnectRequest() throws Exception { + ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); + Field stateUpdater = ServerCnx.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(serverCnx, ServerCnx.State.Failed); + serverCnx.handleTcClientConnectRequest(any()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void shouldFailHandleNewTxn() throws Exception { + ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); + Field stateUpdater = ServerCnx.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(serverCnx, ServerCnx.State.Failed); + serverCnx.handleNewTxn(any()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void shouldFailHandleAddPartitionToTxn() throws Exception { + ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); + Field stateUpdater = ServerCnx.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(serverCnx, ServerCnx.State.Failed); + serverCnx.handleAddPartitionToTxn(any()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void shouldFailHandleEndTxn() throws Exception { + ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); + Field stateUpdater = ServerCnx.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(serverCnx, ServerCnx.State.Failed); + serverCnx.handleEndTxn(any()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void shouldFailHandleEndTxnOnPartition() throws Exception { + ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); + Field stateUpdater = ServerCnx.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(serverCnx, ServerCnx.State.Failed); + serverCnx.handleEndTxnOnPartition(any()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void shouldFailHandleEndTxnOnSubscription() throws Exception { + ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); + Field stateUpdater = ServerCnx.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(serverCnx, ServerCnx.State.Failed); + serverCnx.handleEndTxnOnSubscription(any()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void shouldFailHandleAddSubscriptionToTxn() throws Exception { + ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); + Field stateUpdater = ServerCnx.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(serverCnx, ServerCnx.State.Failed); + serverCnx.handleAddSubscriptionToTxn(any()); + } + + @Test(timeOut = 30000) + public void handlePartitionMetadataRequestWithServiceNotReady() throws Exception { + resetChannel(); + setChannelConnected(); + doReturn(false).when(pulsar).isRunning(); + assertTrue(channel.isActive()); + + ByteBuf clientCommand = Commands.newPartitionMetadataRequest(successTopicName, 1); + channel.writeInbound(clientCommand); + Object response = getResponse(); + assertTrue(response instanceof CommandPartitionedTopicMetadataResponse); + assertEquals(((CommandPartitionedTopicMetadataResponse) response).getError(), ServerError.ServiceNotReady); + channel.finish(); + } + + @Test(timeOut = 30000) + public void sendAddPartitionToTxnResponse() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + setChannelConnected(); + ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L, + Lists.newArrayList("tenant/ns/topic1")); + channel.writeInbound(clientCommand); + CommandAddPartitionToTxnResponse response = (CommandAddPartitionToTxnResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertFalse(response.hasError()); + assertFalse(response.hasMessage()); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void sendAddPartitionToTxnResponseFailed() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any())) + .thenReturn(FutureUtil.failedFuture(new RuntimeException("server error"))); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + setChannelConnected(); + ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L, + Lists.newArrayList("tenant/ns/topic1")); + channel.writeInbound(clientCommand); + CommandAddPartitionToTxnResponse response = (CommandAddPartitionToTxnResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertEquals(response.getError().getValue(), 0); + assertEquals(response.getMessage(), "server error"); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void sendAddSubscriptionToTxnResponse() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + setChannelConnected(); + final org.apache.pulsar.common.api.proto.Subscription sub = + new org.apache.pulsar.common.api.proto.Subscription(); + sub.setTopic("topic1"); + sub.setSubscription("sub1"); + ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L, + Lists.newArrayList(sub)); + channel.writeInbound(clientCommand); + CommandAddSubscriptionToTxnResponse response = (CommandAddSubscriptionToTxnResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertFalse(response.hasError()); + assertFalse(response.hasMessage()); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void sendAddSubscriptionToTxnResponseFailed() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any())) + .thenReturn(FutureUtil.failedFuture(new RuntimeException("server error"))); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + setChannelConnected(); + final org.apache.pulsar.common.api.proto.Subscription sub = + new org.apache.pulsar.common.api.proto.Subscription(); + sub.setTopic("topic1"); + sub.setSubscription("sub1"); + ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L, + Lists.newArrayList(sub)); + channel.writeInbound(clientCommand); + CommandAddSubscriptionToTxnResponse response = (CommandAddSubscriptionToTxnResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertEquals(response.getError().getValue(), 0); + assertEquals(response.getMessage(), "server error"); + + channel.finish(); + } + + + @Test(timeOut = 30000) + public void sendEndTxnResponse() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + setChannelConnected(); + ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L, + TxnAction.COMMIT)); + channel.writeInbound(clientCommand); + CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertFalse(response.hasError()); + assertFalse(response.hasMessage()); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void sendEndTxnResponseFailed() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) + .thenReturn(FutureUtil.failedFuture(new RuntimeException("server error"))); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + setChannelConnected(); + ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L, + TxnAction.COMMIT)); + channel.writeInbound(clientCommand); + CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertEquals(response.getError().getValue(), 0); + assertEquals(response.getMessage(), "server error"); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void sendEndTxnOnPartitionResponse() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + setChannelConnected(); + Topic topic = mock(Topic.class); + doReturn(CompletableFuture.completedFuture(null)).when(topic).endTxn(any(TxnID.class), anyInt(), anyLong()); + doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService) + .getTopicIfExists(any(String.class)); + ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L, + successTopicName, TxnAction.COMMIT, 1L); + channel.writeInbound(clientCommand); + CommandEndTxnOnPartitionResponse response = (CommandEndTxnOnPartitionResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertFalse(response.hasError()); + assertFalse(response.hasMessage()); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void sendEndTxnOnPartitionResponseFailed() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + setChannelConnected(); + Topic topic = mock(Topic.class); + doReturn(FutureUtil.failedFuture(new RuntimeException("server error"))).when(topic) + .endTxn(any(TxnID.class), anyInt(), anyLong()); + doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService) + .getTopicIfExists(any(String.class)); + ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L, + successTopicName, TxnAction.COMMIT, 1L); + channel.writeInbound(clientCommand); + CommandEndTxnOnPartitionResponse response = (CommandEndTxnOnPartitionResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertEquals(response.getError().getValue(), 0); + assertEquals(response.getMessage(), "server error"); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void sendEndTxnOnSubscription() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + setChannelConnected(); + Topic topic = mock(Topic.class); + final org.apache.pulsar.broker.service.Subscription sub = + mock(org.apache.pulsar.broker.service.Subscription.class); + doReturn(sub).when(topic).getSubscription(any()); + doReturn(CompletableFuture.completedFuture(null)) + .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong()); + doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService) + .getTopicIfExists(any(String.class)); + + ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L, + successTopicName, successSubName, TxnAction.COMMIT, 1L); + channel.writeInbound(clientCommand); + CommandEndTxnOnSubscriptionResponse response = (CommandEndTxnOnSubscriptionResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertFalse(response.hasError()); + assertFalse(response.hasMessage()); + + channel.finish(); + } + + + @Test(timeOut = 30000) + public void sendEndTxnOnSubscriptionFailed() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + setChannelConnected(); + Topic topic = mock(Topic.class); + + final org.apache.pulsar.broker.service.Subscription sub = + mock(org.apache.pulsar.broker.service.Subscription.class); + doReturn(sub).when(topic).getSubscription(any()); + doReturn(FutureUtil.failedFuture(new RuntimeException("server error"))) + .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong()); + doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService) + .getTopicIfExists(any(String.class)); + + ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L, + successTopicName, successSubName, TxnAction.COMMIT, 1L); + channel.writeInbound(clientCommand); + CommandEndTxnOnSubscriptionResponse response = (CommandEndTxnOnSubscriptionResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertEquals(response.getError().getValue(), 0); + assertEquals(response.getMessage(), "Handle end txn on subscription failed: server error"); + + channel.finish(); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java index e50a39c5dfd91..236e6557ada9b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java @@ -105,9 +105,9 @@ public void testNewTransaction() throws Exception { .getStores().get(TransactionCoordinatorID.get(1))); checkTransactionMetadataStoreReady((MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService() .getStores().get(TransactionCoordinatorID.get(2))); - TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5).get(); - TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 5).get(); - TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 5).get(); + TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5, null).get(); + TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 5, null).get(); + TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 5, null).get(); Assert.assertEquals(txnID0.getMostSigBits(), 0); Assert.assertEquals(txnID1.getMostSigBits(), 1); Assert.assertEquals(txnID2.getMostSigBits(), 2); @@ -129,7 +129,7 @@ public void testAddProducedPartitionToTxn() throws Exception { .getStores().get(TransactionCoordinatorID.get(0)); checkTransactionMetadataStoreReady(transactionMetadataStore); - TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000).get(); + TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null).get(); List partitions = new ArrayList<>(); partitions.add("ptn-0"); partitions.add("ptn-1"); @@ -152,7 +152,7 @@ public void testAddAckedPartitionToTxn() throws Exception { (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService() .getStores().get(TransactionCoordinatorID.get(0)); checkTransactionMetadataStoreReady(transactionMetadataStore); - TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000).get(); + TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null).get(); List partitions = new ArrayList<>(); partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build()); partitions.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build()); @@ -181,7 +181,7 @@ public void testTimeoutTracker() throws Exception { int i = -1; while (++i < 1000) { try { - transactionMetadataStore.newTransaction(2000).get(); + newTransactionWithTimeoutOf(2000); } catch (Exception e) { //no operation } @@ -193,6 +193,14 @@ public void testTimeoutTracker() throws Exception { .until(() -> txnMap.size() == 0); } + private TxnID newTransactionWithTimeoutOf(long timeout) + throws InterruptedException, ExecutionException { + MLTransactionMetadataStore transactionMetadataStore = + (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService() + .getStores().get(TransactionCoordinatorID.get(0)); + return transactionMetadataStore.newTransaction(timeout, null).get(); + } + @Test public void testTimeoutTrackerExpired() throws Exception { pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0)); @@ -207,7 +215,7 @@ public void testTimeoutTrackerExpired() throws Exception { ConcurrentSkipListMap>> txnMap = (ConcurrentSkipListMap>>) field.get(transactionMetadataStore); - transactionMetadataStore.newTransaction(2000).get(); + newTransactionWithTimeoutOf(2000); assertEquals(txnMap.size(), 1); @@ -215,7 +223,7 @@ public void testTimeoutTrackerExpired() throws Exception { Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN)); Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> txnMap.size() == 0); - transactionMetadataStore.newTransaction(2000).get(); + newTransactionWithTimeoutOf(2000); assertEquals(txnMap.size(), 1); txnMap.forEach((txnID, txnMetaListPair) -> @@ -242,7 +250,7 @@ public void testTimeoutTrackerMultiThreading() throws Exception { int i = -1; while (++i < 100) { try { - transactionMetadataStore.newTransaction(1000); + newTransactionWithTimeoutOf(1000); } catch (Exception e) { //no operation } @@ -253,7 +261,7 @@ public void testTimeoutTrackerMultiThreading() throws Exception { int i = -1; while (++i < 100) { try { - transactionMetadataStore.newTransaction(2000); + newTransactionWithTimeoutOf(2000); } catch (Exception e) { //no operation } @@ -264,7 +272,7 @@ public void testTimeoutTrackerMultiThreading() throws Exception { int i = -1; while (++i < 100) { try { - transactionMetadataStore.newTransaction(3000); + newTransactionWithTimeoutOf(3000); } catch (Exception e) { //no operation } @@ -275,7 +283,7 @@ public void testTimeoutTrackerMultiThreading() throws Exception { int i = -1; while (++i < 100) { try { - transactionMetadataStore.newTransaction(4000); + newTransactionWithTimeoutOf(4000); } catch (Exception e) { //no operation } @@ -305,7 +313,7 @@ public void transactionTimeoutRecoverTest() throws Exception { .getStores().get(TransactionCoordinatorID.get(0)); checkTransactionMetadataStoreReady(transactionMetadataStore); - transactionMetadataStore.newTransaction(timeout); + newTransactionWithTimeoutOf(2000); pulsar.getTransactionMetadataStoreService() .removeTransactionMetadataStore(TransactionCoordinatorID.get(0)); @@ -346,7 +354,7 @@ public void testEndTransactionOpRetry(TxnStatus txnStatus) throws Exception { checkTransactionMetadataStoreReady(transactionMetadataStore); - TxnID txnID = transactionMetadataStore.newTransaction(timeOut - 2000).get(); + TxnID txnID = newTransactionWithTimeoutOf(timeOut - 2000); TxnMeta txnMeta = transactionMetadataStore.getTxnMeta(txnID).get(); txnMeta.updateTxnStatus(txnStatus, TxnStatus.OPEN); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java index 0c9f877150a8e..2eafd8a5a7fba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java @@ -99,11 +99,11 @@ public void testTransactionCoordinatorMetrics() throws Exception{ Awaitility.await().until(() -> pulsar.getTransactionMetadataStoreService().getStores().size() == 2); pulsar.getTransactionMetadataStoreService().getStores() - .get(transactionCoordinatorIDOne).newTransaction(timeout).get(); + .get(transactionCoordinatorIDOne).newTransaction(timeout, null).get(); pulsar.getTransactionMetadataStoreService().getStores() - .get(transactionCoordinatorIDTwo).newTransaction(timeout).get(); + .get(transactionCoordinatorIDTwo).newTransaction(timeout, null).get(); pulsar.getTransactionMetadataStoreService().getStores() - .get(transactionCoordinatorIDTwo).newTransaction(timeout).get(); + .get(transactionCoordinatorIDTwo).newTransaction(timeout, null).get(); ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); String metricsStr = statsOut.toString(); @@ -191,7 +191,7 @@ public void testTransactionCoordinatorRateMetrics() throws Exception{ metric.forEach(item -> assertEquals(item.value, txnCount / 2)); TxnID txnID = pulsar.getTransactionMetadataStoreService().getStores() - .get(transactionCoordinatorIDOne).newTransaction(1000).get(); + .get(transactionCoordinatorIDOne).newTransaction(1000, null).get(); Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() -> { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java new file mode 100644 index 0000000000000..000080ff45445 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; + +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.time.Duration; +import java.util.Base64; +import java.util.Date; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.functions.utils.Exceptions; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Test for consuming transaction messages. + */ +@Slf4j +@Test(groups = "broker") +public class AuthenticatedTransactionProducerConsumerTest extends TransactionTestBase { + + private static final String TOPIC = NAMESPACE1 + "/txn-auth"; + + private final String ADMIN_TOKEN; + private final String TOKEN_PUBLIC_KEY; + private final KeyPair kp; + + AuthenticatedTransactionProducerConsumerTest() throws NoSuchAlgorithmException { + KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA"); + kp = kpg.generateKeyPair(); + + byte[] encodedPublicKey = kp.getPublic().getEncoded(); + TOKEN_PUBLIC_KEY = "data:;base64," + Base64.getEncoder().encodeToString(encodedPublicKey); + ADMIN_TOKEN = generateToken(kp, "admin"); + } + + + private String generateToken(KeyPair kp, String subject) { + PrivateKey pkey = kp.getPrivate(); + long expMillis = System.currentTimeMillis() + Duration.ofHours(1).toMillis(); + Date exp = new Date(expMillis); + + return Jwts.builder() + .setSubject(subject) + .setExpiration(exp) + .signWith(pkey, SignatureAlgorithm.forSigningKey(pkey)) + .compact(); + } + + @BeforeMethod(alwaysRun = true) + public void setup() throws Exception { + conf.setAuthenticationEnabled(true); + conf.setAuthorizationEnabled(true); + + Set superUserRoles = new HashSet<>(); + superUserRoles.add("admin"); + conf.setSuperUserRoles(superUserRoles); + + Set providers = new HashSet<>(); + providers.add(AuthenticationProviderToken.class.getName()); + conf.setAuthenticationProviders(providers); + + // Set provider domain name + Properties properties = new Properties(); + properties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY); + + conf.setProperties(properties); + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN); + setBrokerCount(1); + internalSetup(); + setUpBase(1, 1, TOPIC, 1); + + grantTxnLookupToRole("client"); + admin.namespaces().grantPermissionOnNamespace(NAMESPACE1, "client", + EnumSet.allOf(AuthAction.class)); + grantTxnLookupToRole("client2"); + } + + @SneakyThrows + private void grantTxnLookupToRole(String role) { + admin.namespaces().grantPermissionOnNamespace( + NamespaceName.SYSTEM_NAMESPACE.toString(), + role, + Sets.newHashSet(AuthAction.consume)); + } + + @Override + protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException { + return clientBuilder + .enableTransaction(true) + .authentication(AuthenticationFactory.token(ADMIN_TOKEN)) + .build(); + } + + @Override + protected PulsarAdmin createNewPulsarAdmin(PulsarAdminBuilder builder) throws PulsarClientException { + return builder + .authentication(AuthenticationFactory.token(ADMIN_TOKEN)) + .build(); + } + + @AfterMethod(alwaysRun = true) + protected void cleanup() { + super.internalCleanup(); + } + + @DataProvider(name = "actors") + public Object[][] actors() { + return new Object[][]{ + {"client", true}, + {"client", false}, + {"client2", true}, + {"client2", false}, + {"admin", true}, + {"admin", false} + }; + } + + @Test(dataProvider = "actors") + public void testEndTxn(String actor, boolean afterUnload) throws Exception { + @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .authentication(AuthenticationFactory.token(generateToken(kp, "client"))) + .enableTransaction(true) + .build(); + + @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .authentication(AuthenticationFactory.token(generateToken(kp, actor))) + .enableTransaction(true) + .build(); + Transaction transaction = pulsarClientOwner.newTransaction() + .withTransactionTimeout(60, TimeUnit.SECONDS).build().get(); + + @Cleanup final Consumer consumer = pulsarClientOwner + .newConsumer(Schema.STRING) + .subscriptionName("test") + .topic(TOPIC) + .subscribe(); + + + @Cleanup final Producer producer = pulsarClientOwner + .newProducer(Schema.STRING) + .sendTimeout(60, TimeUnit.SECONDS) + .topic(TOPIC) + .create(); + + producer.newMessage().value("beforetxn").send(); + consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), transaction); + producer.newMessage(transaction).value("message").send(); + if (afterUnload) { + pulsarServiceList.get(0) + .getTransactionMetadataStoreService() + .removeTransactionMetadataStore( + TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits())); + } + + final Throwable ex = syncGetException(( + (PulsarClientImpl) pulsarClientOther).getTcClient().commitAsync(transaction.getTxnID()) + ); + if (actor.equals("client") || actor.equals("admin")) { + Assert.assertNull(ex); + Assert.assertEquals(consumer.receive(5, TimeUnit.SECONDS).getValue(), "message"); + } else { + Assert.assertNotNull(ex); + Assert.assertTrue(ex instanceof TransactionCoordinatorClientException, ex.getClass().getName()); + Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS)); + transaction.commit().get(); + Assert.assertEquals(consumer.receive(5, TimeUnit.SECONDS).getValue(), "message"); + } + } + + @Test(dataProvider = "actors") + public void testAddPartitionToTxn(String actor, boolean afterUnload) throws Exception { + @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .authentication(AuthenticationFactory.token(generateToken(kp, "client"))) + .enableTransaction(true) + .build(); + + @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .authentication(AuthenticationFactory.token(generateToken(kp, actor))) + .enableTransaction(true) + .build(); + Transaction transaction = pulsarClientOwner.newTransaction() + .withTransactionTimeout(60, TimeUnit.SECONDS).build().get(); + + if (afterUnload) { + pulsarServiceList.get(0) + .getTransactionMetadataStoreService() + .removeTransactionMetadataStore( + TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits())); + } + + final Throwable ex = syncGetException(((PulsarClientImpl) pulsarClientOther) + .getTcClient().addPublishPartitionToTxnAsync(transaction.getTxnID(), Lists.newArrayList(TOPIC))); + + final TxnMeta txnMeta = pulsarServiceList.get(0).getTransactionMetadataStoreService() + .getTxnMeta(transaction.getTxnID()).get(); + if (actor.equals("client") || actor.equals("admin")) { + Assert.assertNull(ex); + Assert.assertEquals(txnMeta.producedPartitions(), Lists.newArrayList(TOPIC)); + } else { + Assert.assertNotNull(ex); + Assert.assertTrue(ex instanceof TransactionCoordinatorClientException); + Assert.assertTrue(txnMeta.producedPartitions().isEmpty()); + } + } + + @Test(dataProvider = "actors") + public void testAddSubscriptionToTxn(String actor, boolean afterUnload) throws Exception { + @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .authentication(AuthenticationFactory.token(generateToken(kp, "client"))) + .enableTransaction(true) + .build(); + + @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .authentication(AuthenticationFactory.token(generateToken(kp, actor))) + .enableTransaction(true) + .build(); + Transaction transaction = pulsarClientOwner.newTransaction() + .withTransactionTimeout(60, TimeUnit.SECONDS).build().get(); + + if (afterUnload) { + pulsarServiceList.get(0) + .getTransactionMetadataStoreService() + .removeTransactionMetadataStore( + TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits())); + } + + + final Throwable ex = syncGetException(((PulsarClientImpl) pulsarClientOther) + .getTcClient().addSubscriptionToTxnAsync(transaction.getTxnID(), TOPIC, "sub")); + + final TxnMeta txnMeta = pulsarServiceList.get(0).getTransactionMetadataStoreService() + .getTxnMeta(transaction.getTxnID()).get(); + if (actor.equals("client") || actor.equals("admin")) { + Assert.assertNull(ex); + Assert.assertEquals(txnMeta.ackedPartitions().size(), 1); + } else { + Assert.assertNotNull(ex); + Assert.assertTrue(ex instanceof TransactionCoordinatorClientException); + Assert.assertTrue(txnMeta.ackedPartitions().isEmpty()); + } + } + + @Test + public void testNoAuth() throws Exception { + try { + @Cleanup final PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .enableTransaction(true) + .build(); + Assert.fail("should have failed"); + } catch (Exception t) { + Assert.assertTrue(Exceptions.areExceptionsPresentInChain(t, + PulsarClientException.AuthenticationException.class)); + } + } + + private static Throwable syncGetException(CompletableFuture future) { + try { + future.get(); + } catch (InterruptedException e) { + return e; + } catch (ExecutionException e) { + return FutureUtil.unwrapCompletionException(e); + } + return null; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index 7510f06c3bc49..25c555f09b949 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -48,7 +48,10 @@ import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; @@ -93,7 +96,9 @@ public void internalSetup() throws Exception { if (admin != null) { admin.close(); } - admin = spy(PulsarAdmin.builder().serviceHttpUrl(pulsarServiceList.get(0).getWebServiceAddress()).build()); + admin = spy( + createNewPulsarAdmin(PulsarAdmin.builder().serviceHttpUrl(pulsarServiceList.get(0).getWebServiceAddress())) + ); if (pulsarClient != null) { pulsarClient.shutdown(); @@ -111,6 +116,15 @@ private void init() throws Exception { mockBookKeeper = createMockBookKeeper(bkExecutor); startBroker(); } + + protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException { + return clientBuilder.build(); + } + + protected PulsarAdmin createNewPulsarAdmin(PulsarAdminBuilder builder) throws PulsarClientException { + return builder.build(); + } + protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int numPartitions) throws Exception{ setBrokerCount(numBroker); internalSetup(); @@ -137,11 +151,10 @@ protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int if (pulsarClient != null) { pulsarClient.shutdown(); } - pulsarClient = PulsarClient.builder() + pulsarClient = createNewPulsarClient(PulsarClient.builder() .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) .statsInterval(0, TimeUnit.SECONDS) - .enableTransaction(true) - .build(); + .enableTransaction(true)); } protected void startBroker() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 6b0f122d7415c..e3bbe1ad97b48 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -861,7 +861,7 @@ public void produceAndConsumeCloseStateTxnTest() throws Exception { @Test public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{ TxnID txnID = pulsarServiceList.get(0).getTransactionMetadataStoreService() - .newTransaction(new TransactionCoordinatorID(0), 1).get(); + .newTransaction(new TransactionCoordinatorID(0), 1, null).get(); Awaitility.await().until(() -> { try { getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java index 4f987014c7413..824a081876344 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java @@ -41,4 +41,7 @@ public class TransactionMetadata { /** The ackedPartitions of this transaction. */ public Map> ackedPartitions; + + /** The owner of this transaction. */ + public String owner; } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java index bbffe80120fac..dd592955404bc 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java @@ -56,11 +56,12 @@ default CompletableFuture getTxnStatus(TxnID txnid) { * Create a new transaction in the transaction metadata store. * * @param timeoutInMills the timeout duration of the transaction in mills +* @param owner the role which is the owner of the transaction * @return a future represents the result of creating a new transaction. * it returns {@link TxnID} as the identifier for identifying the * transaction. */ - CompletableFuture newTransaction(long timeoutInMills); + CompletableFuture newTransaction(long timeoutInMills, String owner); /** * Add the produced partitions to transaction identified by txnid. diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java index 4d6d83dfb0ff5..3d68051493063 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java @@ -107,4 +107,11 @@ TxnMeta updateTxnStatus(TxnStatus newStatus, * @return transaction timeout at. */ long getTimeoutAt(); + + /** + * Return the transaction's owner. + * + * @return transaction's owner. + */ + String getOwner(); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java index ba90c06b27d79..1137869759005 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java @@ -24,8 +24,10 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionSubscription; @@ -73,12 +75,17 @@ public CompletableFuture getTxnMeta(TxnID txnid) { } @Override - public CompletableFuture newTransaction(long timeoutInMills) { + public CompletableFuture newTransaction(long timeoutInMills, String owner) { + if (owner != null) { + if (StringUtils.isBlank(owner)) { + return FutureUtil.failedFuture(new IllegalArgumentException("Owner can't be blank")); + } + } TxnID txnID = new TxnID( tcID.getId(), localID.getAndIncrement() ); - TxnMetaImpl txn = new TxnMetaImpl(txnID, System.currentTimeMillis(), timeoutInMills); + TxnMetaImpl txn = new TxnMetaImpl(txnID, System.currentTimeMillis(), timeoutInMills, owner); transactions.put(txnID, txn); return CompletableFuture.completedFuture(txnID); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index 273a01850cb1b..ea7c38444bc18 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; @@ -142,8 +143,11 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran positions.add(position); long openTimestamp = transactionMetadataEntry.getStartTime(); long timeoutAt = transactionMetadataEntry.getTimeoutMs(); - txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID, - openTimestamp, timeoutAt), positions)); + final String owner = transactionMetadataEntry.hasOwner() + ? transactionMetadataEntry.getOwner() : null; + final TxnMetaImpl left = new TxnMetaImpl(txnID, + openTimestamp, timeoutAt, owner); + txnMetaMap.put(transactionId, MutablePair.of(left, positions)); recoverTracker.handleOpenStatusTransaction(txnSequenceId, timeoutAt + openTimestamp); } @@ -217,7 +221,7 @@ public CompletableFuture getTxnMeta(TxnID txnID) { } @Override - public CompletableFuture newTransaction(long timeOut) { + public CompletableFuture newTransaction(long timeOut, String owner) { CompletableFuture completableFuture = new CompletableFuture<>(); FutureUtil.safeRunAsync(() -> { if (!checkIfReady()) { @@ -238,13 +242,20 @@ public CompletableFuture newTransaction(long timeOut) { .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW) .setLastModificationTime(currentTimeMillis) .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); + if (owner != null) { + if (StringUtils.isBlank(owner)) { + completableFuture.completeExceptionally(new IllegalArgumentException("Owner can't be blank")); + return; + } + transactionMetadataEntry.setOwner(owner); + } transactionLog.append(transactionMetadataEntry) .whenComplete((position, throwable) -> { if (throwable != null) { completableFuture.completeExceptionally(throwable); } else { appendLogCount.increment(); - TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut); + TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut, owner); List positions = new ArrayList<>(); positions.add(position); Pair> pair = MutablePair.of(txn, positions); @@ -288,9 +299,9 @@ public CompletableFuture addProducedPartitionToTxn(TxnID txnID, List partitions = new ArrayList<>(); @@ -154,7 +152,7 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); Awaitility.await().until(transactionMetadataStore::checkIfReady); - TxnID txnID = transactionMetadataStore.newTransaction(20000).get(); + TxnID txnID = transactionMetadataStore.newTransaction(20000, null).get(); transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN, false).get(); if (isUseManagedLedgerProperties) { transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false).get(); @@ -183,7 +181,7 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); Awaitility.await().until(transactionMetadataStore::checkIfReady); - txnID = transactionMetadataStore.newTransaction(100000).get(); + txnID = transactionMetadataStore.newTransaction(100000, null).get(); assertEquals(txnID.getLeastSigBits(), 1); } @@ -214,8 +212,8 @@ public void testInitTransactionReader() throws Exception { break; } if (transactionMetadataStore.checkIfReady()) { - TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get(); - TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get(); + TxnID txnID1 = transactionMetadataStore.newTransaction(1000, "user1").get(); + TxnID txnID2 = transactionMetadataStore.newTransaction(1000, "user2").get(); assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN); assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN); @@ -267,6 +265,9 @@ public void testInitTransactionReader() throws Exception { assertEquals(txnMeta2.ackedPartitions().size(), subscriptions.size()); Assert.assertTrue(subscriptions.containsAll(txnMeta1.ackedPartitions())); Assert.assertTrue(subscriptions.containsAll(txnMeta2.ackedPartitions())); + + assertEquals(txnMeta1.getOwner(), "user1"); + assertEquals(txnMeta2.getOwner(), "user2"); assertEquals(txnMeta1.status(), TxnStatus.COMMITTING); assertEquals(txnMeta2.status(), TxnStatus.COMMITTING); transactionMetadataStoreTest @@ -286,7 +287,7 @@ public void testInitTransactionReader() throws Exception { } catch (ExecutionException e) { Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException); } - TxnID txnID = transactionMetadataStoreTest.newTransaction(1000).get(); + TxnID txnID = transactionMetadataStoreTest.newTransaction(1000, null).get(); assertEquals(txnID.getLeastSigBits(), 2L); break; } else { @@ -327,8 +328,8 @@ public void testDeleteLog() throws Exception { break; } if (transactionMetadataStore.checkIfReady()) { - TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get(); - TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get(); + TxnID txnID1 = transactionMetadataStore.newTransaction(1000, null).get(); + TxnID txnID2 = transactionMetadataStore.newTransaction(1000, null).get(); assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN); assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN); @@ -391,9 +392,9 @@ public void testRecoverWhenDeleteFromCursor() throws Exception { Awaitility.await().until(transactionMetadataStore::checkIfReady); // txnID1 have not deleted from cursor, we can recover from transaction log - TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get(); + TxnID txnID1 = transactionMetadataStore.newTransaction(1000, null).get(); // txnID2 have deleted from cursor. - TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get(); + TxnID txnID2 = transactionMetadataStore.newTransaction(1000, null).get(); transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTING, TxnStatus.OPEN, false).get(); transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING, false).get(); @@ -429,7 +430,7 @@ public void testManageLedgerWriteFailState() throws Exception { transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); Awaitility.await().until(transactionMetadataStore::checkIfReady); - transactionMetadataStore.newTransaction(5000).get(); + transactionMetadataStore.newTransaction(5000, null).get(); Field field = MLTransactionLogImpl.class.getDeclaredField("managedLedger"); field.setAccessible(true); ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(mlTransactionLog); @@ -438,12 +439,12 @@ public void testManageLedgerWriteFailState() throws Exception { AtomicReferenceFieldUpdater state = (AtomicReferenceFieldUpdater) field.get(managedLedger); state.set(managedLedger, WriteFailed); try { - transactionMetadataStore.newTransaction(5000).get(); + transactionMetadataStore.newTransaction(5000, null).get(); fail(); } catch (ExecutionException e) { assertTrue(e.getCause() instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException); } - transactionMetadataStore.newTransaction(5000).get(); + transactionMetadataStore.newTransaction(5000, null).get(); } diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java index a54caaf16c17b..3f046240c117b 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java @@ -79,14 +79,14 @@ public void testGetTxnStatusNotFound() throws Exception { @Test public void testGetTxnStatusSuccess() throws Exception { - TxnID txnID = this.store.newTransaction(0L).get(); + TxnID txnID = this.store.newTransaction(0L, null).get(); TxnStatus txnStatus = this.store.getTxnStatus(txnID).get(); assertEquals(txnStatus, TxnStatus.OPEN); } @Test public void testUpdateTxnStatusSuccess() throws Exception { - TxnID txnID = this.store.newTransaction(0L).get(); + TxnID txnID = this.store.newTransaction(0L, null).get(); TxnStatus txnStatus = this.store.getTxnStatus(txnID).get(); assertEquals(txnStatus, TxnStatus.OPEN); @@ -100,7 +100,7 @@ public void testUpdateTxnStatusSuccess() throws Exception { @Test public void testUpdateTxnStatusNotExpectedStatus() throws Exception { - TxnID txnID = this.store.newTransaction(0L).get(); + TxnID txnID = this.store.newTransaction(0L, null).get(); TxnStatus txnStatus = this.store.getTxnStatus(txnID).get(); assertEquals(txnStatus, TxnStatus.OPEN); @@ -119,7 +119,7 @@ public void testUpdateTxnStatusNotExpectedStatus() throws Exception { @Test public void testUpdateTxnStatusCannotTransition() throws Exception { - TxnID txnID = this.store.newTransaction(0L).get(); + TxnID txnID = this.store.newTransaction(0L, null).get(); TxnStatus txnStatus = this.store.getTxnStatus(txnID).get(); assertEquals(txnStatus, TxnStatus.OPEN); @@ -138,7 +138,7 @@ public void testUpdateTxnStatusCannotTransition() throws Exception { @Test public void testAddProducedPartition() throws Exception { - TxnID txnID = this.store.newTransaction(0L).get(); + TxnID txnID = this.store.newTransaction(0L, null).get(); TxnStatus txnStatus = this.store.getTxnStatus(txnID).get(); assertEquals(txnStatus, TxnStatus.OPEN); @@ -192,7 +192,7 @@ public void testAddProducedPartition() throws Exception { @Test public void testAddAckedPartition() throws Exception { - TxnID txnID = this.store.newTransaction(0L).get(); + TxnID txnID = this.store.newTransaction(0L, null).get(); TxnStatus txnStatus = this.store.getTxnStatus(txnID).get(); assertEquals(txnStatus, TxnStatus.OPEN); From 18b4617c4af6191d250bb1bf68bbd709e8028c36 Mon Sep 17 00:00:00 2001 From: xiangying <1984997880@qq.com> Date: Thu, 9 Feb 2023 12:41:44 +0800 Subject: [PATCH 2/3] checkstyle --- .../java/org/apache/pulsar/broker/service/ServerCnxTest.java | 1 - .../AuthenticatedTransactionProducerConsumerTest.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index c3a34dbadbb9f..4e31ef9bcd801 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -54,7 +54,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java index 000080ff45445..3f43013afc977 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information From bbbbb5101117b809a579aef5c93ec550c24dbae9 Mon Sep 17 00:00:00 2001 From: xiangying <1984997880@qq.com> Date: Thu, 9 Feb 2023 19:17:12 +0800 Subject: [PATCH 3/3] remove not related code --- .../apache/pulsar/broker/PulsarService.java | 4 - .../pulsar/broker/service/ServerCnxTest.java | 422 +----------------- 2 files changed, 5 insertions(+), 421 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index a36a7f5c70bb6..5c239e73bf5b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1262,10 +1262,6 @@ public LedgerOffloader getManagedLedgerOffloader(NamespaceName namespaceName, Of }); } - public boolean isRunning() { - return this.state == State.Started || this.state == State.Init; - } - public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies) throws PulsarServerException { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 4e31ef9bcd801..ee1789b322f11 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -22,9 +22,6 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.doAnswer; @@ -40,7 +37,6 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -76,7 +72,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.TransactionMetadataStoreService; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -93,23 +88,16 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService; import org.apache.pulsar.broker.service.utils.ClientChannelHelper; -import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.AuthMethod; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.BaseCommand.Type; import org.apache.pulsar.common.api.proto.CommandAck.AckType; -import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse; -import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse; import org.apache.pulsar.common.api.proto.CommandAuthResponse; import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.api.proto.CommandConnected; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse; -import org.apache.pulsar.common.api.proto.CommandEndTxnResponse; import org.apache.pulsar.common.api.proto.CommandError; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; -import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; import org.apache.pulsar.common.api.proto.CommandProducerSuccess; import org.apache.pulsar.common.api.proto.CommandSendError; import org.apache.pulsar.common.api.proto.CommandSendReceipt; @@ -119,7 +107,6 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.ServerError; -import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.Policies; @@ -132,7 +119,6 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.zookeeper.ZooKeeper; -import org.apache.pulsar.transaction.coordinator.TxnMeta; import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; @@ -1976,8 +1962,8 @@ public void testTopicIsNotReady() throws Exception { public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{ // Mock ServerCnx.field: consumers ConcurrentLongHashMap.Builder mapBuilder = Mockito.mock(ConcurrentLongHashMap.Builder.class); - Mockito.when(mapBuilder.expectedItems(anyInt())).thenReturn(mapBuilder); - Mockito.when(mapBuilder.concurrencyLevel(anyInt())).thenReturn(mapBuilder); + Mockito.when(mapBuilder.expectedItems(Mockito.anyInt())).thenReturn(mapBuilder); + Mockito.when(mapBuilder.concurrencyLevel(Mockito.anyInt())).thenReturn(mapBuilder); ConcurrentLongHashMap consumers = Mockito.mock(ConcurrentLongHashMap.class); Mockito.when(mapBuilder.build()).thenReturn(consumers); ArgumentCaptor ignoreArgumentCaptor = ArgumentCaptor.forClass(Long.class); @@ -2033,7 +2019,7 @@ public boolean isCompletedExceptionally(){ return false; } }; - Mockito.when(consumers.putIfAbsent(anyLong(), Mockito.any())).thenReturn(existingConsumerFuture); + Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture); // do test: delay complete after execute 'isDone()' many times // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051 try (MockedStatic theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) { @@ -2072,12 +2058,12 @@ public boolean isCompletedExceptionally(){ } // case3: exists existingConsumerFuture, already complete and exception CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class); - Mockito.when(consumers.putIfAbsent(anyLong(), Mockito.any())).thenReturn(existingConsumerFuture); + Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture); // make consumerFuture delay finish Mockito.when(existingConsumerFuture.isDone()).thenReturn(true); // when sync get return, future will return success value. Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException()); - Mockito.when(existingConsumerFuture.get(anyLong(), Mockito.any())). + Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())). thenThrow(new NullPointerException()); Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true); Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException()); @@ -2131,402 +2117,4 @@ public void testHandleAuthResponseWithoutClientVersion() { verify(authResponse, times(1)).hasClientVersion(); verify(authResponse, times(0)).getClientVersion(); } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void shouldFailHandleGetTopicsOfNamespace() throws Exception { - ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); - Field stateUpdater = ServerCnx.class.getDeclaredField("state"); - stateUpdater.setAccessible(true); - stateUpdater.set(serverCnx, ServerCnx.State.Failed); - serverCnx.handleGetTopicsOfNamespace(any()); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void shouldFailHandleGetSchema() throws Exception { - ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); - Field stateUpdater = ServerCnx.class.getDeclaredField("state"); - stateUpdater.setAccessible(true); - stateUpdater.set(serverCnx, ServerCnx.State.Failed); - serverCnx.handleGetSchema(any()); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void shouldFailHandleGetOrCreateSchema() throws Exception { - ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); - Field stateUpdater = ServerCnx.class.getDeclaredField("state"); - stateUpdater.setAccessible(true); - stateUpdater.set(serverCnx, ServerCnx.State.Failed); - serverCnx.handleGetOrCreateSchema(any()); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void shouldFailHandleTcClientConnectRequest() throws Exception { - ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); - Field stateUpdater = ServerCnx.class.getDeclaredField("state"); - stateUpdater.setAccessible(true); - stateUpdater.set(serverCnx, ServerCnx.State.Failed); - serverCnx.handleTcClientConnectRequest(any()); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void shouldFailHandleNewTxn() throws Exception { - ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); - Field stateUpdater = ServerCnx.class.getDeclaredField("state"); - stateUpdater.setAccessible(true); - stateUpdater.set(serverCnx, ServerCnx.State.Failed); - serverCnx.handleNewTxn(any()); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void shouldFailHandleAddPartitionToTxn() throws Exception { - ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); - Field stateUpdater = ServerCnx.class.getDeclaredField("state"); - stateUpdater.setAccessible(true); - stateUpdater.set(serverCnx, ServerCnx.State.Failed); - serverCnx.handleAddPartitionToTxn(any()); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void shouldFailHandleEndTxn() throws Exception { - ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); - Field stateUpdater = ServerCnx.class.getDeclaredField("state"); - stateUpdater.setAccessible(true); - stateUpdater.set(serverCnx, ServerCnx.State.Failed); - serverCnx.handleEndTxn(any()); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void shouldFailHandleEndTxnOnPartition() throws Exception { - ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); - Field stateUpdater = ServerCnx.class.getDeclaredField("state"); - stateUpdater.setAccessible(true); - stateUpdater.set(serverCnx, ServerCnx.State.Failed); - serverCnx.handleEndTxnOnPartition(any()); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void shouldFailHandleEndTxnOnSubscription() throws Exception { - ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); - Field stateUpdater = ServerCnx.class.getDeclaredField("state"); - stateUpdater.setAccessible(true); - stateUpdater.set(serverCnx, ServerCnx.State.Failed); - serverCnx.handleEndTxnOnSubscription(any()); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void shouldFailHandleAddSubscriptionToTxn() throws Exception { - ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS); - Field stateUpdater = ServerCnx.class.getDeclaredField("state"); - stateUpdater.setAccessible(true); - stateUpdater.set(serverCnx, ServerCnx.State.Failed); - serverCnx.handleAddSubscriptionToTxn(any()); - } - - @Test(timeOut = 30000) - public void handlePartitionMetadataRequestWithServiceNotReady() throws Exception { - resetChannel(); - setChannelConnected(); - doReturn(false).when(pulsar).isRunning(); - assertTrue(channel.isActive()); - - ByteBuf clientCommand = Commands.newPartitionMetadataRequest(successTopicName, 1); - channel.writeInbound(clientCommand); - Object response = getResponse(); - assertTrue(response instanceof CommandPartitionedTopicMetadataResponse); - assertEquals(((CommandPartitionedTopicMetadataResponse) response).getError(), ServerError.ServiceNotReady); - channel.finish(); - } - - @Test(timeOut = 30000) - public void sendAddPartitionToTxnResponse() throws Exception { - final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); - when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); - when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); - when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any())) - .thenReturn(CompletableFuture.completedFuture(null)); - when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); - svcConfig.setTransactionCoordinatorEnabled(true); - resetChannel(); - setChannelConnected(); - ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L, - Lists.newArrayList("tenant/ns/topic1")); - channel.writeInbound(clientCommand); - CommandAddPartitionToTxnResponse response = (CommandAddPartitionToTxnResponse) getResponse(); - - assertEquals(response.getRequestId(), 89L); - assertEquals(response.getTxnidLeastBits(), 1L); - assertEquals(response.getTxnidMostBits(), 12L); - assertFalse(response.hasError()); - assertFalse(response.hasMessage()); - - channel.finish(); - } - - @Test(timeOut = 30000) - public void sendAddPartitionToTxnResponseFailed() throws Exception { - final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); - when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); - when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); - when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any())) - .thenReturn(FutureUtil.failedFuture(new RuntimeException("server error"))); - when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); - svcConfig.setTransactionCoordinatorEnabled(true); - resetChannel(); - setChannelConnected(); - ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L, - Lists.newArrayList("tenant/ns/topic1")); - channel.writeInbound(clientCommand); - CommandAddPartitionToTxnResponse response = (CommandAddPartitionToTxnResponse) getResponse(); - - assertEquals(response.getRequestId(), 89L); - assertEquals(response.getTxnidLeastBits(), 1L); - assertEquals(response.getTxnidMostBits(), 12L); - assertEquals(response.getError().getValue(), 0); - assertEquals(response.getMessage(), "server error"); - - channel.finish(); - } - - @Test(timeOut = 30000) - public void sendAddSubscriptionToTxnResponse() throws Exception { - final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); - when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); - when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); - when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any())) - .thenReturn(CompletableFuture.completedFuture(null)); - when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); - svcConfig.setTransactionCoordinatorEnabled(true); - resetChannel(); - setChannelConnected(); - final org.apache.pulsar.common.api.proto.Subscription sub = - new org.apache.pulsar.common.api.proto.Subscription(); - sub.setTopic("topic1"); - sub.setSubscription("sub1"); - ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L, - Lists.newArrayList(sub)); - channel.writeInbound(clientCommand); - CommandAddSubscriptionToTxnResponse response = (CommandAddSubscriptionToTxnResponse) getResponse(); - - assertEquals(response.getRequestId(), 89L); - assertEquals(response.getTxnidLeastBits(), 1L); - assertEquals(response.getTxnidMostBits(), 12L); - assertFalse(response.hasError()); - assertFalse(response.hasMessage()); - - channel.finish(); - } - - @Test(timeOut = 30000) - public void sendAddSubscriptionToTxnResponseFailed() throws Exception { - final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); - when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); - when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); - when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any())) - .thenReturn(FutureUtil.failedFuture(new RuntimeException("server error"))); - when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); - svcConfig.setTransactionCoordinatorEnabled(true); - resetChannel(); - setChannelConnected(); - final org.apache.pulsar.common.api.proto.Subscription sub = - new org.apache.pulsar.common.api.proto.Subscription(); - sub.setTopic("topic1"); - sub.setSubscription("sub1"); - ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L, - Lists.newArrayList(sub)); - channel.writeInbound(clientCommand); - CommandAddSubscriptionToTxnResponse response = (CommandAddSubscriptionToTxnResponse) getResponse(); - - assertEquals(response.getRequestId(), 89L); - assertEquals(response.getTxnidLeastBits(), 1L); - assertEquals(response.getTxnidMostBits(), 12L); - assertEquals(response.getError().getValue(), 0); - assertEquals(response.getMessage(), "server error"); - - channel.finish(); - } - - - @Test(timeOut = 30000) - public void sendEndTxnResponse() throws Exception { - final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); - when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); - when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); - when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) - .thenReturn(CompletableFuture.completedFuture(null)); - when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); - svcConfig.setTransactionCoordinatorEnabled(true); - resetChannel(); - setChannelConnected(); - ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L, - TxnAction.COMMIT)); - channel.writeInbound(clientCommand); - CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse(); - - assertEquals(response.getRequestId(), 89L); - assertEquals(response.getTxnidLeastBits(), 1L); - assertEquals(response.getTxnidMostBits(), 12L); - assertFalse(response.hasError()); - assertFalse(response.hasMessage()); - - channel.finish(); - } - - @Test(timeOut = 30000) - public void sendEndTxnResponseFailed() throws Exception { - final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); - when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); - when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); - when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) - .thenReturn(FutureUtil.failedFuture(new RuntimeException("server error"))); - when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); - svcConfig.setTransactionCoordinatorEnabled(true); - resetChannel(); - setChannelConnected(); - ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L, - TxnAction.COMMIT)); - channel.writeInbound(clientCommand); - CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse(); - - assertEquals(response.getRequestId(), 89L); - assertEquals(response.getTxnidLeastBits(), 1L); - assertEquals(response.getTxnidMostBits(), 12L); - assertEquals(response.getError().getValue(), 0); - assertEquals(response.getMessage(), "server error"); - - channel.finish(); - } - - @Test(timeOut = 30000) - public void sendEndTxnOnPartitionResponse() throws Exception { - final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); - when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); - when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); - when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) - .thenReturn(CompletableFuture.completedFuture(null)); - when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); - - svcConfig.setTransactionCoordinatorEnabled(true); - resetChannel(); - setChannelConnected(); - Topic topic = mock(Topic.class); - doReturn(CompletableFuture.completedFuture(null)).when(topic).endTxn(any(TxnID.class), anyInt(), anyLong()); - doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService) - .getTopicIfExists(any(String.class)); - ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L, - successTopicName, TxnAction.COMMIT, 1L); - channel.writeInbound(clientCommand); - CommandEndTxnOnPartitionResponse response = (CommandEndTxnOnPartitionResponse) getResponse(); - - assertEquals(response.getRequestId(), 89L); - assertEquals(response.getTxnidLeastBits(), 1L); - assertEquals(response.getTxnidMostBits(), 12L); - assertFalse(response.hasError()); - assertFalse(response.hasMessage()); - - channel.finish(); - } - - @Test(timeOut = 30000) - public void sendEndTxnOnPartitionResponseFailed() throws Exception { - final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); - when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); - when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); - when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) - .thenReturn(CompletableFuture.completedFuture(null)); - when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); - - svcConfig.setTransactionCoordinatorEnabled(true); - resetChannel(); - setChannelConnected(); - Topic topic = mock(Topic.class); - doReturn(FutureUtil.failedFuture(new RuntimeException("server error"))).when(topic) - .endTxn(any(TxnID.class), anyInt(), anyLong()); - doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService) - .getTopicIfExists(any(String.class)); - ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L, - successTopicName, TxnAction.COMMIT, 1L); - channel.writeInbound(clientCommand); - CommandEndTxnOnPartitionResponse response = (CommandEndTxnOnPartitionResponse) getResponse(); - - assertEquals(response.getRequestId(), 89L); - assertEquals(response.getTxnidLeastBits(), 1L); - assertEquals(response.getTxnidMostBits(), 12L); - assertEquals(response.getError().getValue(), 0); - assertEquals(response.getMessage(), "server error"); - - channel.finish(); - } - - @Test(timeOut = 30000) - public void sendEndTxnOnSubscription() throws Exception { - final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); - when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); - when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); - when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) - .thenReturn(CompletableFuture.completedFuture(null)); - when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); - - svcConfig.setTransactionCoordinatorEnabled(true); - resetChannel(); - setChannelConnected(); - Topic topic = mock(Topic.class); - final org.apache.pulsar.broker.service.Subscription sub = - mock(org.apache.pulsar.broker.service.Subscription.class); - doReturn(sub).when(topic).getSubscription(any()); - doReturn(CompletableFuture.completedFuture(null)) - .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong()); - doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService) - .getTopicIfExists(any(String.class)); - - ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L, - successTopicName, successSubName, TxnAction.COMMIT, 1L); - channel.writeInbound(clientCommand); - CommandEndTxnOnSubscriptionResponse response = (CommandEndTxnOnSubscriptionResponse) getResponse(); - - assertEquals(response.getRequestId(), 89L); - assertEquals(response.getTxnidLeastBits(), 1L); - assertEquals(response.getTxnidMostBits(), 12L); - assertFalse(response.hasError()); - assertFalse(response.hasMessage()); - - channel.finish(); - } - - - @Test(timeOut = 30000) - public void sendEndTxnOnSubscriptionFailed() throws Exception { - final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); - when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); - when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); - when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) - .thenReturn(CompletableFuture.completedFuture(null)); - when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); - - svcConfig.setTransactionCoordinatorEnabled(true); - resetChannel(); - setChannelConnected(); - Topic topic = mock(Topic.class); - - final org.apache.pulsar.broker.service.Subscription sub = - mock(org.apache.pulsar.broker.service.Subscription.class); - doReturn(sub).when(topic).getSubscription(any()); - doReturn(FutureUtil.failedFuture(new RuntimeException("server error"))) - .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong()); - doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService) - .getTopicIfExists(any(String.class)); - - ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L, - successTopicName, successSubName, TxnAction.COMMIT, 1L); - channel.writeInbound(clientCommand); - CommandEndTxnOnSubscriptionResponse response = (CommandEndTxnOnSubscriptionResponse) getResponse(); - - assertEquals(response.getRequestId(), 89L); - assertEquals(response.getTxnidLeastBits(), 1L); - assertEquals(response.getTxnidMostBits(), 12L); - assertEquals(response.getError().getValue(), 0); - assertEquals(response.getMessage(), "Handle end txn on subscription failed: server error"); - - channel.finish(); - } - }