From d2eef17caa4ecd41b1e04e6003f5a4138b3a723a Mon Sep 17 00:00:00 2001 From: ashishk Date: Mon, 5 Aug 2024 12:43:12 +0530 Subject: [PATCH 1/3] HDDS-11280. Remove flaky test testWriteMoreThanFlushSize for piggybacking. --- .../ozone/client/rpc/TestBlockOutputStream.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java index 2a6b2246b9c7..ae0e6703a00c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java @@ -55,6 +55,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.PutBlock; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.WriteChunk; @@ -476,9 +477,9 @@ void testWriteMoreThanChunkSize(boolean flushDelay, boolean enablePiggybacking) } @ParameterizedTest - @MethodSource("clientParameters") - void testWriteMoreThanFlushSize(boolean flushDelay, boolean enablePiggybacking) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); + @ValueSource(booleans = {true, false}) + void testWriteMoreThanFlushSize(boolean flushDelay) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, false); try (OzoneClient client = newClient(cluster.getConf(), config)) { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -550,9 +551,9 @@ void 
testWriteMoreThanFlushSize(boolean flushDelay, boolean enablePiggybacking) assertEquals(writeChunkCount + 3, metrics.getContainerOpCountMetrics(WriteChunk)); // If the flushDelay was disabled, it sends PutBlock with the data in the buffer. - assertEquals(putBlockCount + (flushDelay ? 2 : 3) - (enablePiggybacking ? 1 : 0), + assertEquals(putBlockCount + (flushDelay ? 2 : 3), metrics.getContainerOpCountMetrics(PutBlock)); - assertEquals(totalOpCount + (flushDelay ? 5 : 6) - (enablePiggybacking ? 1 : 0), + assertEquals(totalOpCount + (flushDelay ? 5 : 6), metrics.getTotalOpCount()); assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty From 481be72bcdc382877be0f581d995fdbf15a7a494 Mon Sep 17 00:00:00 2001 From: ashishk Date: Mon, 5 Aug 2024 13:28:11 +0530 Subject: [PATCH 2/3] HDDS-11280. Add synchronized to addAckDataLength, and revert to original test --- .../hdds/scm/storage/AbstractCommitWatcher.java | 2 +- .../ozone/client/rpc/TestBlockOutputStream.java | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java index 2bc73ce58f99..213d2aa16a2e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java @@ -83,7 +83,7 @@ synchronized long getTotalAckDataLength() { return totalAckDataLength; } - long addAckDataLength(long acked) { + synchronized long addAckDataLength(long acked) { totalAckDataLength += acked; return totalAckDataLength; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java 
index ae0e6703a00c..2a6b2246b9c7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java @@ -55,7 +55,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.PutBlock; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.WriteChunk; @@ -477,9 +476,9 @@ void testWriteMoreThanChunkSize(boolean flushDelay, boolean enablePiggybacking) } @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testWriteMoreThanFlushSize(boolean flushDelay) throws Exception { - OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, false); + @MethodSource("clientParameters") + void testWriteMoreThanFlushSize(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); try (OzoneClient client = newClient(cluster.getConf(), config)) { XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics(); @@ -551,9 +550,9 @@ void testWriteMoreThanFlushSize(boolean flushDelay) throws Exception { assertEquals(writeChunkCount + 3, metrics.getContainerOpCountMetrics(WriteChunk)); // If the flushDelay was disabled, it sends PutBlock with the data in the buffer. - assertEquals(putBlockCount + (flushDelay ? 2 : 3), + assertEquals(putBlockCount + (flushDelay ? 2 : 3) - (enablePiggybacking ? 1 : 0), metrics.getContainerOpCountMetrics(PutBlock)); - assertEquals(totalOpCount + (flushDelay ? 5 : 6), + assertEquals(totalOpCount + (flushDelay ? 5 : 6) - (enablePiggybacking ? 
1 : 0), metrics.getTotalOpCount()); assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // make sure the bufferPool is empty From 7f88cf8f9966e1efedbede0699cc3d3a6a460942 Mon Sep 17 00:00:00 2001 From: ashishk Date: Mon, 5 Aug 2024 22:34:39 +0530 Subject: [PATCH 3/3] Use AtomicLong --- .../hdds/scm/storage/AbstractCommitWatcher.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java index 213d2aa16a2e..fb489d0d0c6c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java @@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; /** * This class executes watchForCommit on ratis pipeline and releases @@ -62,7 +63,7 @@ abstract class AbstractCommitWatcher { private final XceiverClientSpi client; - private long totalAckDataLength; + private final AtomicLong totalAckDataLength = new AtomicLong(); AbstractCommitWatcher(XceiverClientSpi client) { this.client = client; @@ -79,13 +80,12 @@ synchronized void updateCommitInfoMap(long index, List buffers) { } /** @return the total data which has been acknowledged. */ - synchronized long getTotalAckDataLength() { - return totalAckDataLength; + long getTotalAckDataLength() { + return totalAckDataLength.get(); } - synchronized long addAckDataLength(long acked) { - totalAckDataLength += acked; - return totalAckDataLength; + long addAckDataLength(long acked) { + return totalAckDataLength.addAndGet(acked); } /**