From acadf90f8ed0ae4fe7f712506786e883f658d4eb Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Mon, 19 Aug 2024 22:25:47 +0800 Subject: [PATCH 1/2] HDDS-11340. Avoid extra PubBlock call when a full block is closed --- .../hadoop/hdds/scm/storage/BlockOutputStream.java | 14 +++++++++++--- .../hdds/scm/storage/ECBlockOutputStream.java | 2 +- .../hdds/scm/storage/RatisBlockOutputStream.java | 3 ++- .../storage/TestBlockOutputStreamCorrectness.java | 1 + .../ozone/client/io/BlockOutputStreamEntry.java | 2 +- 5 files changed, 16 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index aab70a692e30..5d210e331004 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -94,6 +94,9 @@ public class BlockOutputStream extends OutputStream { KeyValue.newBuilder().setKey(FULL_CHUNK).build(); private AtomicReference blockID; + // planned block full size + private long blockSize; + private boolean eofSent = false; private final AtomicReference previousChunkInfo = new AtomicReference<>(); @@ -164,6 +167,7 @@ public class BlockOutputStream extends OutputStream { @SuppressWarnings("checkstyle:ParameterNumber") public BlockOutputStream( BlockID blockID, + long blockSize, XceiverClientFactory xceiverClientManager, Pipeline pipeline, BufferPool bufferPool, @@ -175,6 +179,7 @@ public BlockOutputStream( this.xceiverClientFactory = xceiverClientManager; this.config = config; this.blockID = new AtomicReference<>(blockID); + this.blockSize = blockSize; replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode()); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); @@ -530,7 +535,7 @@ CompletableFuture executePutBlock(boolean close, final XceiverClientReply asyncReply; try { BlockData blockData = containerBlockData.build(); - LOG.debug("sending PutBlock {}", blockData); + LOG.debug("sending PutBlock {} flushPos {}", blockData, flushPos); if (config.getIncrementalChunkList()) { // remove any chunks in the containerBlockData list. @@ -538,7 +543,9 @@ CompletableFuture executePutBlock(boolean close, containerBlockData.clearChunks(); } - asyncReply = putBlockAsync(xceiverClient, blockData, close, tokenString); + // if block is full, send the eof + boolean isBlockFull = (blockSize != -1 && flushPos == blockSize); + asyncReply = putBlockAsync(xceiverClient, blockData, close || isBlockFull, tokenString); CompletableFuture future = asyncReply.getResponse(); flushFuture = future.thenApplyAsync(e -> { try { @@ -550,6 +557,7 @@ CompletableFuture executePutBlock(boolean close, if (getIoException() == null && !force) { handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(), asyncReply, flushPos, byteBufferList); + eofSent = close || isBlockFull; } return e; }, responseExecutor).exceptionally(e -> { @@ -690,7 +698,7 @@ private synchronized CompletableFuture handleFlushInternalSynchronized(boo // There're no pending written data, but there're uncommitted data. updatePutBlockLength(); putBlockResultFuture = executePutBlock(close, false); - } else if (close) { + } else if (close && !eofSent) { // forcing an "empty" putBlock if stream is being closed without new // data since latest flush - we need to send the "EOF" flag updatePutBlockLength(); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index bbb3f30687af..12ca9978c685 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -80,7 +80,7 @@ public ECBlockOutputStream( ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, Supplier executorServiceSupplier ) throws IOException { - super(blockID, xceiverClientManager, + super(blockID, -1, xceiverClientManager, pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier); // In EC stream, there will be only one node in pipeline. this.datanodeDetails = pipeline.getClosestNode(); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java index c0e99a5b4a08..d32c37eba6c3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java @@ -72,6 +72,7 @@ public class RatisBlockOutputStream extends BlockOutputStream @SuppressWarnings("checkstyle:ParameterNumber") public RatisBlockOutputStream( BlockID blockID, + long blockSize, XceiverClientFactory xceiverClientManager, Pipeline pipeline, BufferPool bufferPool, @@ -80,7 +81,7 @@ public RatisBlockOutputStream( ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, Supplier blockOutputStreamResourceProvider ) throws IOException { - super(blockID, xceiverClientManager, pipeline, + super(blockID, blockSize, xceiverClientManager, pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider); this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient()); } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index df4d1cb3f8aa..d3425b7d2b0f 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -174,6 +174,7 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) return new RatisBlockOutputStream( new BlockID(1L, 1L), + -1, xcm, pipeline, bufferPool, diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 18a9231c66f7..5e6ecceefa1e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -108,7 +108,7 @@ void checkStream() throws IOException { * @throws IOException */ void createOutputStream() throws IOException { - outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager, + outputStream = new RatisBlockOutputStream(blockID, length, xceiverClientManager, pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier); } From a01cd1727da737b963b1e0d0024b867f6e17b6b8 Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Tue, 20 Aug 2024 12:11:01 +0800 Subject: [PATCH 2/2] fix findbug reported issue --- .../apache/hadoop/hdds/scm/storage/BlockOutputStream.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 5d210e331004..220e5481cbd3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -96,7 +97,7 @@ public class BlockOutputStream extends OutputStream { private AtomicReference blockID; // planned block full size private long blockSize; - private boolean eofSent = false; + private AtomicBoolean eofSent = new AtomicBoolean(false); private final AtomicReference previousChunkInfo = new AtomicReference<>(); @@ -557,7 +558,7 @@ CompletableFuture executePutBlock(boolean close, if (getIoException() == null && !force) { handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(), asyncReply, flushPos, byteBufferList); - eofSent = close || isBlockFull; + eofSent.set(close || isBlockFull); } return e; }, responseExecutor).exceptionally(e -> { @@ -698,7 +699,7 @@ private synchronized CompletableFuture handleFlushInternalSynchronized(boo // There're no pending written data, but there're uncommitted data. updatePutBlockLength(); putBlockResultFuture = executePutBlock(close, false); - } else if (close && !eofSent) { + } else if (close && !eofSent.get()) { // forcing an "empty" putBlock if stream is being closed without new // data since latest flush - we need to send the "EOF" flag updatePutBlockLength();