diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java index 957f761ccbc2..0c5501c7922c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java @@ -73,7 +73,7 @@ SortedMap> getCommitIndexMap() { return commitIndexMap; } - synchronized void updateCommitInfoMap(long index, List buffers) { + void updateCommitInfoMap(long index, List buffers) { commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>()) .addAll(buffers); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 5c0516d7bd4f..5ff5da60989e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -181,7 +182,8 @@ public BlockOutputStream( (long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs .getStreamBufferFlushSize()); - this.responseExecutor = blockOutputStreamResourceProvider.get(); + // A single thread executor handles the responses of async requests + responseExecutor = Executors.newSingleThreadExecutor(); bufferList = null; totalDataFlushedLength = 0; writtenDataLength = 0; @@ -655,6 +657,7 @@ public void cleanup(boolean invalidateClient) { 
bufferList.clear(); } bufferList = null; + responseExecutor.shutdown(); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 90756bbc8898..a45c15844847 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -101,7 +101,8 @@ public class ECReconstructionCoordinator implements Closeable { private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3; - private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5; + // TODO: Adjust to the appropriate value when the ec-reconstruct-writer thread pool is used. + private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0; private final ECContainerOperationClient containerOperationClient; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java index 0cb3973e0411..878558073f75 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java @@ -43,6 +43,8 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -64,6 +66,7 @@ public final class ECKeyOutputStream extends 
KeyOutputStream private final int numParityBlks; private final ByteBufferPool bufferPool; private final RawErasureEncoder encoder; + private final ExecutorService flushExecutor; private final Future flushFuture; private final AtomicLong flushCheckpoint; @@ -116,13 +119,12 @@ private ECKeyOutputStream(Builder builder) { this.writeOffset = 0; this.encoder = CodecUtil.createRawEncoderWithFallback( builder.getReplicationConfig()); + this.flushExecutor = Executors.newSingleThreadExecutor(); S3Auth s3Auth = builder.getS3CredentialsProvider().get(); ThreadLocal s3CredentialsProvider = builder.getS3CredentialsProvider(); - this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() -> { - s3CredentialsProvider.set(s3Auth); - return flushStripeFromQueue(); - }); + flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth)); + this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue); this.flushCheckpoint = new AtomicLong(0); this.atomicKeyCreation = builder.getAtomicKeyCreation(); } @@ -493,6 +495,7 @@ public void close() throws IOException { } catch (InterruptedException e) { throw new IOException("Flushing thread was interrupted", e); } finally { + flushExecutor.shutdownNow(); closeCurrentStreamEntry(); blockOutputStreamEntryPool.cleanup(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index a6830ba9f771..74b22e7ca4c6 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -196,7 +196,8 @@ public class RpcClient implements ClientProtocol { // for reconstruction. private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3; - private static final int WRITE_POOL_MIN_SIZE = 1; + // TODO: Adjust to the appropriate value when the writeThreadPool is used. 
+ private static final int WRITE_POOL_MIN_SIZE = 0; private final ConfigurationSource conf; private final OzoneManagerClientProtocol ozoneManagerClient; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java index 44303ed2ff23..29cf1bc5e117 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java @@ -213,14 +213,6 @@ static void shutdown() throws IOException { } } - static void reInitClient() throws IOException { - ozClient = OzoneClientFactory.getRpcClient(conf); - store = ozClient.getObjectStore(); - TestOzoneRpcClient.setOzClient(ozClient); - TestOzoneRpcClient.setStore(store); - } - - @ParameterizedTest @EnumSource void testPutKeyWithEncryption(BucketLayout bucketLayout) throws Exception { @@ -778,7 +770,9 @@ void testGetKeyProvider() throws Exception { KeyProvider kp3 = ozClient.getObjectStore().getKeyProvider(); assertNotEquals(kp3, kpSpy); - reInitClient(); + // Restore ozClient and store + TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf)); + TestOzoneRpcClient.setStore(ozClient.getObjectStore()); } private static RepeatedOmKeyInfo getMatchedKeyInfo(