From 426b3cfa86873b7accae756830fc25d60814c393 Mon Sep 17 00:00:00 2001 From: XiChen <32928346+xichen01@users.noreply.github.com> Date: Fri, 16 Jun 2023 11:34:19 +0800 Subject: [PATCH 1/8] Make DN DeleteBlocksCommandHandler wait for the lock can timeout. --- .../helpers/BlockDeletingServiceMetrics.java | 16 +- .../statemachine/DatanodeConfiguration.java | 22 ++ .../statemachine/DatanodeStateMachine.java | 3 +- .../DeleteBlocksCommandHandler.java | 204 ++++++++++--- .../container/keyvalue/KeyValueContainer.java | 6 + .../keyvalue/KeyValueContainerData.java | 19 ++ .../TestDeleteBlocksCommandHandler.java | 283 ++++++++++++++++++ 7 files changed, 503 insertions(+), 50 deletions(-) create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java index da42e66a9e16..2526ce771f76 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java @@ -52,6 +52,10 @@ public final class BlockDeletingServiceMetrics { @Metric(about = "The total number of blocks pending for processing.") private MutableGaugeLong totalPendingBlockCount; + @Metric(about = "The total number of transactions witch was failed " + + "cause by wait Container lock timeout.") + private MutableGaugeLong totalLockTimeoutTransactionCount; + private BlockDeletingServiceMetrics() { } @@ -90,6 +94,10 @@ public void setTotalPendingBlockCount(long count) { this.totalPendingBlockCount.set(count); } + public void incrTotalLockTimeoutTransactionCount() { + totalLockTimeoutTransactionCount.incr(); + } + public long getSuccessCount() { return successCount.value(); } @@ -114,6 +122,10 @@ public long getTotalPendingBlockCount() { return totalPendingBlockCount.value(); } + public long getTotalLockTimeoutTransactionCount() { + return totalLockTimeoutTransactionCount.value(); + } + @Override public String toString() { StringBuffer buffer = new StringBuffer(); @@ -123,7 +135,9 @@ public String toString() { .append("outOfOrderDeleteBlockTransactionCount = " + outOfOrderDeleteBlockTransactionCount.value()).append("\t") .append("totalPendingBlockCount = " - + totalPendingBlockCount.value()).append("\t"); + + totalPendingBlockCount.value()).append("\t") + .append("totalLockTimeoutTransactionCount = " + + totalLockTimeoutTransactionCount.value()).append("\t"); return buffer.toString(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index 8bae562bbf20..100f95c4dcdb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -192,6 +192,24 @@ public class DatanodeConfiguration { private long recoveringContainerScrubInterval = Duration.ofMinutes(10).toMillis(); + /** 
+ * Timeout for the thread used to process the delete block command + * to wait for the container lock. + * It takes about 200ms to open a RocksDB with HDD media. + * Set the default value to 100ms, so that after one times retry + * after waiting timeout, the hold time spent waiting for the lock + * is not greater than the time spent operating RocksDB + */ + @Config(key = "block.delete.command.handle.lock.timeout", + defaultValue = "100ms", + type = ConfigType.TIME, + tags = { DATANODE, ConfigTag.DELETION}, + description = "Timeout for the thread used to process the delete" + + " block command to wait for the container lock." + ) + private long blockDeleteCommandHandleLockTimeoutMs = + Duration.ofMillis(100).toMillis(); + public Duration getBlockDeletionInterval() { return Duration.ofMillis(blockDeletionInterval); } @@ -559,6 +577,10 @@ public int getBlockDeleteQueueLimit() { return blockDeleteQueueLimit; } + public long getBlockDeleteCommandHandleLockTimeoutMs() { + return blockDeleteCommandHandleLockTimeoutMs; + } + public void setBlockDeleteQueueLimit(int queueLimit) { this.blockDeleteQueueLimit = queueLimit; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index c988b21867cf..0f8b47a579c2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -226,7 +226,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, .addHandler(new CloseContainerCommandHandler()) .addHandler(new DeleteBlocksCommandHandler(getContainer(), conf, dnConf.getBlockDeleteThreads(), - dnConf.getBlockDeleteQueueLimit())) + dnConf.getBlockDeleteQueueLimit(), + dnConf.getBlockDeleteCommandHandleLockTimeoutMs())) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor, pullReplicatorWithMetrics, pushReplicatorWithMetrics)) .addHandler(reconstructECContainersCommandHandler) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index ec316c79f1c1..e88342dfd28a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; @@ -38,6 +39,7 @@ .DeletedContainerBlocksSummary; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import 
org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; @@ -58,7 +60,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -91,13 +95,23 @@ public class DeleteBlocksCommandHandler implements CommandHandler { private final Daemon handlerThread; private final OzoneContainer ozoneContainer; private final BlockDeletingServiceMetrics blockDeleteMetrics; + private final long tryLockTimeoutMs; + private final Map schemaHandlers; + public DeleteBlocksCommandHandler(OzoneContainer container, - ConfigurationSource conf, int threadPoolSize, int queueLimit) { + ConfigurationSource conf, int threadPoolSize, int queueLimit, + long tryLockTimeoutMs) { this.ozoneContainer = container; this.containerSet = container.getContainerSet(); this.conf = conf; this.blockDeleteMetrics = BlockDeletingServiceMetrics.create(); + this.tryLockTimeoutMs = tryLockTimeoutMs; + schemaHandlers = new HashMap<>(); + schemaHandlers.put(SCHEMA_V1, this::markBlocksForDeletionSchemaV1); + schemaHandlers.put(SCHEMA_V2, this::markBlocksForDeletionSchemaV2); + schemaHandlers.put(SCHEMA_V3, this::markBlocksForDeletionSchemaV3); + this.executor = Executors.newFixedThreadPool( threadPoolSize, new ThreadFactoryBuilder() .setNameFormat("DeleteBlocksCommandHandlerThread-%d").build()); @@ -166,6 +180,27 @@ public SCMConnectionManager getConnectionManager() { } } + /** + * This class represents the result of executing a delete block transaction. + */ + public static final class DeleteBlockTransactionExecutionResult { + private final DeleteBlockTransactionResult result; + private final boolean lockAcquisitionFailed; + public DeleteBlockTransactionExecutionResult( + DeleteBlockTransactionResult result, boolean lockAcquisitionFailed) { + this.result = result; + this.lockAcquisitionFailed = lockAcquisitionFailed; + } + + public DeleteBlockTransactionResult getResult() { + return result; + } + + public boolean isLockAcquisitionFailed() { + return lockAcquisitionFailed; + } + } + /** * Process delete commands. */ @@ -197,7 +232,7 @@ public void run() { * Process one delete transaction. 
*/ public final class ProcessTransactionTask implements - Callable { + Callable { private DeletedBlocksTransaction tx; public ProcessTransactionTask(DeletedBlocksTransaction transaction) { @@ -205,12 +240,12 @@ public ProcessTransactionTask(DeletedBlocksTransaction transaction) { } @Override - public DeleteBlockTransactionResult call() { + public DeleteBlockTransactionExecutionResult call() { DeleteBlockTransactionResult.Builder txResultBuilder = DeleteBlockTransactionResult.newBuilder(); txResultBuilder.setTxID(tx.getTxID()); long containerId = tx.getContainerID(); - int newDeletionBlocks = 0; + boolean lockAcquisitionFailed = false; try { Container cont = containerSet.getContainer(containerId); if (cont == null) { @@ -221,40 +256,43 @@ public DeleteBlockTransactionResult call() { ContainerType containerType = cont.getContainerType(); switch (containerType) { case KeyValueContainer: + KeyValueContainer keyValueContainer = (KeyValueContainer)cont; KeyValueContainerData containerData = (KeyValueContainerData) cont.getContainerData(); - cont.writeLock(); - try { - if (containerData.hasSchema(SCHEMA_V1)) { - markBlocksForDeletionSchemaV1(containerData, tx); - } else if (containerData.hasSchema(SCHEMA_V2)) { - markBlocksForDeletionSchemaV2(containerData, tx, - newDeletionBlocks, tx.getTxID()); - } else if (containerData.hasSchema(SCHEMA_V3)) { - markBlocksForDeletionSchemaV3(containerData, tx, - newDeletionBlocks, tx.getTxID()); - } else { - throw new UnsupportedOperationException( - "Only schema version 1,2,3 are supported."); + if (keyValueContainer. + writeLockTryLock(tryLockTimeoutMs, TimeUnit.MILLISECONDS)) { + try { + String schemaVersion = containerData.getValidSchemaVersion(); + if (getSchemaHandlers().containsKey(schemaVersion)) { + schemaHandlers.get(schemaVersion).handle(containerData, tx); + } else { + throw new UnsupportedOperationException( + "Only schema version 1,2,3 are supported."); + } + } finally { + cont.writeUnlock(); } - } finally { - cont.writeUnlock(); + txResultBuilder.setContainerID(containerId) + .setSuccess(true); + } else { + lockAcquisitionFailed = true; + txResultBuilder.setContainerID(containerId) + .setSuccess(false); } - txResultBuilder.setContainerID(containerId) - .setSuccess(true); break; default: LOG.error( "Delete Blocks Command Handler is not implemented for " + "containerType {}", containerType); } - } catch (IOException e) { + } catch (IOException | InterruptedException e) { LOG.warn("Failed to delete blocks for container={}, TXID={}", tx.getContainerID(), tx.getTxID(), e); txResultBuilder.setContainerID(containerId) .setSuccess(false); } - return txResultBuilder.build(); + return new DeleteBlockTransactionExecutionResult( + txResultBuilder.build(), lockAcquisitionFailed); } } @@ -277,27 +315,11 @@ private void processCmd(DeleteCmdInfo cmd) { summary.getTxIDSummary(), summary.getNumOfContainers(), summary.getNumOfBlocks()); + List results = + executeCmdWithRetry(containerBlocks); ContainerBlocksDeletionACKProto.Builder resultBuilder = - ContainerBlocksDeletionACKProto.newBuilder(); - List> futures = new ArrayList<>(); - for (int i = 0; i < containerBlocks.size(); i++) { - DeletedBlocksTransaction tx = containerBlocks.get(i); - Future future = - executor.submit(new ProcessTransactionTask(tx)); - futures.add(future); - } - - // Wait for tasks to finish - futures.forEach(f -> { - try { - resultBuilder.addResults(f.get()); - } catch (InterruptedException | ExecutionException e) { - LOG.error("task failed.", e); - Thread.currentThread().interrupt(); - } - 
}); - + ContainerBlocksDeletionACKProto.newBuilder().addAllResults(results); resultBuilder.setDnId(cmd.getContext().getParent().getDatanodeDetails() .getUuid().toString()); blockDeletionACK = resultBuilder.build(); @@ -329,10 +351,75 @@ private void processCmd(DeleteCmdInfo cmd) { } } + @VisibleForTesting + public List executeCmdWithRetry( + List transactions) { + List results = + new ArrayList<>(transactions.size()); + Map idToTransaction = + new HashMap<>(transactions.size()); + transactions.forEach(tx -> idToTransaction.put(tx.getTxID(), tx)); + List retryTransaction = new ArrayList<>(); + + List> futures = + submitTasks(transactions); + // Wait for tasks to finish + handleTasksResults(futures, result -> { + if (result.isLockAcquisitionFailed()) { + retryTransaction.add(idToTransaction.get(result.getResult().getTxID())); + } else { + results.add(result.getResult()); + } + }); + + idToTransaction.clear(); + // Wait for all tasks to complete before retrying, usually it takes + // some time for all tasks to complete, then the retry may be successful. + // We will only retry once + if (!retryTransaction.isEmpty()) { + futures = submitTasks(retryTransaction); + handleTasksResults(futures, result -> { + if (result.isLockAcquisitionFailed()) { + blockDeleteMetrics.incrTotalLockTimeoutTransactionCount(); + } + results.add(result.getResult()); + }); + } + return results; + } + + @VisibleForTesting + public List> submitTasks( + List deletedBlocksTransactions) { + List> futures = + new ArrayList<>(deletedBlocksTransactions.size()); + + for (DeletedBlocksTransaction tx : deletedBlocksTransactions) { + Future future = + executor.submit(new ProcessTransactionTask(tx)); + futures.add(future); + } + return futures; + } + + public void handleTasksResults( + List> futures, + Consumer handler) { + futures.forEach(f -> { + try { + DeleteBlockTransactionExecutionResult result = f.get(); + handler.accept(result); + } catch (InterruptedException | ExecutionException e) { + LOG.error("task failed.", e); + Thread.currentThread().interrupt(); + } + }); + } + private void markBlocksForDeletionSchemaV3( - KeyValueContainerData containerData, DeletedBlocksTransaction delTX, - int newDeletionBlocks, long txnID) + KeyValueContainerData containerData, DeletedBlocksTransaction delTX) throws IOException { + int newDeletionBlocks = 0; DeletionMarker schemaV3Marker = (table, batch, tid, txn) -> { Table delTxTable = (Table) table; @@ -341,13 +428,13 @@ private void markBlocksForDeletionSchemaV3( }; markBlocksForDeletionTransaction(containerData, delTX, newDeletionBlocks, - txnID, schemaV3Marker); + delTX.getTxID(), schemaV3Marker); } private void markBlocksForDeletionSchemaV2( - KeyValueContainerData containerData, DeletedBlocksTransaction delTX, - int newDeletionBlocks, long txnID) + KeyValueContainerData containerData, DeletedBlocksTransaction delTX) throws IOException { + int newDeletionBlocks = 0; DeletionMarker schemaV2Marker = (table, batch, tid, txn) -> { Table delTxTable = (Table) table; @@ -355,7 +442,7 @@ private void markBlocksForDeletionSchemaV2( }; markBlocksForDeletionTransaction(containerData, delTX, newDeletionBlocks, - txnID, schemaV2Marker); + delTX.getTxID(), schemaV2Marker); } /** @@ -545,4 +632,25 @@ void apply(Table deleteTxnsTable, BatchOperation batch, long txnID, DeletedBlocksTransaction delTX) throws IOException; } + + /** + * The SchemaHandler interface provides a contract for handling + * KeyValueContainerData and DeletedBlocksTransaction based + * on different schema versions. 
+ */ + public interface SchemaHandler { + void handle(KeyValueContainerData containerData, + DeletedBlocksTransaction tx) throws IOException; + } + + @VisibleForTesting + public Map getSchemaHandlers() { + return schemaHandlers; + } + + @VisibleForTesting + public BlockDeletingServiceMetrics getBlockDeleteMetrics() { + return blockDeleteMetrics; + } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 1900d28b2923..2f5a0fe312fc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -31,6 +31,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -713,6 +714,11 @@ public void writeLockInterruptibly() throws InterruptedException { } + public boolean writeLockTryLock(long time, TimeUnit unit) + throws InterruptedException { + return this.lock.writeLock().tryLock(time, unit); + } + /** * Returns containerFile. * @return .container File name diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java index fd5b5923d584..4056d0b63e41 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java @@ -51,6 +51,8 @@ import static org.apache.hadoop.ozone.OzoneConsts.DELETE_TRANSACTION_KEY; import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.METADATA_PATH; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2; import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3; import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSION; import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_BYTES_USED; @@ -142,6 +144,23 @@ public String getSchemaVersion() { return schemaVersion; } + /** + * Returns valid schema version or throws exception if not found. + * + * @return Schema version as a string. + * @throws UnsupportedOperationException If no valid schema version is found. + */ + public String getValidSchemaVersion() { + String[] versions = {SCHEMA_V1, SCHEMA_V2, SCHEMA_V3}; + + for (String version : versions) { + if (this.hasSchema(version)) { + return version; + } + } + throw new UnsupportedOperationException("No valid schema version found."); + } + /** * Sets Container dbFile. This should be called only during creation of * KeyValue container. 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java new file mode 100644 index 000000000000..2818b4b96932 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics; +import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler.SchemaHandler; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto + .DeleteBlockTransactionResult; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + +import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; +import static org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler.DeleteBlockTransactionExecutionResult; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +/** + * Test cases for TestDeleteBlocksCommandHandler. 
+ */ +@RunWith(Parameterized.class) +@Timeout(300) +public class TestDeleteBlocksCommandHandler { + + private OzoneConfiguration conf; + private StateContext context; + private ContainerLayoutVersion layout; + private OzoneContainer ozoneContainer; + private ContainerSet containerSet; + private DeleteBlocksCommandHandler handler; + private final String schemaVersion; + private HddsVolume volume1; + private BlockDeletingServiceMetrics blockDeleteMetrics; + + + public TestDeleteBlocksCommandHandler(String schemaVersion) { + this.schemaVersion = schemaVersion; + } + + @Parameterized.Parameters(name = "{index}: Schema Version {0}") + public static Collection data() { + return Arrays.asList(new Object[][]{ + {SCHEMA_V1}, + {SCHEMA_V2}, + {SCHEMA_V3}, + }); + } + + @Before + public void setup() throws Exception { + conf = new OzoneConfiguration(); + context = mock(StateContext.class); + layout = ContainerLayoutVersion.FILE_PER_BLOCK; + ozoneContainer = Mockito.mock(OzoneContainer.class); + DatanodeStateMachine dnsm = Mockito.mock(DatanodeStateMachine.class); + when(dnsm.getDatanodeDetails()) + .thenReturn(randomDatanodeDetails()); + Mockito.when(context.getParent()).thenReturn(dnsm); + + containerSet = new ContainerSet(1000); + volume1 = Mockito.mock(HddsVolume.class); + Mockito.when(volume1.getStorageID()).thenReturn("uuid-1"); + for (int i = 0; i <= 10; i++) { + KeyValueContainerData data = + new KeyValueContainerData(i, + layout, + ContainerTestHelper.CONTAINER_MAX_SIZE, + UUID.randomUUID().toString(), + UUID.randomUUID().toString()); + data.setSchemaVersion(schemaVersion); + data.setVolume(volume1); + KeyValueContainer container = new KeyValueContainer(data, conf); + data.closeContainer(); + containerSet.addContainer(container); + } + when(ozoneContainer.getContainerSet()).thenReturn(containerSet); + + handler = spy(new DeleteBlocksCommandHandler( + ozoneContainer, conf, 1, 1, 25)); + blockDeleteMetrics = handler.getBlockDeleteMetrics(); + TestSchemaHandler testSchemaHandler1 = Mockito.spy(new TestSchemaHandler()); + TestSchemaHandler testSchemaHandler2 = Mockito.spy(new TestSchemaHandler()); + TestSchemaHandler testSchemaHandler3 = Mockito.spy(new TestSchemaHandler()); + + handler.getSchemaHandlers().put(SCHEMA_V1, testSchemaHandler1); + handler.getSchemaHandlers().put(SCHEMA_V2, testSchemaHandler2); + handler.getSchemaHandlers().put(SCHEMA_V3, testSchemaHandler3); + } + + @After + public void tearDown() { + handler.stop(); + BlockDeletingServiceMetrics.unRegister(); + } + + @Test + public void testDeleteBlocksCommandHandler() + throws IOException { + Assert.assertTrue(containerSet.containerCount() > 0); + Container container = containerSet.getContainerIterator(volume1).next(); + DeletedBlocksTransaction transaction = createDeletedBlocksTransaction(1, + container.getContainerData().getContainerID()); + + List results = + handler.executeCmdWithRetry(Arrays.asList(transaction)); + + Mockito.verify(handler.getSchemaHandlers().get(schemaVersion), + times(1)).handle(any(), any()); + // submitTasks will be executed only once, as if there were not retries + Mockito.verify(handler, + times(1)).submitTasks(any()); + + Assert.assertEquals(1, results.size()); + Assert.assertTrue(results.get(0).getSuccess()); + Assert.assertEquals(0, + blockDeleteMetrics.getTotalLockTimeoutTransactionCount()); + } + + @Test + @Timeout(60) + public void testDeleteBlocksCommandHandlerWithTimeoutFailed() + throws IOException { + Assert.assertTrue(containerSet.containerCount() >= 2); + Iterator> iterator = + 
containerSet.getContainerIterator(volume1); + Container lockedContainer = iterator.next(); + Container nonLockedcontainer = iterator.next(); + DeletedBlocksTransaction transaction1 = + createDeletedBlocksTransaction(1, + lockedContainer.getContainerData().getContainerID()); + DeletedBlocksTransaction transaction2 = + createDeletedBlocksTransaction(2, + nonLockedcontainer.getContainerData().getContainerID()); + + // By letting lockedContainer hold the lock and not releasing it, + // lockedContainer's delete command processing will time out + // and retry, but it will still fail eventually + // nonLockedContainer will succeed because it does not hold the lock + lockedContainer.writeLock(); + List transactions = + Arrays.asList(transaction1, transaction2); + List results = + handler.executeCmdWithRetry(transactions); + Mockito.verify(handler.getSchemaHandlers().get(schemaVersion), + times(1)).handle(eq((KeyValueContainerData) + nonLockedcontainer.getContainerData()), eq(transaction2)); + + // submitTasks will be executed twice, as if there were retries + Mockito.verify(handler, + times(1)).submitTasks(eq(transactions)); + Mockito.verify(handler, + times(1)).submitTasks(eq(Arrays.asList(transaction1))); + Assert.assertEquals(2, results.size()); + + // Only one transaction will succeed + Map resultsMap = new HashMap<>(); + results.forEach(result -> resultsMap.put(result.getTxID(), result)); + Assert.assertFalse(resultsMap.get(transaction1.getTxID()).getSuccess()); + Assert.assertTrue(resultsMap.get(transaction2.getTxID()).getSuccess()); + + Assert.assertEquals(1, + blockDeleteMetrics.getTotalLockTimeoutTransactionCount()); + } + + @Test + public void testDeleteBlocksCommandHandlerSuccessfulAfterFirstTimeout() + throws IOException { + Assert.assertTrue(containerSet.containerCount() > 0); + Container lockedContainer = + containerSet.getContainerIterator(volume1).next(); + DeletedBlocksTransaction transaction = createDeletedBlocksTransaction(1, + lockedContainer.getContainerData().getContainerID()); + // After the Container waits for the lock to time out for the first time, + // release the lock on the Container, so the retry will succeed. 
+ lockedContainer.writeLock(); + doAnswer(new Answer>>() { + @Override + public List> answer( + InvocationOnMock invocationOnMock) throws Throwable { + List> result = + (List>) + invocationOnMock.callRealMethod(); + // Wait for the task to finish executing and then release the lock + DeleteBlockTransactionExecutionResult res = result.get(0).get(); + if (lockedContainer.hasWriteLock()) { + lockedContainer.writeUnlock(); + } + CompletableFuture future = + new CompletableFuture<>(); + future.complete(res); + result.clear(); + result.add(future); + return result; + } + }).when(handler).submitTasks(any()); + + List results = + handler.executeCmdWithRetry(Arrays.asList(transaction)); + + // submitTasks will be executed twice, as if there were retries + Mockito.verify(handler, + times(2)).submitTasks(any()); + Mockito.verify(handler.getSchemaHandlers().get(schemaVersion), + times(1)).handle(any(), any()); + Assert.assertEquals(1, results.size()); + Assert.assertTrue(results.get(0).getSuccess()); + + Assert.assertEquals(0, + blockDeleteMetrics.getTotalLockTimeoutTransactionCount()); + } + + + private DeletedBlocksTransaction createDeletedBlocksTransaction(long txID, + long containerID) { + return DeletedBlocksTransaction.newBuilder() + .setContainerID(containerID) + .setCount(0) + .addLocalID(1L) + .setTxID(txID) + .build(); + } + + private static class TestSchemaHandler implements SchemaHandler { + @Override + public void handle(KeyValueContainerData containerData, + DeletedBlocksTransaction tx) throws IOException { + // doNoting just for Test + } + } +} From 6a8b74e6cc846d34376d4fb289c2373506ae6f01 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Tue, 20 Jun 2023 00:38:12 +0800 Subject: [PATCH 2/8] Typo; Use junit4 timeout; Change DeleteBlocksCommandHandler parameters --- .../common/helpers/BlockDeletingServiceMetrics.java | 2 +- .../common/statemachine/DatanodeStateMachine.java | 4 +--- .../commandhandler/DeleteBlocksCommandHandler.java | 12 ++++++------ .../TestDeleteBlocksCommandHandler.java | 13 +++++++++---- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java index 2526ce771f76..57526041bf4f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java @@ -52,7 +52,7 @@ public final class BlockDeletingServiceMetrics { @Metric(about = "The total number of blocks pending for processing.") private MutableGaugeLong totalPendingBlockCount; - @Metric(about = "The total number of transactions witch was failed " + + @Metric(about = "The total number of transactions which was failed " + "cause by wait Container lock timeout.") private MutableGaugeLong totalLockTimeoutTransactionCount; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 0f8b47a579c2..c9805a1c5269 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -225,9 +225,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, commandDispatcher = CommandDispatcher.newBuilder() .addHandler(new CloseContainerCommandHandler()) .addHandler(new DeleteBlocksCommandHandler(getContainer(), - conf, dnConf.getBlockDeleteThreads(), - dnConf.getBlockDeleteQueueLimit(), - dnConf.getBlockDeleteCommandHandleLockTimeoutMs())) + conf, dnConf)) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor, pullReplicatorWithMetrics, pushReplicatorWithMetrics)) .addHandler(reconstructECContainersCommandHandler) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index e88342dfd28a..073fd9f59d54 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -39,6 +39,7 @@ .DeletedContainerBlocksSummary; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; @@ -98,24 +99,23 @@ public class DeleteBlocksCommandHandler implements CommandHandler { private final long tryLockTimeoutMs; private final Map schemaHandlers; - public DeleteBlocksCommandHandler(OzoneContainer container, - ConfigurationSource conf, int threadPoolSize, int queueLimit, - long tryLockTimeoutMs) { + ConfigurationSource conf, DatanodeConfiguration dnConf) { this.ozoneContainer = container; this.containerSet = container.getContainerSet(); this.conf = conf; this.blockDeleteMetrics = BlockDeletingServiceMetrics.create(); - this.tryLockTimeoutMs = tryLockTimeoutMs; + this.tryLockTimeoutMs = dnConf.getBlockDeleteCommandHandleLockTimeoutMs(); schemaHandlers = new HashMap<>(); schemaHandlers.put(SCHEMA_V1, this::markBlocksForDeletionSchemaV1); schemaHandlers.put(SCHEMA_V2, this::markBlocksForDeletionSchemaV2); schemaHandlers.put(SCHEMA_V3, this::markBlocksForDeletionSchemaV3); this.executor = Executors.newFixedThreadPool( - threadPoolSize, new ThreadFactoryBuilder() + dnConf.getBlockDeleteThreads(), new ThreadFactoryBuilder() .setNameFormat("DeleteBlocksCommandHandlerThread-%d").build()); - this.deleteCommandQueues = new LinkedBlockingQueue<>(queueLimit); + this.deleteCommandQueues = + new LinkedBlockingQueue<>(dnConf.getBlockDeleteQueueLimit()); handlerThread = new Daemon(new DeleteCmdWorker()); handlerThread.start(); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java index 2818b4b96932..62b6223ad04f 100644 --- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java @@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; @@ -33,8 +34,9 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; -import org.junit.jupiter.api.Timeout; +import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.mockito.Mockito; @@ -72,9 +74,11 @@ * Test cases for TestDeleteBlocksCommandHandler. */ @RunWith(Parameterized.class) -@Timeout(300) public class TestDeleteBlocksCommandHandler { + @Rule + public Timeout testTimeout = Timeout.seconds(300); + private OzoneConfiguration conf; private StateContext context; private ContainerLayoutVersion layout; @@ -127,9 +131,11 @@ public void setup() throws Exception { containerSet.addContainer(container); } when(ozoneContainer.getContainerSet()).thenReturn(containerSet); + DatanodeConfiguration dnConf = + conf.getObject(DatanodeConfiguration.class); handler = spy(new DeleteBlocksCommandHandler( - ozoneContainer, conf, 1, 1, 25)); + ozoneContainer, conf, dnConf)); blockDeleteMetrics = handler.getBlockDeleteMetrics(); TestSchemaHandler testSchemaHandler1 = Mockito.spy(new TestSchemaHandler()); TestSchemaHandler testSchemaHandler2 = Mockito.spy(new TestSchemaHandler()); @@ -170,7 +176,6 @@ public void testDeleteBlocksCommandHandler() } @Test - @Timeout(60) public void testDeleteBlocksCommandHandlerWithTimeoutFailed() throws IOException { Assert.assertTrue(containerSet.containerCount() >= 2); From fd0105f8c1419a912f6942651ed74271d095876c Mon Sep 17 00:00:00 2001 From: xichen01 Date: Thu, 29 Jun 2023 01:46:55 +0800 Subject: [PATCH 3/8] Fix findbugs --- .../commandhandler/TestDeleteBlocksCommandHandler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java index 62b6223ad04f..a9e4fdef4f9a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java @@ -77,6 +77,7 @@ public class TestDeleteBlocksCommandHandler { @Rule + @SuppressWarnings("unused") public Timeout testTimeout = Timeout.seconds(300); private OzoneConfiguration conf; From 2b8e4ea5a211ec5004b4a7e060204405545037bb Mon Sep 17 00:00:00 2001 From: xichen01 Date: Thu, 29 Jun 2023 02:17:32 +0800 Subject: [PATCH 4/8] Using 
ContainerTestVersionInfo to parameterize test --- .../DeleteBlocksCommandHandler.java | 3 +- .../keyvalue/KeyValueContainerData.java | 6 ++-- .../TestDeleteBlocksCommandHandler.java | 32 +++++++++++-------- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index 49bcbb2986a2..b217393070ca 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -273,7 +273,8 @@ public DeleteBlockTransactionExecutionResult call() { if (keyValueContainer. writeLockTryLock(tryLockTimeoutMs, TimeUnit.MILLISECONDS)) { try { - String schemaVersion = containerData.getValidSchemaVersion(); + String schemaVersion = containerData + .getSupportedSchemaVersionOrDefault(); if (getSchemaHandlers().containsKey(schemaVersion)) { schemaHandlers.get(schemaVersion).handle(containerData, tx); } else { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java index 4056d0b63e41..eaee7d2cf04a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java @@ -145,12 +145,14 @@ public String getSchemaVersion() { } /** - * Returns valid schema version or throws exception if not found. + * Returns schema version or the default value when the + * {@link KeyValueContainerData#schemaVersion} is null. The default value can + * be referred to {@link KeyValueContainerUtil#isSameSchemaVersion}. * * @return Schema version as a string. * @throws UnsupportedOperationException If no valid schema version is found. 
*/ - public String getValidSchemaVersion() { + public String getSupportedSchemaVersionOrDefault() { String[] versions = {SCHEMA_V1, SCHEMA_V2, SCHEMA_V3}; for (String version : versions) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java index a9e4fdef4f9a..4e77b7b85de4 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler.SchemaHandler; @@ -48,7 +49,6 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -90,18 +90,16 @@ public class TestDeleteBlocksCommandHandler { private HddsVolume volume1; private BlockDeletingServiceMetrics blockDeleteMetrics; - - public TestDeleteBlocksCommandHandler(String schemaVersion) { - this.schemaVersion = schemaVersion; + public TestDeleteBlocksCommandHandler(ContainerTestVersionInfo versionInfo) { + this.layout = versionInfo.getLayout(); + this.schemaVersion = versionInfo.getSchemaVersion(); + conf = new OzoneConfiguration(); + ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf); } - @Parameterized.Parameters(name = "{index}: Schema Version {0}") - public static Collection data() { - return Arrays.asList(new Object[][]{ - {SCHEMA_V1}, - {SCHEMA_V2}, - {SCHEMA_V3}, - }); + @Parameterized.Parameters + public static Iterable parameters() { + return ContainerTestVersionInfo.versionParameters(); } @Before @@ -164,7 +162,9 @@ public void testDeleteBlocksCommandHandler() List results = handler.executeCmdWithRetry(Arrays.asList(transaction)); - Mockito.verify(handler.getSchemaHandlers().get(schemaVersion), + String schemaVersionOrDefault = ((KeyValueContainerData) + container.getContainerData()).getSupportedSchemaVersionOrDefault(); + Mockito.verify(handler.getSchemaHandlers().get(schemaVersionOrDefault), times(1)).handle(any(), any()); // submitTasks will be executed only once, as if there were not retries Mockito.verify(handler, @@ -200,7 +200,9 @@ public void testDeleteBlocksCommandHandlerWithTimeoutFailed() Arrays.asList(transaction1, transaction2); List results = handler.executeCmdWithRetry(transactions); - Mockito.verify(handler.getSchemaHandlers().get(schemaVersion), + String schemaVersionOrDefault = ((KeyValueContainerData) nonLockedcontainer. 
+ getContainerData()).getSupportedSchemaVersionOrDefault(); + Mockito.verify(handler.getSchemaHandlers().get(schemaVersionOrDefault), times(1)).handle(eq((KeyValueContainerData) nonLockedcontainer.getContainerData()), eq(transaction2)); @@ -257,9 +259,11 @@ public List> answer( handler.executeCmdWithRetry(Arrays.asList(transaction)); // submitTasks will be executed twice, as if there were retries + String schemaVersionOrDefault = ((KeyValueContainerData) lockedContainer + .getContainerData()).getSupportedSchemaVersionOrDefault(); Mockito.verify(handler, times(2)).submitTasks(any()); - Mockito.verify(handler.getSchemaHandlers().get(schemaVersion), + Mockito.verify(handler.getSchemaHandlers().get(schemaVersionOrDefault), times(1)).handle(any(), any()); Assert.assertEquals(1, results.size()); Assert.assertTrue(results.get(0).getSuccess()); From 9fa10cbf0bab7f86655a9c04a303fd56eeff3623 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Thu, 29 Jun 2023 10:41:09 +0800 Subject: [PATCH 5/8] Exclude findbugs URF --- .../container-service/dev-support/findbugsExcludeFile.xml | 4 ++++ .../commandhandler/TestDeleteBlocksCommandHandler.java | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/container-service/dev-support/findbugsExcludeFile.xml index 2cca8b90e22f..be2e2df5f1fe 100644 --- a/hadoop-hdds/container-service/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdds/container-service/dev-support/findbugsExcludeFile.xml @@ -65,6 +65,10 @@ + + + + diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java index 4e77b7b85de4..9f445158778e 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java @@ -77,7 +77,6 @@ public class TestDeleteBlocksCommandHandler { @Rule - @SuppressWarnings("unused") public Timeout testTimeout = Timeout.seconds(300); private OzoneConfiguration conf; From c1ddf5dbb3b151f21278494651c8dd94cd30abeb Mon Sep 17 00:00:00 2001 From: xichen01 Date: Fri, 22 Sep 2023 00:51:24 +0800 Subject: [PATCH 6/8] Modify comments; Modify markBlocksForDeletionTransaction --- .../helpers/BlockDeletingServiceMetrics.java | 4 +-- .../statemachine/DatanodeConfiguration.java | 19 +++++++------ .../DeleteBlocksCommandHandler.java | 27 ++++++++++--------- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java index 78f97d583166..6f2d78c7e71e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java @@ -74,8 +74,8 @@ public final class BlockDeletingServiceMetrics { @Metric(about = "The total number of Container chosen to be deleted.") private 
MutableGaugeLong totalContainerChosenCount; - @Metric(about = "The total number of transactions which was failed " + - "cause by wait Container lock timeout.") + @Metric(about = "The total number of transactions which failed due" + + " to container lock wait timeout.") private MutableGaugeLong totalLockTimeoutTransactionCount; private BlockDeletingServiceMetrics() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index 71b464db5c78..bf004586926f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -189,21 +189,20 @@ public class DatanodeConfiguration { Duration.ofMinutes(10).toMillis(); /** - * Timeout for the thread used to process the delete block command - * to wait for the container lock. - * It takes about 200ms to open a RocksDB with HDD media. - * Set the default value to 100ms, so that after one times retry - * after waiting timeout, the hold time spent waiting for the lock - * is not greater than the time spent operating RocksDB + * The maximum time to wait for acquiring the container lock when processing + * a delete block transaction. + * If a timeout occurs while attempting to get the lock, the delete block + * transaction won't be immediately discarded. Instead, it will be retried + * after all the current delete block transactions have been processed. */ - @Config(key = "block.delete.command.handle.lock.timeout", + @Config(key = "block.delete.max.lock.wait.timeout", defaultValue = "100ms", type = ConfigType.TIME, tags = { DATANODE, ConfigTag.DELETION}, description = "Timeout for the thread used to process the delete" + " block command to wait for the container lock." 
) - private long blockDeleteCommandHandleLockTimeoutMs = + private long blockDeleteMaxLockWaitTimeoutMs = Duration.ofMillis(100).toMillis(); public Duration getBlockDeletionInterval() { @@ -591,8 +590,8 @@ public int getBlockDeleteQueueLimit() { return blockDeleteQueueLimit; } - public long getBlockDeleteCommandHandleLockTimeoutMs() { - return blockDeleteCommandHandleLockTimeoutMs; + public long getBlockDeleteMaxLockWaitTimeoutMs() { + return blockDeleteMaxLockWaitTimeoutMs; } public void setBlockDeleteQueueLimit(int queueLimit) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index b217393070ca..96851a4a2634 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -106,7 +106,7 @@ public DeleteBlocksCommandHandler(OzoneContainer container, this.containerSet = container.getContainerSet(); this.conf = conf; this.blockDeleteMetrics = BlockDeletingServiceMetrics.create(); - this.tryLockTimeoutMs = dnConf.getBlockDeleteCommandHandleLockTimeoutMs(); + this.tryLockTimeoutMs = dnConf.getBlockDeleteMaxLockWaitTimeoutMs(); schemaHandlers = new HashMap<>(); schemaHandlers.put(SCHEMA_V1, this::markBlocksForDeletionSchemaV1); schemaHandlers.put(SCHEMA_V2, this::markBlocksForDeletionSchemaV2); @@ -268,8 +268,8 @@ public DeleteBlockTransactionExecutionResult call() { switch (containerType) { case KeyValueContainer: KeyValueContainer keyValueContainer = (KeyValueContainer)cont; - KeyValueContainerData containerData = (KeyValueContainerData) - cont.getContainerData(); + KeyValueContainerData containerData = + keyValueContainer.getContainerData(); if (keyValueContainer. 
writeLockTryLock(tryLockTimeoutMs, TimeUnit.MILLISECONDS)) { try { @@ -282,7 +282,7 @@ public DeleteBlockTransactionExecutionResult call() { "Only schema version 1,2,3 are supported."); } } finally { - cont.writeUnlock(); + keyValueContainer.writeUnlock(); } txResultBuilder.setContainerID(containerId) .setSuccess(true); @@ -297,11 +297,15 @@ public DeleteBlockTransactionExecutionResult call() { "Delete Blocks Command Handler is not implemented for " + "containerType {}", containerType); } - } catch (IOException | InterruptedException e) { + } catch (IOException e) { LOG.warn("Failed to delete blocks for container={}, TXID={}", tx.getContainerID(), tx.getTxID(), e); - txResultBuilder.setContainerID(containerId) - .setSuccess(false); + txResultBuilder.setContainerID(containerId).setSuccess(false); + } catch (InterruptedException e) { + LOG.warn("InterruptedException while deleting blocks for " + + "container={}, TXID={}", tx.getContainerID(), tx.getTxID(), e); + Thread.currentThread().interrupt(); + txResultBuilder.setContainerID(containerId).setSuccess(false); } return new DeleteBlockTransactionExecutionResult( txResultBuilder.build(), lockAcquisitionFailed); @@ -438,7 +442,6 @@ public void handleTasksResults( private void markBlocksForDeletionSchemaV3( KeyValueContainerData containerData, DeletedBlocksTransaction delTX) throws IOException { - int newDeletionBlocks = 0; DeletionMarker schemaV3Marker = (table, batch, tid, txn) -> { Table delTxTable = (Table) table; @@ -446,21 +449,20 @@ private void markBlocksForDeletionSchemaV3( txn); }; - markBlocksForDeletionTransaction(containerData, delTX, newDeletionBlocks, + markBlocksForDeletionTransaction(containerData, delTX, delTX.getTxID(), schemaV3Marker); } private void markBlocksForDeletionSchemaV2( KeyValueContainerData containerData, DeletedBlocksTransaction delTX) throws IOException { - int newDeletionBlocks = 0; DeletionMarker schemaV2Marker = (table, batch, tid, txn) -> { Table delTxTable = (Table) table; delTxTable.putWithBatch(batch, tid, txn); }; - markBlocksForDeletionTransaction(containerData, delTX, newDeletionBlocks, + markBlocksForDeletionTransaction(containerData, delTX, delTX.getTxID(), schemaV2Marker); } @@ -474,8 +476,9 @@ private void markBlocksForDeletionSchemaV2( */ private void markBlocksForDeletionTransaction( KeyValueContainerData containerData, DeletedBlocksTransaction delTX, - int newDeletionBlocks, long txnID, DeletionMarker marker) + long txnID, DeletionMarker marker) throws IOException { + int newDeletionBlocks = 0; long containerId = delTX.getContainerID(); logDeleteTransaction(containerId, containerData, delTX); try (DBHandle containerDB = BlockUtils.getDB(containerData, conf)) { From eeb9f8e06432a0adee0d4ac2b27d43a4c5962cc5 Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Fri, 22 Sep 2023 10:53:01 +0800 Subject: [PATCH 7/8] Update indent in DeleteBlocksCommandHandler.java --- .../statemachine/commandhandler/DeleteBlocksCommandHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index 96851a4a2634..87d35c2da00b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -669,7 +669,7 @@ void apply(Table deleteTxnsTable, */ public interface SchemaHandler { void handle(KeyValueContainerData containerData, - DeletedBlocksTransaction tx) throws IOException; + DeletedBlocksTransaction tx) throws IOException; } @VisibleForTesting From 7ede747f381d69d6bac9332327a43430539e7e67 Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Fri, 22 Sep 2023 10:53:39 +0800 Subject: [PATCH 8/8] Update indent in TestDeleteBlocksCommandHandler.java --- .../commandhandler/TestDeleteBlocksCommandHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java index 9f445158778e..62447365421b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java @@ -285,7 +285,7 @@ private DeletedBlocksTransaction createDeletedBlocksTransaction(long txID, private static class TestSchemaHandler implements SchemaHandler { @Override public void handle(KeyValueContainerData containerData, - DeletedBlocksTransaction tx) throws IOException { + DeletedBlocksTransaction tx) throws IOException { // doNoting just for Test } }
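Note: the sketch below is a self-contained illustration of the locking pattern this series introduces in DeleteBlocksCommandHandler — acquire the container write lock with a bounded tryLock instead of blocking, defer any transaction whose lock attempt times out, and retry the deferred ones exactly once after the first pass completes (the behavior described for the new block.delete.max.lock.wait.timeout setting, default 100ms). It is a minimal model under stated assumptions, not Ozone code: FakeContainer, tryWriteLock, and processWithSingleRetry are illustrative placeholder names and do not correspond to the actual Ozone APIs touched by the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only: models "try the write lock with a timeout, defer on
// failure, retry the deferred work once" using placeholder types.
public final class LockRetrySketch {

  static final class FakeContainer {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    final long id;

    FakeContainer(long id) {
      this.id = id;
    }

    // Bounded lock attempt, analogous in spirit to writeLockTryLock in the patch.
    boolean tryWriteLock(long timeoutMs) throws InterruptedException {
      return lock.writeLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS);
    }

    void writeUnlock() {
      lock.writeLock().unlock();
    }
  }

  // First pass: process every container whose lock is acquired within the
  // timeout; collect the rest. Second pass: retry the deferred containers once.
  static void processWithSingleRetry(List<FakeContainer> containers, long timeoutMs)
      throws InterruptedException {
    List<FakeContainer> deferred = new ArrayList<>();
    for (FakeContainer c : containers) {
      if (c.tryWriteLock(timeoutMs)) {
        try {
          System.out.println("processed container " + c.id);
        } finally {
          c.writeUnlock();
        }
      } else {
        deferred.add(c); // do not fail immediately; retry after the first pass
      }
    }
    for (FakeContainer c : deferred) {
      if (c.tryWriteLock(timeoutMs)) {
        try {
          System.out.println("processed container " + c.id + " on retry");
        } finally {
          c.writeUnlock();
        }
      } else {
        // After the single retry the transaction is reported as failed,
        // mirroring where the patch bumps its lock-timeout metric.
        System.out.println("container " + c.id + " still locked; giving up");
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<FakeContainer> containers = new ArrayList<>();
    containers.add(new FakeContainer(1));
    containers.add(new FakeContainer(2));
    processWithSingleRetry(containers, 100);
  }
}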