diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index a3fe1c247951..9ac43300f71b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -429,7 +429,7 @@ private void executePutBlock(boolean close, } else { byteBufferList = null; } - flush(); + waitFuturesComplete(); if (close) { dataStreamCloseReply = out.closeAsync(); } @@ -485,8 +485,16 @@ private void executePutBlock(boolean close, @Override public void flush() throws IOException { + if (xceiverClientFactory != null && xceiverClient != null + && !config.isStreamBufferFlushDelay()) { + waitFuturesComplete(); + } + } + + public void waitFuturesComplete() throws IOException { try { CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get(); + futures.clear(); } catch (Exception e) { LOG.warn("Failed to write all chunks through stream: " + e); throw new IOException(e); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index 696ab92ab78d..21003374d7a7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -89,7 +89,6 @@ public static void init() throws Exception { blockSize = 2 * maxFlushSize; OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); - clientConfig.setStreamBufferFlushDelay(false); conf.setFromObject(clientConfig); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);