From ee2449f27a0560dd80ab568e64a3375fccf4d77c Mon Sep 17 00:00:00 2001 From: xichen01 Date: Fri, 23 Feb 2024 15:38:37 +0800 Subject: [PATCH 1/7] EC client Reusing thread resources --- .../hdds/scm/storage/AbstractCommitWatcher.java | 2 +- .../hadoop/hdds/scm/storage/BlockOutputStream.java | 5 +---- .../reconstruction/ECReconstructionCoordinator.java | 3 +-- .../hadoop/ozone/client/io/ECKeyOutputStream.java | 11 ++++------- .../org/apache/hadoop/ozone/client/rpc/RpcClient.java | 3 +-- 5 files changed, 8 insertions(+), 16 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java index 0c5501c7922c..957f761ccbc2 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java @@ -73,7 +73,7 @@ SortedMap> getCommitIndexMap() { return commitIndexMap; } - void updateCommitInfoMap(long index, List buffers) { + synchronized void updateCommitInfoMap(long index, List buffers) { commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>()) .addAll(buffers); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 5ff5da60989e..5c0516d7bd4f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -25,7 +25,6 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; 
import java.util.function.Supplier; @@ -182,8 +181,7 @@ public BlockOutputStream( (long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs .getStreamBufferFlushSize()); - // A single thread executor handle the responses of async requests - responseExecutor = Executors.newSingleThreadExecutor(); + this.responseExecutor = blockOutputStreamResourceProvider.get(); bufferList = null; totalDataFlushedLength = 0; writtenDataLength = 0; @@ -657,7 +655,6 @@ public void cleanup(boolean invalidateClient) { bufferList.clear(); } bufferList = null; - responseExecutor.shutdown(); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index a45c15844847..90756bbc8898 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -101,8 +101,7 @@ public class ECReconstructionCoordinator implements Closeable { private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3; - // TODO: Adjusts to the appropriate value when the ec-reconstruct-writer thread pool is used. 
- private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0; + private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5; private final ECContainerOperationClient containerOperationClient; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java index 878558073f75..0cb3973e0411 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java @@ -43,8 +43,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -66,7 +64,6 @@ public final class ECKeyOutputStream extends KeyOutputStream private final int numParityBlks; private final ByteBufferPool bufferPool; private final RawErasureEncoder encoder; - private final ExecutorService flushExecutor; private final Future flushFuture; private final AtomicLong flushCheckpoint; @@ -119,12 +116,13 @@ private ECKeyOutputStream(Builder builder) { this.writeOffset = 0; this.encoder = CodecUtil.createRawEncoderWithFallback( builder.getReplicationConfig()); - this.flushExecutor = Executors.newSingleThreadExecutor(); S3Auth s3Auth = builder.getS3CredentialsProvider().get(); ThreadLocal s3CredentialsProvider = builder.getS3CredentialsProvider(); - flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth)); - this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue); + this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() -> { + s3CredentialsProvider.set(s3Auth); + return flushStripeFromQueue(); + }); 
this.flushCheckpoint = new AtomicLong(0); this.atomicKeyCreation = builder.getAtomicKeyCreation(); } @@ -495,7 +493,6 @@ public void close() throws IOException { } catch (InterruptedException e) { throw new IOException("Flushing thread was interrupted", e); } finally { - flushExecutor.shutdownNow(); closeCurrentStreamEntry(); blockOutputStreamEntryPool.cleanup(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 74b22e7ca4c6..a6830ba9f771 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -196,8 +196,7 @@ public class RpcClient implements ClientProtocol { // for reconstruction. private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3; - // TODO: Adjusts to the appropriate value when the writeThreadPool is used. 
- private static final int WRITE_POOL_MIN_SIZE = 0; + private static final int WRITE_POOL_MIN_SIZE = 1; private final ConfigurationSource conf; private final OzoneManagerClientProtocol ozoneManagerClient; From abb8ca6ac6f37a6456fd7d9566a4b39eceb7a1a3 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Sat, 24 Feb 2024 02:57:22 +0800 Subject: [PATCH 2/7] fix test --- .../ozone/client/rpc/TestOzoneAtRestEncryption.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java index 29cf1bc5e117..44303ed2ff23 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java @@ -213,6 +213,14 @@ static void shutdown() throws IOException { } } + static void reInitClient() throws IOException { + ozClient = OzoneClientFactory.getRpcClient(conf); + store = ozClient.getObjectStore(); + TestOzoneRpcClient.setOzClient(ozClient); + TestOzoneRpcClient.setStore(store); + } + + @ParameterizedTest @EnumSource void testPutKeyWithEncryption(BucketLayout bucketLayout) throws Exception { @@ -770,9 +778,7 @@ void testGetKeyProvider() throws Exception { KeyProvider kp3 = ozClient.getObjectStore().getKeyProvider(); assertNotEquals(kp3, kpSpy); - // Restore ozClient and store - TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf)); - TestOzoneRpcClient.setStore(ozClient.getObjectStore()); + reInitClient(); } private static RepeatedOmKeyInfo getMatchedKeyInfo( From 17c5d6ec56de1304abab7025303df9e52e12064f Mon Sep 17 00:00:00 2001 From: xichen01 Date: Sun, 3 Mar 2024 03:20:02 +0800 Subject: [PATCH 3/7] Fix Test --- .../hdds/scm/storage/CommitWatcher.java | 27 
+++++++++++++++---- .../scm/storage/RatisBlockOutputStream.java | 9 +++---- .../hdds/scm/storage/TestCommitWatcher.java | 13 ++++----- .../rpc/TestOzoneRpcClientAbstract.java | 1 + 4 files changed, 33 insertions(+), 17 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java index 3c7f8a2360c8..30827ecdd4c6 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java @@ -24,14 +24,18 @@ */ package org.apache.hadoop.hdds.scm.storage; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.ozone.common.ChunkBuffer; +import java.util.LinkedList; +import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; /** * This class executes watchForCommit on ratis pipeline and releases @@ -42,8 +46,8 @@ class CommitWatcher extends AbstractCommitWatcher { private final BufferPool bufferPool; // future Map to hold up all putBlock futures - private final ConcurrentMap> futureMap = new ConcurrentHashMap<>(); + private final ConcurrentMap>> + futureMap = new ConcurrentHashMap<>(); CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) { super(xceiverClient); @@ -61,17 +65,30 @@ void releaseBuffers(long index) { // When putBlock is called, a future is added. // When putBlock is replied, the future is removed below. // Therefore, the removed future should not be null. 
- final CompletableFuture removed = + final List> removed = futureMap.remove(totalLength); Objects.requireNonNull(removed, () -> "Future not found for " + totalLength + ": existing = " + futureMap.keySet()); } - ConcurrentMap> getFutureMap() { + @VisibleForTesting + ConcurrentMap>> getFutureMap() { return futureMap; } + public void putFlushFuture(long flushPos, CompletableFuture flushFuture) { + futureMap.computeIfAbsent(flushPos, k -> new LinkedList<>()).add(flushFuture); + } + + + public void waitOnFlushFutures() throws InterruptedException, ExecutionException { + CompletableFuture.allOf( + futureMap.values().stream() + .flatMap(List::stream) + .toArray(CompletableFuture[]::new) + ).get(); + } + @Override public void cleanup() { super.cleanup(); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java index 6a2758d36486..b587b1d13171 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java @@ -113,16 +113,13 @@ void updateCommitInfo(XceiverClientReply reply, List buffers) { } @Override - void putFlushFuture(long flushPos, - CompletableFuture flushFuture) { - commitWatcher.getFutureMap().put(flushPos, flushFuture); + void putFlushFuture(long flushPos, CompletableFuture flushFuture) { + commitWatcher.putFlushFuture(flushPos, flushFuture); } @Override void waitOnFlushFutures() throws InterruptedException, ExecutionException { - // wait for all the transactions to complete - CompletableFuture.allOf(commitWatcher.getFutureMap().values().toArray( - new CompletableFuture[0])).get(); + commitWatcher.waitOnFlushFutures(); } @Override diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java index 2b13daaca291..9450756e8560 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -209,7 +210,7 @@ public void testReleaseBuffers() throws Exception { return v; }); futures.add(future); - watcher.getFutureMap().put(length, future); + watcher.putFlushFuture(length, future); replies.add(reply); } @@ -220,10 +221,10 @@ public void testReleaseBuffers() throws Exception { CompletableFuture future2 = futures.get(1); future1.get(); - assertEquals(future1, watcher.getFutureMap().get((long) chunkSize)); + assertEquals(Collections.singletonList(future1), watcher.getFutureMap().get((long) chunkSize)); // wait on 2nd putBlock to complete future2.get(); - assertEquals(future2, watcher.getFutureMap().get((long) 2 * chunkSize)); + assertEquals(Collections.singletonList(future2), watcher.getFutureMap().get((long) 2 * chunkSize)); assertEquals(2, watcher. 
getCommitIndexMap().size()); watcher.watchOnFirstIndex(); @@ -282,7 +283,7 @@ public void testReleaseBuffersOnException() throws Exception { return v; }); futures.add(future); - watcher.getFutureMap().put(length, future); + watcher.putFlushFuture(length, future); replies.add(reply); } @@ -293,10 +294,10 @@ public void testReleaseBuffersOnException() throws Exception { CompletableFuture future2 = futures.get(1); future1.get(); - assertEquals(future1, watcher.getFutureMap().get((long) chunkSize)); + assertEquals(Collections.singletonList(future1), watcher.getFutureMap().get((long) chunkSize)); // wait on 2nd putBlock to complete future2.get(); - assertEquals(future2, watcher.getFutureMap().get((long) 2 * chunkSize)); + assertEquals(Collections.singletonList(future2), watcher.getFutureMap().get((long) 2 * chunkSize)); assertEquals(2, watcher.getCommitIndexMap().size()); watcher.watchOnFirstIndex(); assertThat(watcher.getCommitIndexMap()).doesNotContainKey(replies.get(0).getLogIndex()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index da41561f8ce2..cd77706b862c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -1651,6 +1651,7 @@ public void testPutKeyRatisThreeNodesParallel() throws IOException, } latch.countDown(); } catch (IOException ex) { + LOG.error("Execution failed: ", ex); latch.countDown(); failCount.incrementAndGet(); } From 5ab372681f2e1ea169d84c384aa78bca6638b637 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Mon, 4 Mar 2024 18:31:49 +0800 Subject: [PATCH 4/7] Fix testParallelDeleteBucketAndCreateKey --- .../hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java | 3 ++- 1
file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java index febb6fd41c2a..850cb70004cf 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java @@ -322,8 +322,9 @@ public void testParallelDeleteBucketAndCreateKey() throws IOException, new OMRequestHandlerPauseInjector(); omSM.getHandler().setInjector(injector); thread1.start(); + Thread.sleep(1000); thread2.start(); - Thread.sleep(2000); + Thread.sleep(10000); injector.resume(); try { From 9d5aaf72ab716281ef480e19cef95223f810d859 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Mon, 4 Mar 2024 20:38:51 +0800 Subject: [PATCH 5/7] Remove the changes of testParallelDeleteBucketAndCreateKey --- .../hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java index 850cb70004cf..febb6fd41c2a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java @@ -322,9 +322,8 @@ public void testParallelDeleteBucketAndCreateKey() throws IOException, new OMRequestHandlerPauseInjector(); omSM.getHandler().setInjector(injector); thread1.start(); - Thread.sleep(1000); thread2.start(); - Thread.sleep(10000); + Thread.sleep(2000); injector.resume(); try { From 
5cbab9bfb58038acea1b273c53fe48f7adbf3525 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Tue, 5 Mar 2024 22:29:49 +0800 Subject: [PATCH 6/7] Use thenCombine to replace List for future management --- .../hadoop/hdds/scm/storage/CommitWatcher.java | 18 +++++++++--------- .../hdds/scm/storage/TestCommitWatcher.java | 9 ++++----- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java index 30827ecdd4c6..9b5c21676fa8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java @@ -46,7 +46,7 @@ class CommitWatcher extends AbstractCommitWatcher { private final BufferPool bufferPool; // future Map to hold up all putBlock futures - private final ConcurrentMap>> + private final ConcurrentMap> futureMap = new ConcurrentHashMap<>(); CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) { @@ -65,28 +65,28 @@ void releaseBuffers(long index) { // When putBlock is called, a future is added. // When putBlock is replied, the future is removed below. // Therefore, the removed future should not be null. - final List> removed = + final CompletableFuture removed = futureMap.remove(totalLength); Objects.requireNonNull(removed, () -> "Future not found for " + totalLength + ": existing = " + futureMap.keySet()); } @VisibleForTesting - ConcurrentMap>> getFutureMap() { + ConcurrentMap> getFutureMap() { return futureMap; } public void putFlushFuture(long flushPos, CompletableFuture flushFuture) { - futureMap.computeIfAbsent(flushPos, k -> new LinkedList<>()).add(flushFuture); + futureMap.compute(flushPos, + (key, previous) -> previous == null ? 
flushFuture : + previous.thenCombine(flushFuture, (prev, curr) -> curr)); } public void waitOnFlushFutures() throws InterruptedException, ExecutionException { - CompletableFuture.allOf( - futureMap.values().stream() - .flatMap(List::stream) - .toArray(CompletableFuture[]::new) - ).get(); + // wait for all the transactions to complete + CompletableFuture.allOf(futureMap.values().toArray( + new CompletableFuture[0])).get(); } @Override diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java index 9450756e8560..c3ea911f1935 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -221,10 +220,10 @@ public void testReleaseBuffers() throws Exception { CompletableFuture future2 = futures.get(1); future1.get(); - assertEquals(Collections.singletonList(future1), watcher.getFutureMap().get((long) chunkSize)); + assertEquals(future1, watcher.getFutureMap().get((long) chunkSize)); // wait on 2nd putBlock to complete future2.get(); - assertEquals(Collections.singletonList(future2), watcher.getFutureMap().get((long) 2 * chunkSize)); + assertEquals(future2, watcher.getFutureMap().get((long) 2 * chunkSize)); assertEquals(2, watcher. 
getCommitIndexMap().size()); watcher.watchOnFirstIndex(); @@ -294,10 +293,10 @@ public void testReleaseBuffersOnException() throws Exception { CompletableFuture future2 = futures.get(1); future1.get(); - assertEquals(Collections.singletonList(future1), watcher.getFutureMap().get((long) chunkSize)); + assertEquals(future1, watcher.getFutureMap().get((long) chunkSize)); // wait on 2nd putBlock to complete future2.get(); - assertEquals(Collections.singletonList(future2), watcher.getFutureMap().get((long) 2 * chunkSize)); + assertEquals(future2, watcher.getFutureMap().get((long) 2 * chunkSize)); assertEquals(2, watcher.getCommitIndexMap().size()); watcher.watchOnFirstIndex(); assertThat(watcher.getCommitIndexMap()).doesNotContainKey(replies.get(0).getLogIndex()); From 8224e4634976717539fc43cb44e3047195678422 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Tue, 5 Mar 2024 22:35:48 +0800 Subject: [PATCH 7/7] Checkstyle --- .../java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java index 9b5c21676fa8..aa339409eceb 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java @@ -29,8 +29,6 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.ozone.common.ChunkBuffer; -import java.util.LinkedList; -import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap;