From 2f79e4d9f58477975a0eaa588a55ad72031c831b Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Wed, 11 Dec 2024 12:42:24 -0800 Subject: [PATCH 1/5] HDDS-11860. Improve BufferUtils.writeFully. --- .../ChunkBufferImplWithByteBufferList.java | 5 +--- .../ozone/common/IncrementalChunkBuffer.java | 6 +---- .../ozone/common/utils/BufferUtils.java | 23 +++++++++++++++++-- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java index f9992c9442db..e1f169662f89 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java @@ -248,10 +248,7 @@ public List asByteBufferList() { @Override public long writeTo(GatheringByteChannel channel) throws IOException { - long written = 0; - for (ByteBuffer buf : buffers) { - written += BufferUtils.writeFully(channel, buf); - } + final long written = BufferUtils.writeFully(channel, buffers); findCurrent(); return written; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java index 249c67e4dd3e..732af4b68505 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java @@ -280,11 +280,7 @@ public List asByteBufferList() { @Override public long writeTo(GatheringByteChannel channel) throws IOException { - long written = 0; - for (ByteBuffer buf : buffers) { - written += BufferUtils.writeFully(channel, buf); - } - return written; + return BufferUtils.writeFully(channel, buffers); } @Override diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java index 01b2ec0af10a..a5ec93bdafa9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java @@ -31,6 +31,7 @@ * Utilities for buffers. */ public final class BufferUtils { + private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = {}; /** Utility classes should not be constructed. **/ private BufferUtils() { @@ -147,11 +148,29 @@ public static long writeFully(GatheringByteChannel ch, ByteBuffer bb) throws IOE long written = 0; while (bb.remaining() > 0) { int n = ch.write(bb); - if (n <= 0) { - throw new IllegalStateException("no bytes written"); + if (n < 0) { + throw new IllegalStateException("GatheringByteChannel.write returns " + n + " < 0 for " + ch); } written += n; } return written; } + + public static long writeFully(GatheringByteChannel ch, List buffers) throws IOException { + return BufferUtils.writeFully(ch, buffers.toArray(EMPTY_BYTE_BUFFER_ARRAY)); + } + + public static long writeFully(GatheringByteChannel ch, ByteBuffer[] buffers) throws IOException { + long written = 0; + for(int i = 0; i < buffers.length; i++) { + while (buffers[i].remaining() > 0) { + final long n = ch.write(buffers, i, buffers.length - i); + if (n < 0) { + throw new IllegalStateException("GatheringByteChannel.write returns " + n + " < 0 for " + ch); + } + written += n; + } + } + return written; + } } From 3a6316842c883a93e6d1b75aa48e87daa85d9095 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Wed, 11 Dec 2024 12:53:19 -0800 Subject: [PATCH 2/5] Fix checkstyle. --- .../java/org/apache/hadoop/ozone/common/utils/BufferUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java index a5ec93bdafa9..602594a94900 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java @@ -162,7 +162,7 @@ public static long writeFully(GatheringByteChannel ch, List buffers) public static long writeFully(GatheringByteChannel ch, ByteBuffer[] buffers) throws IOException { long written = 0; - for(int i = 0; i < buffers.length; i++) { + for (int i = 0; i < buffers.length; i++) { while (buffers[i].remaining() > 0) { final long n = ch.write(buffers, i, buffers.length - i); if (n < 0) { From fe6ca6cb17e2e49872c8d8e1d4ab47a6ffaffbfa Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 13 Dec 2024 10:31:15 -0800 Subject: [PATCH 3/5] Fix bugs in MockGatheringChannel. --- .../ozone/common/utils/BufferUtils.java | 13 ++++ .../hdds/utils/MockGatheringChannel.java | 70 ++++++++++++++----- 2 files changed, 65 insertions(+), 18 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java index 602594a94900..a266c3615b07 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java @@ -26,11 +26,15 @@ import java.util.ArrayList; import java.util.List; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Utilities for buffers. */ public final class BufferUtils { + public static final Logger LOG = LoggerFactory.getLogger(BufferUtils.class); + private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = {}; /** Utility classes should not be constructed. **/ @@ -161,10 +165,19 @@ public static long writeFully(GatheringByteChannel ch, List buffers) } public static long writeFully(GatheringByteChannel ch, ByteBuffer[] buffers) throws IOException { + if (LOG.isDebugEnabled()) { + for (int i = 0; i < buffers.length; i++) { + LOG.debug("buffer[{}]: remaining={}", i, buffers[i].remaining()); + } + } + long written = 0; for (int i = 0; i < buffers.length; i++) { while (buffers[i].remaining() > 0) { final long n = ch.write(buffers, i, buffers.length - i); + if (LOG.isDebugEnabled()) { + LOG.debug("buffer[{}]: remaining={}, written={}", i, buffers[i].remaining(), n); + } if (n < 0) { throw new IllegalStateException("GatheringByteChannel.write returns " + n + " < 0 for " + ch); } diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java index 8f9256cd7787..2b1a465b13af 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java @@ -17,10 +17,13 @@ */ package org.apache.hadoop.hdds.utils; +import org.junit.jupiter.api.Assertions; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ThreadLocalRandom; import static com.google.common.base.Preconditions.checkElementIndex; @@ -45,11 +48,27 @@ public long write(ByteBuffer[] srcs, int offset, int length) checkElementIndex(offset, srcs.length, "offset"); checkElementIndex(offset + length - 1, srcs.length, "offset+length"); - long bytes = 0; - for (ByteBuffer b : srcs) { - bytes += write(b); + long fullLength = 0; + for (int i = offset; i < srcs.length; i++) { + fullLength += srcs[i].remaining(); + } + if (fullLength <= 0) { + return 0; } - return bytes; + + // simulate partial write by setting a random limit + final long partialLength = ThreadLocalRandom.current().nextLong(fullLength + 1); + + long written = 0; + for (int i = offset; i < srcs.length; i++) { + final long n = partialLength - written; // write at most n bytes + + final ByteBuffer src = srcs[i]; + final int remaining = src.remaining(); + final int adjustment = remaining <= n ? 0 : Math.toIntExact(remaining - n); + written += adjustedWrite(src, adjustment); + } + return written; } @Override @@ -59,21 +78,36 @@ public long write(ByteBuffer[] srcs) throws IOException { @Override public int write(ByteBuffer src) throws IOException { - // If src has more than 1 byte left, simulate partial write by adjusting limit. - // Remaining 1 byte should be written on next call. - // This helps verify that the caller ensures buffer is written fully. - final int adjustment = 1; - final boolean limitWrite = src.remaining() > adjustment; - if (limitWrite) { - src.limit(src.limit() - adjustment); - } - try { - return delegate.write(src); - } finally { - if (limitWrite) { - src.limit(src.limit() + adjustment); - } + final int remaining = src.remaining(); + if (remaining <= 0) { + return 0; } + // Simulate partial write by setting a random limit. + final int adjustment = ThreadLocalRandom.current().nextInt(remaining + 1); + return adjustedWrite(src, adjustment); + } + + private int adjustedWrite(ByteBuffer src, int adjustment) throws IOException { + Assertions.assertTrue(adjustment >= 0); + final int remaining = src.remaining(); + Assertions.assertTrue(adjustment <= remaining); + + final int oldLimit = src.limit(); + final int newLimit = oldLimit - adjustment; + src.limit(newLimit); + Assertions.assertEquals(newLimit, src.limit()); + final int toWrite = remaining - adjustment; + Assertions.assertEquals(toWrite, src.remaining()); + + final int written = delegate.write(src); + Assertions.assertEquals(newLimit, src.limit()); + Assertions.assertEquals(toWrite - written, src.remaining()); + + src.limit(oldLimit); + Assertions.assertEquals(oldLimit, src.limit()); + Assertions.assertEquals(remaining - written, src.remaining()); + + return written; } @Override From 357069d861e27ef97e2ec0ec3c1213782193e0c1 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 13 Dec 2024 13:59:13 -0800 Subject: [PATCH 4/5] Address review comments. --- .../hadoop/hdds/utils/db/CodecBuffer.java | 6 +-- .../hdds/utils/MockGatheringChannel.java | 45 +++++++++++-------- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java index 1ac293b301bb..87be912bb53a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java @@ -58,9 +58,9 @@ public class CodecBuffer implements UncheckedAutoCloseable { private static class Factory { private static volatile BiFunction constructor = CodecBuffer::new; - static void set(BiFunction f) { + static void set(BiFunction f, String name) { constructor = f; - LOG.info("Successfully set constructor to " + f); + LOG.info("Successfully set constructor to {}: {}", name, f); } static CodecBuffer newCodecBuffer(ByteBuf buf) { @@ -89,7 +89,7 @@ protected void finalize() { * Note that there is a severe performance penalty for leak detection. */ public static void enableLeakDetection() { - Factory.set(LeakDetector::newCodecBuffer); + Factory.set(LeakDetector::newCodecBuffer, "LeakDetector::newCodecBuffer"); } /** The size of a buffer. */ diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java index 2b1a465b13af..82671c9af210 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdds.utils; -import org.junit.jupiter.api.Assertions; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; @@ -26,6 +24,8 @@ import java.util.concurrent.ThreadLocalRandom; import static com.google.common.base.Preconditions.checkElementIndex; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; /** * {@link GatheringByteChannel} implementation for testing. Delegates @@ -56,17 +56,22 @@ public long write(ByteBuffer[] srcs, int offset, int length) return 0; } - // simulate partial write by setting a random limit + // simulate partial write by setting a random partial length final long partialLength = ThreadLocalRandom.current().nextLong(fullLength + 1); long written = 0; for (int i = offset; i < srcs.length; i++) { - final long n = partialLength - written; // write at most n bytes - - final ByteBuffer src = srcs[i]; - final int remaining = src.remaining(); - final int adjustment = remaining <= n ? 0 : Math.toIntExact(remaining - n); - written += adjustedWrite(src, adjustment); + for(final ByteBuffer src = srcs[i]; src.hasRemaining(); ) { + final long n = partialLength - written; // write at most n bytes + assertThat(n).isGreaterThanOrEqualTo(0); + if (n == 0) { + return written; + } + + final int remaining = src.remaining(); + final int adjustment = remaining <= n ? 0 : Math.toIntExact(remaining - n); + written += adjustedWrite(src, adjustment); + } } return written; } @@ -82,30 +87,34 @@ public int write(ByteBuffer src) throws IOException { if (remaining <= 0) { return 0; } - // Simulate partial write by setting a random limit. + // Simulate partial write by a random adjustment. final int adjustment = ThreadLocalRandom.current().nextInt(remaining + 1); return adjustedWrite(src, adjustment); } + /** Simulate partial write by the given adjustment. */ private int adjustedWrite(ByteBuffer src, int adjustment) throws IOException { - Assertions.assertTrue(adjustment >= 0); + assertThat(adjustment).isGreaterThanOrEqualTo(0); final int remaining = src.remaining(); - Assertions.assertTrue(adjustment <= remaining); + if (remaining <= 0) { + return 0; + } + assertThat(adjustment).isLessThanOrEqualTo(remaining); final int oldLimit = src.limit(); final int newLimit = oldLimit - adjustment; src.limit(newLimit); - Assertions.assertEquals(newLimit, src.limit()); + assertEquals(newLimit, src.limit()); final int toWrite = remaining - adjustment; - Assertions.assertEquals(toWrite, src.remaining()); + assertEquals(toWrite, src.remaining()); final int written = delegate.write(src); - Assertions.assertEquals(newLimit, src.limit()); - Assertions.assertEquals(toWrite - written, src.remaining()); + assertEquals(newLimit, src.limit()); + assertEquals(toWrite - written, src.remaining()); src.limit(oldLimit); - Assertions.assertEquals(oldLimit, src.limit()); - Assertions.assertEquals(remaining - written, src.remaining()); + assertEquals(oldLimit, src.limit()); + assertEquals(remaining - written, src.remaining()); return written; } From f78c6051995780bc417b1359c8a9acb3d4b3cff7 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Sat, 14 Dec 2024 05:47:58 +0100 Subject: [PATCH 5/5] fix checkstyle --- .../java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java index 82671c9af210..83b68512380e 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java @@ -61,7 +61,7 @@ public long write(ByteBuffer[] srcs, int offset, int length) long written = 0; for (int i = offset; i < srcs.length; i++) { - for(final ByteBuffer src = srcs[i]; src.hasRemaining(); ) { + for (final ByteBuffer src = srcs[i]; src.hasRemaining();) { final long n = partialLength - written; // write at most n bytes assertThat(n).isGreaterThanOrEqualTo(0); if (n == 0) {