diff --git a/hadoop-hdds/container-service/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/container-service/dev-support/findbugsExcludeFile.xml index 2cca8b90e22f..be2e2df5f1fe 100644 --- a/hadoop-hdds/container-service/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdds/container-service/dev-support/findbugsExcludeFile.xml @@ -65,6 +65,10 @@ + + + + diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java index ba3b81751923..6f2d78c7e71e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java @@ -74,6 +74,10 @@ public final class BlockDeletingServiceMetrics { @Metric(about = "The total number of Container chosen to be deleted.") private MutableGaugeLong totalContainerChosenCount; + @Metric(about = "The total number of transactions which failed due" + + " to container lock wait timeout.") + private MutableGaugeLong totalLockTimeoutTransactionCount; + private BlockDeletingServiceMetrics() { } @@ -140,6 +144,10 @@ public void setTotalPendingBlockCount(long count) { this.totalPendingBlockCount.set(count); } + public void incrTotalLockTimeoutTransactionCount() { + totalLockTimeoutTransactionCount.incr(); + } + public long getSuccessCount() { return successCount.value(); } @@ -172,6 +180,10 @@ public long getTotalContainerChosenCount() { return totalContainerChosenCount.value(); } + public long getTotalLockTimeoutTransactionCount() { + return totalLockTimeoutTransactionCount.value(); + } + @Override public String toString() { StringBuffer buffer = new StringBuffer(); @@ -195,7 +207,9 @@ public String toString() { .append("receivedBlockCount = " + receivedBlockCount.value()).append("\t") .append("markedBlockCount = " - + markedBlockCount.value()).append("\t"); + + markedBlockCount.value()).append("\t") + .append("totalLockTimeoutTransactionCount = " + + totalLockTimeoutTransactionCount.value()).append("\t"); return buffer.toString(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index a52f9413583b..bf004586926f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -188,6 +188,23 @@ public class DatanodeConfiguration { private long recoveringContainerScrubInterval = Duration.ofMinutes(10).toMillis(); + /** + * The maximum time to wait for acquiring the container lock when processing + * a delete block transaction. + * If a timeout occurs while attempting to get the lock, the delete block + * transaction won't be immediately discarded. Instead, it will be retried + * after all the current delete block transactions have been processed. + */ + @Config(key = "block.delete.max.lock.wait.timeout", + defaultValue = "100ms", + type = ConfigType.TIME, + tags = { DATANODE, ConfigTag.DELETION}, + description = "Timeout for the thread used to process the delete" + + " block command to wait for the container lock." + ) + private long blockDeleteMaxLockWaitTimeoutMs = + Duration.ofMillis(100).toMillis(); + public Duration getBlockDeletionInterval() { return Duration.ofMillis(blockDeletionInterval); } @@ -573,6 +590,10 @@ public int getBlockDeleteQueueLimit() { return blockDeleteQueueLimit; } + public long getBlockDeleteMaxLockWaitTimeoutMs() { + return blockDeleteMaxLockWaitTimeoutMs; + } + public void setBlockDeleteQueueLimit(int queueLimit) { this.blockDeleteQueueLimit = queueLimit; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index c988b21867cf..c9805a1c5269 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -225,8 +225,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, commandDispatcher = CommandDispatcher.newBuilder() .addHandler(new CloseContainerCommandHandler()) .addHandler(new DeleteBlocksCommandHandler(getContainer(), - conf, dnConf.getBlockDeleteThreads(), - dnConf.getBlockDeleteQueueLimit())) + conf, dnConf)) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor, pullReplicatorWithMetrics, pushReplicatorWithMetrics)) .addHandler(reconstructECContainersCommandHandler) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index 7df4a1413cb8..87d35c2da00b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; @@ -38,6 +39,8 @@ .DeletedContainerBlocksSummary; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; @@ -58,7 +61,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -92,17 +97,26 @@ public class DeleteBlocksCommandHandler implements CommandHandler { private final Daemon handlerThread; private final OzoneContainer ozoneContainer; private final BlockDeletingServiceMetrics blockDeleteMetrics; + private final long tryLockTimeoutMs; + private final Map schemaHandlers; public DeleteBlocksCommandHandler(OzoneContainer container, - ConfigurationSource conf, int threadPoolSize, int queueLimit) { + ConfigurationSource conf, DatanodeConfiguration dnConf) { this.ozoneContainer = container; this.containerSet = container.getContainerSet(); this.conf = conf; this.blockDeleteMetrics = BlockDeletingServiceMetrics.create(); + this.tryLockTimeoutMs = dnConf.getBlockDeleteMaxLockWaitTimeoutMs(); + schemaHandlers = new HashMap<>(); + schemaHandlers.put(SCHEMA_V1, this::markBlocksForDeletionSchemaV1); + schemaHandlers.put(SCHEMA_V2, this::markBlocksForDeletionSchemaV2); + schemaHandlers.put(SCHEMA_V3, this::markBlocksForDeletionSchemaV3); + this.executor = Executors.newFixedThreadPool( - threadPoolSize, new ThreadFactoryBuilder() + dnConf.getBlockDeleteThreads(), new ThreadFactoryBuilder() .setNameFormat("DeleteBlocksCommandHandlerThread-%d").build()); - this.deleteCommandQueues = new LinkedBlockingQueue<>(queueLimit); + this.deleteCommandQueues = + new LinkedBlockingQueue<>(dnConf.getBlockDeleteQueueLimit()); handlerThread = new Daemon(new DeleteCmdWorker()); handlerThread.start(); } @@ -177,6 +191,27 @@ public SCMConnectionManager getConnectionManager() { } } + /** + * This class represents the result of executing a delete block transaction. + */ + public static final class DeleteBlockTransactionExecutionResult { + private final DeleteBlockTransactionResult result; + private final boolean lockAcquisitionFailed; + public DeleteBlockTransactionExecutionResult( + DeleteBlockTransactionResult result, boolean lockAcquisitionFailed) { + this.result = result; + this.lockAcquisitionFailed = lockAcquisitionFailed; + } + + public DeleteBlockTransactionResult getResult() { + return result; + } + + public boolean isLockAcquisitionFailed() { + return lockAcquisitionFailed; + } + } + /** * Process delete commands. */ @@ -208,7 +243,7 @@ public void run() { * Process one delete transaction. */ public final class ProcessTransactionTask implements - Callable { + Callable { private DeletedBlocksTransaction tx; public ProcessTransactionTask(DeletedBlocksTransaction transaction) { @@ -216,12 +251,12 @@ public ProcessTransactionTask(DeletedBlocksTransaction transaction) { } @Override - public DeleteBlockTransactionResult call() { + public DeleteBlockTransactionExecutionResult call() { DeleteBlockTransactionResult.Builder txResultBuilder = DeleteBlockTransactionResult.newBuilder(); txResultBuilder.setTxID(tx.getTxID()); long containerId = tx.getContainerID(); - int newDeletionBlocks = 0; + boolean lockAcquisitionFailed = false; try { Container cont = containerSet.getContainer(containerId); if (cont == null) { @@ -232,27 +267,30 @@ public DeleteBlockTransactionResult call() { ContainerType containerType = cont.getContainerType(); switch (containerType) { case KeyValueContainer: - KeyValueContainerData containerData = (KeyValueContainerData) - cont.getContainerData(); - cont.writeLock(); - try { - if (containerData.hasSchema(SCHEMA_V1)) { - markBlocksForDeletionSchemaV1(containerData, tx); - } else if (containerData.hasSchema(SCHEMA_V2)) { - markBlocksForDeletionSchemaV2(containerData, tx, - newDeletionBlocks, tx.getTxID()); - } else if (containerData.hasSchema(SCHEMA_V3)) { - markBlocksForDeletionSchemaV3(containerData, tx, - newDeletionBlocks, tx.getTxID()); - } else { - throw new UnsupportedOperationException( - "Only schema version 1,2,3 are supported."); + KeyValueContainer keyValueContainer = (KeyValueContainer)cont; + KeyValueContainerData containerData = + keyValueContainer.getContainerData(); + if (keyValueContainer. + writeLockTryLock(tryLockTimeoutMs, TimeUnit.MILLISECONDS)) { + try { + String schemaVersion = containerData + .getSupportedSchemaVersionOrDefault(); + if (getSchemaHandlers().containsKey(schemaVersion)) { + schemaHandlers.get(schemaVersion).handle(containerData, tx); + } else { + throw new UnsupportedOperationException( + "Only schema version 1,2,3 are supported."); + } + } finally { + keyValueContainer.writeUnlock(); } - } finally { - cont.writeUnlock(); + txResultBuilder.setContainerID(containerId) + .setSuccess(true); + } else { + lockAcquisitionFailed = true; + txResultBuilder.setContainerID(containerId) + .setSuccess(false); } - txResultBuilder.setContainerID(containerId) - .setSuccess(true); break; default: LOG.error( @@ -262,10 +300,15 @@ public DeleteBlockTransactionResult call() { } catch (IOException e) { LOG.warn("Failed to delete blocks for container={}, TXID={}", tx.getContainerID(), tx.getTxID(), e); - txResultBuilder.setContainerID(containerId) - .setSuccess(false); + txResultBuilder.setContainerID(containerId).setSuccess(false); + } catch (InterruptedException e) { + LOG.warn("InterruptedException while deleting blocks for " + + "container={}, TXID={}", tx.getContainerID(), tx.getTxID(), e); + Thread.currentThread().interrupt(); + txResultBuilder.setContainerID(containerId).setSuccess(false); } - return txResultBuilder.build(); + return new DeleteBlockTransactionExecutionResult( + txResultBuilder.build(), lockAcquisitionFailed); } } @@ -295,26 +338,11 @@ private void processCmd(DeleteCmdInfo cmd) { summary.getNumOfRetryTxs()); blockDeleteMetrics.incrReceivedBlockCount(summary.getNumOfBlocks()); - ContainerBlocksDeletionACKProto.Builder resultBuilder = - ContainerBlocksDeletionACKProto.newBuilder(); - List> futures = new ArrayList<>(); - for (int i = 0; i < containerBlocks.size(); i++) { - DeletedBlocksTransaction tx = containerBlocks.get(i); - Future future = - executor.submit(new ProcessTransactionTask(tx)); - futures.add(future); - } - - // Wait for tasks to finish - futures.forEach(f -> { - try { - resultBuilder.addResults(f.get()); - } catch (InterruptedException | ExecutionException e) { - LOG.error("task failed.", e); - Thread.currentThread().interrupt(); - } - }); + List results = + executeCmdWithRetry(containerBlocks); + ContainerBlocksDeletionACKProto.Builder resultBuilder = + ContainerBlocksDeletionACKProto.newBuilder().addAllResults(results); resultBuilder.setDnId(cmd.getContext().getParent().getDatanodeDetails() .getUuid().toString()); blockDeletionACK = resultBuilder.build(); @@ -346,9 +374,73 @@ private void processCmd(DeleteCmdInfo cmd) { } } + @VisibleForTesting + public List executeCmdWithRetry( + List transactions) { + List results = + new ArrayList<>(transactions.size()); + Map idToTransaction = + new HashMap<>(transactions.size()); + transactions.forEach(tx -> idToTransaction.put(tx.getTxID(), tx)); + List retryTransaction = new ArrayList<>(); + + List> futures = + submitTasks(transactions); + // Wait for tasks to finish + handleTasksResults(futures, result -> { + if (result.isLockAcquisitionFailed()) { + retryTransaction.add(idToTransaction.get(result.getResult().getTxID())); + } else { + results.add(result.getResult()); + } + }); + + idToTransaction.clear(); + // Wait for all tasks to complete before retrying, usually it takes + // some time for all tasks to complete, then the retry may be successful. + // We will only retry once + if (!retryTransaction.isEmpty()) { + futures = submitTasks(retryTransaction); + handleTasksResults(futures, result -> { + if (result.isLockAcquisitionFailed()) { + blockDeleteMetrics.incrTotalLockTimeoutTransactionCount(); + } + results.add(result.getResult()); + }); + } + return results; + } + + @VisibleForTesting + public List> submitTasks( + List deletedBlocksTransactions) { + List> futures = + new ArrayList<>(deletedBlocksTransactions.size()); + + for (DeletedBlocksTransaction tx : deletedBlocksTransactions) { + Future future = + executor.submit(new ProcessTransactionTask(tx)); + futures.add(future); + } + return futures; + } + + public void handleTasksResults( + List> futures, + Consumer handler) { + futures.forEach(f -> { + try { + DeleteBlockTransactionExecutionResult result = f.get(); + handler.accept(result); + } catch (InterruptedException | ExecutionException e) { + LOG.error("task failed.", e); + Thread.currentThread().interrupt(); + } + }); + } + private void markBlocksForDeletionSchemaV3( - KeyValueContainerData containerData, DeletedBlocksTransaction delTX, - int newDeletionBlocks, long txnID) + KeyValueContainerData containerData, DeletedBlocksTransaction delTX) throws IOException { DeletionMarker schemaV3Marker = (table, batch, tid, txn) -> { Table delTxTable = @@ -357,13 +449,12 @@ private void markBlocksForDeletionSchemaV3( txn); }; - markBlocksForDeletionTransaction(containerData, delTX, newDeletionBlocks, - txnID, schemaV3Marker); + markBlocksForDeletionTransaction(containerData, delTX, + delTX.getTxID(), schemaV3Marker); } private void markBlocksForDeletionSchemaV2( - KeyValueContainerData containerData, DeletedBlocksTransaction delTX, - int newDeletionBlocks, long txnID) + KeyValueContainerData containerData, DeletedBlocksTransaction delTX) throws IOException { DeletionMarker schemaV2Marker = (table, batch, tid, txn) -> { Table delTxTable = @@ -371,8 +462,8 @@ private void markBlocksForDeletionSchemaV2( delTxTable.putWithBatch(batch, tid, txn); }; - markBlocksForDeletionTransaction(containerData, delTX, newDeletionBlocks, - txnID, schemaV2Marker); + markBlocksForDeletionTransaction(containerData, delTX, + delTX.getTxID(), schemaV2Marker); } /** @@ -385,8 +476,9 @@ private void markBlocksForDeletionSchemaV2( */ private void markBlocksForDeletionTransaction( KeyValueContainerData containerData, DeletedBlocksTransaction delTX, - int newDeletionBlocks, long txnID, DeletionMarker marker) + long txnID, DeletionMarker marker) throws IOException { + int newDeletionBlocks = 0; long containerId = delTX.getContainerID(); logDeleteTransaction(containerId, containerData, delTX); try (DBHandle containerDB = BlockUtils.getDB(containerData, conf)) { @@ -569,4 +661,25 @@ void apply(Table deleteTxnsTable, BatchOperation batch, long txnID, DeletedBlocksTransaction delTX) throws IOException; } + + /** + * The SchemaHandler interface provides a contract for handling + * KeyValueContainerData and DeletedBlocksTransaction based + * on different schema versions. + */ + public interface SchemaHandler { + void handle(KeyValueContainerData containerData, + DeletedBlocksTransaction tx) throws IOException; + } + + @VisibleForTesting + public Map getSchemaHandlers() { + return schemaHandlers; + } + + @VisibleForTesting + public BlockDeletingServiceMetrics getBlockDeleteMetrics() { + return blockDeleteMetrics; + } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 079da021a969..d091692c0938 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -31,6 +31,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; @@ -728,6 +729,11 @@ public void writeLockInterruptibly() throws InterruptedException { } + public boolean writeLockTryLock(long time, TimeUnit unit) + throws InterruptedException { + return this.lock.writeLock().tryLock(time, unit); + } + /** * Returns containerFile. * @return .container File name diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java index fd5b5923d584..eaee7d2cf04a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java @@ -51,6 +51,8 @@ import static org.apache.hadoop.ozone.OzoneConsts.DELETE_TRANSACTION_KEY; import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.METADATA_PATH; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2; import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3; import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSION; import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_BYTES_USED; @@ -142,6 +144,25 @@ public String getSchemaVersion() { return schemaVersion; } + /** + * Returns schema version or the default value when the + * {@link KeyValueContainerData#schemaVersion} is null. The default value can + * be referred to {@link KeyValueContainerUtil#isSameSchemaVersion}. + * + * @return Schema version as a string. + * @throws UnsupportedOperationException If no valid schema version is found. + */ + public String getSupportedSchemaVersionOrDefault() { + String[] versions = {SCHEMA_V1, SCHEMA_V2, SCHEMA_V3}; + + for (String version : versions) { + if (this.hasSchema(version)) { + return version; + } + } + throw new UnsupportedOperationException("No valid schema version found."); + } + /** * Sets Container dbFile. This should be called only during creation of * KeyValue container. diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java new file mode 100644 index 000000000000..62447365421b --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics; +import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler.SchemaHandler; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto + .DeleteBlockTransactionResult; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + +import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; +import static org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler.DeleteBlockTransactionExecutionResult; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +/** + * Test cases for TestDeleteBlocksCommandHandler. + */ +@RunWith(Parameterized.class) +public class TestDeleteBlocksCommandHandler { + + @Rule + public Timeout testTimeout = Timeout.seconds(300); + + private OzoneConfiguration conf; + private StateContext context; + private ContainerLayoutVersion layout; + private OzoneContainer ozoneContainer; + private ContainerSet containerSet; + private DeleteBlocksCommandHandler handler; + private final String schemaVersion; + private HddsVolume volume1; + private BlockDeletingServiceMetrics blockDeleteMetrics; + + public TestDeleteBlocksCommandHandler(ContainerTestVersionInfo versionInfo) { + this.layout = versionInfo.getLayout(); + this.schemaVersion = versionInfo.getSchemaVersion(); + conf = new OzoneConfiguration(); + ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf); + } + + @Parameterized.Parameters + public static Iterable parameters() { + return ContainerTestVersionInfo.versionParameters(); + } + + @Before + public void setup() throws Exception { + conf = new OzoneConfiguration(); + context = mock(StateContext.class); + layout = ContainerLayoutVersion.FILE_PER_BLOCK; + ozoneContainer = Mockito.mock(OzoneContainer.class); + DatanodeStateMachine dnsm = Mockito.mock(DatanodeStateMachine.class); + when(dnsm.getDatanodeDetails()) + .thenReturn(randomDatanodeDetails()); + Mockito.when(context.getParent()).thenReturn(dnsm); + + containerSet = new ContainerSet(1000); + volume1 = Mockito.mock(HddsVolume.class); + Mockito.when(volume1.getStorageID()).thenReturn("uuid-1"); + for (int i = 0; i <= 10; i++) { + KeyValueContainerData data = + new KeyValueContainerData(i, + layout, + ContainerTestHelper.CONTAINER_MAX_SIZE, + UUID.randomUUID().toString(), + UUID.randomUUID().toString()); + data.setSchemaVersion(schemaVersion); + data.setVolume(volume1); + KeyValueContainer container = new KeyValueContainer(data, conf); + data.closeContainer(); + containerSet.addContainer(container); + } + when(ozoneContainer.getContainerSet()).thenReturn(containerSet); + DatanodeConfiguration dnConf = + conf.getObject(DatanodeConfiguration.class); + + handler = spy(new DeleteBlocksCommandHandler( + ozoneContainer, conf, dnConf)); + blockDeleteMetrics = handler.getBlockDeleteMetrics(); + TestSchemaHandler testSchemaHandler1 = Mockito.spy(new TestSchemaHandler()); + TestSchemaHandler testSchemaHandler2 = Mockito.spy(new TestSchemaHandler()); + TestSchemaHandler testSchemaHandler3 = Mockito.spy(new TestSchemaHandler()); + + handler.getSchemaHandlers().put(SCHEMA_V1, testSchemaHandler1); + handler.getSchemaHandlers().put(SCHEMA_V2, testSchemaHandler2); + handler.getSchemaHandlers().put(SCHEMA_V3, testSchemaHandler3); + } + + @After + public void tearDown() { + handler.stop(); + BlockDeletingServiceMetrics.unRegister(); + } + + @Test + public void testDeleteBlocksCommandHandler() + throws IOException { + Assert.assertTrue(containerSet.containerCount() > 0); + Container container = containerSet.getContainerIterator(volume1).next(); + DeletedBlocksTransaction transaction = createDeletedBlocksTransaction(1, + container.getContainerData().getContainerID()); + + List results = + handler.executeCmdWithRetry(Arrays.asList(transaction)); + + String schemaVersionOrDefault = ((KeyValueContainerData) + container.getContainerData()).getSupportedSchemaVersionOrDefault(); + Mockito.verify(handler.getSchemaHandlers().get(schemaVersionOrDefault), + times(1)).handle(any(), any()); + // submitTasks will be executed only once, as if there were not retries + Mockito.verify(handler, + times(1)).submitTasks(any()); + + Assert.assertEquals(1, results.size()); + Assert.assertTrue(results.get(0).getSuccess()); + Assert.assertEquals(0, + blockDeleteMetrics.getTotalLockTimeoutTransactionCount()); + } + + @Test + public void testDeleteBlocksCommandHandlerWithTimeoutFailed() + throws IOException { + Assert.assertTrue(containerSet.containerCount() >= 2); + Iterator> iterator = + containerSet.getContainerIterator(volume1); + Container lockedContainer = iterator.next(); + Container nonLockedcontainer = iterator.next(); + DeletedBlocksTransaction transaction1 = + createDeletedBlocksTransaction(1, + lockedContainer.getContainerData().getContainerID()); + DeletedBlocksTransaction transaction2 = + createDeletedBlocksTransaction(2, + nonLockedcontainer.getContainerData().getContainerID()); + + // By letting lockedContainer hold the lock and not releasing it, + // lockedContainer's delete command processing will time out + // and retry, but it will still fail eventually + // nonLockedContainer will succeed because it does not hold the lock + lockedContainer.writeLock(); + List transactions = + Arrays.asList(transaction1, transaction2); + List results = + handler.executeCmdWithRetry(transactions); + String schemaVersionOrDefault = ((KeyValueContainerData) nonLockedcontainer. + getContainerData()).getSupportedSchemaVersionOrDefault(); + Mockito.verify(handler.getSchemaHandlers().get(schemaVersionOrDefault), + times(1)).handle(eq((KeyValueContainerData) + nonLockedcontainer.getContainerData()), eq(transaction2)); + + // submitTasks will be executed twice, as if there were retries + Mockito.verify(handler, + times(1)).submitTasks(eq(transactions)); + Mockito.verify(handler, + times(1)).submitTasks(eq(Arrays.asList(transaction1))); + Assert.assertEquals(2, results.size()); + + // Only one transaction will succeed + Map resultsMap = new HashMap<>(); + results.forEach(result -> resultsMap.put(result.getTxID(), result)); + Assert.assertFalse(resultsMap.get(transaction1.getTxID()).getSuccess()); + Assert.assertTrue(resultsMap.get(transaction2.getTxID()).getSuccess()); + + Assert.assertEquals(1, + blockDeleteMetrics.getTotalLockTimeoutTransactionCount()); + } + + @Test + public void testDeleteBlocksCommandHandlerSuccessfulAfterFirstTimeout() + throws IOException { + Assert.assertTrue(containerSet.containerCount() > 0); + Container lockedContainer = + containerSet.getContainerIterator(volume1).next(); + DeletedBlocksTransaction transaction = createDeletedBlocksTransaction(1, + lockedContainer.getContainerData().getContainerID()); + // After the Container waits for the lock to time out for the first time, + // release the lock on the Container, so the retry will succeed. + lockedContainer.writeLock(); + doAnswer(new Answer>>() { + @Override + public List> answer( + InvocationOnMock invocationOnMock) throws Throwable { + List> result = + (List>) + invocationOnMock.callRealMethod(); + // Wait for the task to finish executing and then release the lock + DeleteBlockTransactionExecutionResult res = result.get(0).get(); + if (lockedContainer.hasWriteLock()) { + lockedContainer.writeUnlock(); + } + CompletableFuture future = + new CompletableFuture<>(); + future.complete(res); + result.clear(); + result.add(future); + return result; + } + }).when(handler).submitTasks(any()); + + List results = + handler.executeCmdWithRetry(Arrays.asList(transaction)); + + // submitTasks will be executed twice, as if there were retries + String schemaVersionOrDefault = ((KeyValueContainerData) lockedContainer + .getContainerData()).getSupportedSchemaVersionOrDefault(); + Mockito.verify(handler, + times(2)).submitTasks(any()); + Mockito.verify(handler.getSchemaHandlers().get(schemaVersionOrDefault), + times(1)).handle(any(), any()); + Assert.assertEquals(1, results.size()); + Assert.assertTrue(results.get(0).getSuccess()); + + Assert.assertEquals(0, + blockDeleteMetrics.getTotalLockTimeoutTransactionCount()); + } + + + private DeletedBlocksTransaction createDeletedBlocksTransaction(long txID, + long containerID) { + return DeletedBlocksTransaction.newBuilder() + .setContainerID(containerID) + .setCount(0) + .addLocalID(1L) + .setTxID(txID) + .build(); + } + + private static class TestSchemaHandler implements SchemaHandler { + @Override + public void handle(KeyValueContainerData containerData, + DeletedBlocksTransaction tx) throws IOException { + // doNoting just for Test + } + } +}