From 946897d831bf03ee15f43bc003b5efbd2a80f3a9 Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Sat, 4 Dec 2021 01:51:35 +0530
Subject: [PATCH 01/16] HDDS-6039. Define a minimum packet size during streaming writes.

---
 .../hadoop/hdds/scm/OzoneClientConfig.java    | 16 ++++
 .../scm/storage/BlockDataStreamOutput.java    | 79 ++++++++++++++++---
 .../ozone/common/utils/BufferUtils.java       |  4 +
 .../apache/hadoop/ozone/MiniOzoneCluster.java |  7 ++
 .../hadoop/ozone/MiniOzoneClusterImpl.java    |  6 ++
 .../client/rpc/TestBlockDataStreamOutput.java | 29 ++++++-
 6 files changed, 130 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 1c55941e3e4b..157a20352874 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -69,6 +69,14 @@ public class OzoneClientConfig {
       tags = ConfigTag.CLIENT)
   private long dataStreamBufferFlushSize = 16 * 1024 * 1024;
 
+  @Config(key = "datastream.min.packet.size",
+      defaultValue = "64KB",
+      type = ConfigType.SIZE,
+      description = "The minimum size of the ByteBuffer packet "
+          + "sent via ratis streaming",
+      tags = ConfigTag.CLIENT)
+  private int dataStreamMinPacketSize = 64 * 1024;
+
   @Config(key = "stream.buffer.increment",
       defaultValue = "0B",
       type = ConfigType.SIZE,
@@ -207,6 +215,14 @@ public void setStreamBufferMaxSize(long streamBufferMaxSize) {
     this.streamBufferMaxSize = streamBufferMaxSize;
   }
 
+  public int getDataStreamMinPacketSize() {
+    return dataStreamMinPacketSize;
+  }
+
+  public void setDataStreamMinPacketSize(int dataStreamMinPacketSize) {
+    this.dataStreamMinPacketSize = dataStreamMinPacketSize;
+  }
+
   public int getMaxRetryCount() {
     return maxRetryCount;
   }
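The new key lives in the client config group, so it can be tuned either as a raw property or through the typed config object. A minimal sketch, assuming the usual "ozone.client" prefix from OzoneClientConfig's @ConfigGroup annotation and the standard getObject/setFromObject round trip (only setDataStreamMinPacketSize and setFromObject appear in this series; the rest is illustrative):

    // Sketch: override the packet-size floor on a client configuration.
    OzoneConfiguration conf = new OzoneConfiguration();
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    clientConfig.setDataStreamMinPacketSize(64 * 1024);  // 64KB floor
    conf.setFromObject(clientConfig);
    // Equivalent raw form (full key name assumed from the config group):
    // conf.set("ozone.client.datastream.min.packet.size", "64KB");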
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 6f5a54354a31..2c7a088b3574 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -29,6 +29,8 @@
 import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -36,6 +38,7 @@
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.client.api.DataStreamOutput;
@@ -49,7 +52,9 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -125,6 +130,9 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
   private final long syncSize = 0; // TODO: disk sync is disabled for now
   private long syncPosition = 0;
+  // queue to hold packets/buffers < minPacketSize
+  private Queue<ByteBuffer> smallBuffers;
+  private XceiverClientMetrics metrics;
 
   /**
    * Creates a new BlockDataStreamOutput.
@@ -172,6 +180,8 @@ public BlockDataStreamOutput(
     ioException = new AtomicReference<>(null);
     checksum = new Checksum(config.getChecksumType(),
         config.getBytesPerChecksum());
+    smallBuffers = new LinkedList<>();
+    metrics = XceiverClientManager.getXceiverClientMetrics();
   }
 
   private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
@@ -260,16 +270,66 @@ public void write(ByteBuffer b, int off, int len) throws IOException {
     int curLen = len;
     // set limit on the number of bytes that a ByteBuffer(StreamBuffer) can hold
     int maxBufferLen = config.getDataStreamMaxBufferSize();
-    while (curLen > 0) {
-      int writeLen = Math.min(curLen, maxBufferLen);
-      final StreamBuffer buf = new StreamBuffer(b, off, writeLen);
-      off += writeLen;
-      bufferList.add(buf);
-      writeChunkToContainer(buf.duplicate());
-      curLen -= writeLen;
-      writtenDataLength += writeLen;
-      doFlushIfNeeded();
+    int minPacketSize = config.getDataStreamMinPacketSize();
+    int totalSmallBufferLen = getTotalSmallBufferLen(smallBuffers);
+    // add the buffer to the small-buffer queue if the accumulated size is
+    // still less than minPacketSize
+    if (len + totalSmallBufferLen < minPacketSize) {
+      smallBuffers.add(b);
+    } else {
+      ByteBuffer prev = getPreviousBufferIfAny();
+      if (!BufferUtils.isEmpty(prev)) {
+        curLen += prev.position();
+      }
+      while (curLen > 0) {
+        int writeLen = Math.min(curLen, maxBufferLen);
+        final StreamBuffer buf;
+        if (!BufferUtils.isEmpty(prev)) {
+          prev.rewind();
+          ByteBuffer combinedBuffer =
+              ByteBuffer.allocate(prev.limit() + b.limit());
+          combinedBuffer.put(prev).put(b);
+          combinedBuffer.rewind();
+          buf = new StreamBuffer(combinedBuffer, off, writeLen);
+        } else {
+          buf = new StreamBuffer(b, off, writeLen);
+        }
+        off += writeLen;
+        bufferList.add(buf);
+        writeChunkToContainer(buf.duplicate());
+        curLen -= writeLen;
+        writtenDataLength += writeLen;
+        doFlushIfNeeded();
+      }
+    }
+  }
+
+  /**
+   * @param buffers the queue of small buffers
+   * @return the total length of all small buffers in the queue
+   */
+  private int getTotalSmallBufferLen(Queue<ByteBuffer> buffers) {
+    int total = 0;
+    for (ByteBuffer b : buffers) {
+      total += b.limit();
+    }
+    return total;
+  }
+
+  /**
+   * @return a single ByteBuffer concatenating all the buffers
+   * in the small-buffer queue
+   */
+  private ByteBuffer getPreviousBufferIfAny() {
+    int totalLimit = 0;
+    for (ByteBuffer b : smallBuffers) {
+      totalLimit += b.limit();
+    }
+    ByteBuffer prev = ByteBuffer.allocate(totalLimit);
+    while (!smallBuffers.isEmpty()) {
+      prev.put(smallBuffers.poll());
     }
+    return prev;
   }
 
   private void doFlushIfNeeded() throws IOException {
@@ -585,6 +645,7 @@ private void writeChunkToContainer(ByteBuffer buf)
         .setChecksumData(checksumData.getProtoBufMessage())
         .build();
 
+    metrics.incrPendingContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Writing chunk {} length {} at offset {}",
           chunkInfo.getChunkName(), effectiveChunkSize, offset);
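Seen in isolation, the write path above implements a simple batching rule: park sub-threshold buffers, then coalesce and ship them once the running total crosses the floor. A self-contained sketch of that rule, with MIN_PACKET_SIZE and emitPacket as illustrative stand-ins rather than names from the patch:

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Queue;

    /** Sketch of the small-write batching rule; a 64KB floor is assumed. */
    public class PacketBatcher {
      private static final int MIN_PACKET_SIZE = 64 * 1024;
      private final Queue<ByteBuffer> pending = new ArrayDeque<>();
      private int pendingBytes = 0;

      void write(ByteBuffer b) {
        if (pendingBytes + b.remaining() < MIN_PACKET_SIZE) {
          pending.add(b);                 // too small on its own: park it
          pendingBytes += b.remaining();
          return;
        }
        // coalesce the parked buffers plus the new one into a single packet
        ByteBuffer packet = ByteBuffer.allocate(pendingBytes + b.remaining());
        while (!pending.isEmpty()) {
          packet.put(pending.poll());
        }
        packet.put(b).flip();
        pendingBytes = 0;
        emitPacket(packet);               // stand-in for writeChunkToContainer
      }

      private void emitPacket(ByteBuffer packet) {
        System.out.println("packet of " + packet.remaining() + " bytes");
      }
    }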
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
index 383d3880733a..9f79f3d2647d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
@@ -133,4 +133,8 @@ public static void clearBuffers(ByteBuffer[] byteBuffers) {
       buffer.clear();
     }
   }
+
+  public static boolean isEmpty(ByteBuffer buf) {
+    return buf == null || buf.position() == 0;
+  }
 }

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 94d66d4244c4..143a84ca7a8e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -322,6 +322,7 @@ abstract class Builder {
   protected Optional<Long> streamBufferFlushSize = Optional.empty();
   protected Optional<Long> dataStreamBufferFlushSize = Optional.empty();
   protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty();
+  protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty();
   protected Optional<Long> streamBufferMaxSize = Optional.empty();
   protected Optional<Long> blockSize = Optional.empty();
   protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty();
@@ -565,6 +566,12 @@ public Builder setDataStreamBufferFlushize(long size) {
     return this;
   }
 
+  public Builder setDataStreamMinPacketSize(int size) {
+    dataStreamMinPacketSize = OptionalInt.of(size);
+    return this;
+  }
+
+
   /**
    * Sets the block size for stream buffer.
    *

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index e291d4ecd5aa..d4cdaf0112f3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -664,6 +664,9 @@ protected void initializeConfiguration() throws IOException {
     if (!dataStreamMaxBufferSize.isPresent()) {
       dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get());
     }
+    if (!dataStreamMinPacketSize.isPresent()) {
+      dataStreamMinPacketSize = OptionalInt.of(chunkSize.get() / 4);
+    }
     if (!blockSize.isPresent()) {
       blockSize = Optional.of(2 * streamBufferMaxSize.get());
     }
@@ -685,6 +688,9 @@ protected void initializeConfiguration() throws IOException {
     clientConfig.setDataStreamMaxBufferSize((int) Math.round(
         streamBufferSizeUnit.get()
             .toBytes(dataStreamMaxBufferSize.getAsInt())));
+    clientConfig.setDataStreamMinPacketSize((int) Math.round(
+        streamBufferSizeUnit.get()
+            .toBytes(dataStreamMinPacketSize.getAsInt())));
     conf.setFromObject(clientConfig);
 
     conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
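Test clusters can also pin the packet floor explicitly instead of relying on the chunkSize / 4 default derived above. A typical builder invocation, mirroring how later patches in this series wire it up (the datanode count and the 1KB floor are arbitrary test values):

    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
        .setNumDatanodes(3)
        .setDataStreamMinPacketSize(1024)
        .build();
    cluster.waitForClusterToBeReady();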
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 5eb38a00de5f..adb4fec96968 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -107,6 +107,7 @@ public static void init() throws Exception {
         .setDataStreamBufferFlushize(maxFlushSize)
         .setDataStreamBufferMaxSize(chunkSize)
         .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .setDataStreamMinPacketSize(2*chunkSize/5)
         .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
@@ -135,7 +136,7 @@ public static void shutdown() {
 
   @Test
   public void testHalfChunkWrite() throws Exception {
-    testWrite(chunkSize / 2);
+    testWrite(chunkSize / 5);
     testWriteWithFailure(chunkSize/2);
   }
 
@@ -165,9 +166,11 @@ private void testWrite(int dataLength) throws Exception {
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
     key.write(ByteBuffer.wrap(data));
+    key.write(ByteBuffer.wrap(data));
     // now close the stream; it will update the key length.
     key.close();
-    validateData(keyName, data);
+    String dataString = new String(data, UTF_8);
+    validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
   }
 
   private void testWriteWithFailure(int dataLength) throws Exception {
@@ -230,4 +233,26 @@ private void validateData(String keyName, byte[] data) throws Exception {
         .validateData(keyName, data, objectStore, volumeName, bucketName);
   }
 
+  @Test public void testMinPacketSize() throws Exception {
+    String keyName = getKeyName();
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
+    long writeChunkCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 5)
+            .getBytes(UTF_8);
+    key.write(ByteBuffer.wrap(data));
+    // minPacketSize = 40, so the first write of 20 bytes won't trigger a writeChunk
+    Assert.assertEquals(writeChunkCount,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    key.write(ByteBuffer.wrap(data));
+    Assert.assertEquals(writeChunkCount + 1,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    // now close the stream; it will update the key length.
+    key.close();
+    String dataString = new String(data, UTF_8);
+    validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+  }
 }
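The arithmetic behind testMinPacketSize, spelling out the thresholds its in-test comment relies on (chunkSize = 100 is inferred from the "minPacketSize = 40" comment, not stated explicitly in the patch):

    int chunkSize = 100;                    // inferred test value
    int minPacketSize = 2 * chunkSize / 5;  // = 40 bytes
    int writeLen = chunkSize / 5;           // = 20 bytes per write
    // 1st write: 20 < 40 accumulated -> parked, no WriteChunk issued
    // 2nd write: 20 + 20 >= 40       -> exactly one WriteChunk for 40 bytes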
From fb5c4d08a258fab866201467973d11dbb446bd50 Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Sat, 4 Dec 2021 01:55:12 +0530
Subject: [PATCH 02/16] cleanup

---
 .../test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java | 2 --
 .../hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java  | 5 +++--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 143a84ca7a8e..196edeac1869 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -570,8 +570,6 @@ public Builder setDataStreamMinPacketSize(int size) {
     dataStreamMinPacketSize = OptionalInt.of(size);
     return this;
   }
-
-
   /**
    * Sets the block size for stream buffer.
    *

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index adb4fec96968..54ba444b2a53 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -136,7 +136,7 @@ public static void shutdown() {
 
   @Test
   public void testHalfChunkWrite() throws Exception {
-    testWrite(chunkSize / 5);
+    testWrite(chunkSize / 2);
     testWriteWithFailure(chunkSize/2);
   }
 
@@ -233,7 +233,8 @@ private void validateData(String keyName, byte[] data) throws Exception {
         .validateData(keyName, data, objectStore, volumeName, bucketName);
   }
 
-  @Test public void testMinPacketSize() throws Exception {
+  @Test
+  public void testMinPacketSize() throws Exception {
     String keyName = getKeyName();
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();

From 9c34e32722fcbb6a4e4e40ada4b70680670c67f7 Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Sat, 4 Dec 2021 12:25:33 +0530
Subject: [PATCH 03/16] trigger new CI check

* @@ -180,8 +176,8 @@ public BlockDataStreamOutput( ioException = new AtomicReference<>(null); checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum()); - smallBuffers = new LinkedList<>(); metrics = XceiverClientManager.getXceiverClientMetrics(); + allocateNewBufferIfNeeded(); } private DataStreamOutput setupStream(Pipeline pipeline) throws IOException { @@ -267,69 +263,32 @@ public void write(ByteBuffer b, int off, int len) throws IOException { if (len == 0) { return; } - int curLen = len; - // set limit on the number of bytes that a ByteBuffer(StreamBuffer) can hold - int maxBufferLen = config.getDataStreamMaxBufferSize(); - int minPacketSize = config.getDataStreamMinPacketSize(); - int totalSmallBufferLen = getTotalSmallBufferLen(smallBuffers); - // add the buffer to smallBufferQueue if the size is lesser than - // minPacketSize - if (len + totalSmallBufferLen < minPacketSize) { - smallBuffers.add(b); - } else { - ByteBuffer prev = getPreviousBufferIfAny(); - if (!BufferUtils.isEmpty(prev)) { - curLen += prev.position(); - } - while (curLen > 0) { - int writeLen = Math.min(curLen, maxBufferLen); - final StreamBuffer buf; - if (!BufferUtils.isEmpty(prev)) { - prev.rewind(); - ByteBuffer combinedBuffer = - ByteBuffer.allocate(prev.limit() + b.limit()); - combinedBuffer.put(prev).put(b); - combinedBuffer.rewind(); - buf = new StreamBuffer(combinedBuffer, off, writeLen); - } else { - buf = new StreamBuffer(b, off, writeLen); - } - off += writeLen; - bufferList.add(buf); - writeChunkToContainer(buf.duplicate()); - curLen -= writeLen; - writtenDataLength += writeLen; - doFlushIfNeeded(); - } + while (len > 0) { + allocateNewBufferIfNeeded(); + int writeLen = Math.min(len, currentBufferRemaining); + final StreamBuffer buf = new StreamBuffer(b, off, writeLen); + currentBuffer.put(buf); + currentBufferRemaining -= writeLen; + writeChunkIfNeeded(); + off += writeLen; + writtenDataLength += writeLen; + len -= writeLen; + doFlushIfNeeded(); } } - /** - * @param buffers returns total length of all small buffers in the queue - * @return - */ - private int getTotalSmallBufferLen(Queue buffers) { - int total = 0; - for (ByteBuffer b : buffers) { - total += b.limit(); + private void writeChunkIfNeeded() throws IOException { + if (currentBufferRemaining==0) { + bufferList.add(currentBuffer); + writeChunkToContainer(currentBuffer.duplicate()); } - return total; } - /** - * @return returns a concatenated byteBuffer for all the buffers - * in the smallBufferQueue - */ - private ByteBuffer getPreviousBufferIfAny() { - int totalLimit = 0; - for (ByteBuffer b : smallBuffers) { - totalLimit += b.limit(); - } - ByteBuffer prev = ByteBuffer.allocate(totalLimit); - for (int i = 0; i < smallBuffers.size(); i++) { - prev.put(smallBuffers.poll()); + private void allocateNewBufferIfNeeded() { + if(currentBufferRemaining==0){ + currentBuffer = StreamBuffer.allocate(config.getDataStreamMinPacketSize()); + currentBufferRemaining = config.getDataStreamMinPacketSize(); } - return prev; } private void doFlushIfNeeded() throws IOException { @@ -509,6 +468,11 @@ private void handleFlush(boolean close) // This can be a partially filled chunk. Since we are flushing the buffer // here, we just limit this buffer to the current position. 
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
index f36019e2aeb8..90218c87b834 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
@@ -27,7 +27,7 @@ public class StreamBuffer {
   private final ByteBuffer buffer;
 
   public StreamBuffer(ByteBuffer buffer) {
-    this.buffer = buffer.asReadOnlyBuffer();
+    this.buffer = buffer;
   }
 
   public StreamBuffer(ByteBuffer buffer, int offset, int length) {
@@ -43,4 +43,20 @@ public int length() {
     return buffer.limit() - buffer.position();
   }
 
+  public ByteBuffer getBuffer() {
+    return buffer;
+  }
+
+  public void put(StreamBuffer sb){
+    buffer.put(sb.getBuffer());
+  }
+
+  public static StreamBuffer allocate(int size){
+    return new StreamBuffer(ByteBuffer.allocate(size));
+  }
+
+  public boolean hasRemaining(){
+    return buffer.remaining() > 0;
+  }
+
 }
\ No newline at end of file
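The StreamBuffer accessors added above lean entirely on java.nio.ByteBuffer cursor rules, which the rest of the series exploits when it slices a filled prefix out of a partially written packet. A JDK-only illustration, with nothing Ozone-specific assumed:

    import java.nio.ByteBuffer;

    public class CursorDemo {
      public static void main(String[] args) {
        ByteBuffer packet = ByteBuffer.allocate(8); // position=0, limit=8
        packet.put(new byte[]{1, 2, 3});            // position=3, remaining=5
        ByteBuffer view = packet.duplicate();       // shared bytes, own cursors
        view.position(0);
        view.limit(3);                              // exactly the filled prefix
        System.out.println(view.remaining());       // prints 3
      }
    }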
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 196edeac1869..acea53cbf923 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -322,8 +322,8 @@ abstract class Builder {
   protected Optional<Long> streamBufferFlushSize = Optional.empty();
   protected Optional<Long> dataStreamBufferFlushSize = Optional.empty();
   protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty();
-  protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty();
   protected Optional<Long> streamBufferMaxSize = Optional.empty();
+  protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty();
   protected Optional<Long> blockSize = Optional.empty();
   protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty();
   protected boolean includeRecon = false;
@@ -570,6 +570,7 @@ public Builder setDataStreamMinPacketSize(int size) {
     dataStreamMinPacketSize = OptionalInt.of(size);
     return this;
   }
+
   /**
    * Sets the block size for stream buffer.
    *

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 54ba444b2a53..4e93067d1c41 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -166,11 +166,9 @@ private void testWrite(int dataLength) throws Exception {
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
     key.write(ByteBuffer.wrap(data));
-    key.write(ByteBuffer.wrap(data));
     // now close the stream; it will update the key length.
     key.close();
-    String dataString = new String(data, UTF_8);
-    validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+    validateData(keyName, data);
   }
 
   private void testWriteWithFailure(int dataLength) throws Exception {
@@ -233,6 +231,7 @@ private void validateData(String keyName, byte[] data) throws Exception {
         .validateData(keyName, data, objectStore, volumeName, bucketName);
   }
 
+
   @Test
   public void testMinPacketSize() throws Exception {
     String keyName = getKeyName();
@@ -256,4 +255,5 @@ public void testMinPacketSize() throws Exception {
     String dataString = new String(data, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
   }
+
 }
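With fixed-size buffers the packing rule becomes purely arithmetic: a write of N bytes fills N / minPacketSize complete packets and leaves the remainder sitting in currentBuffer until a flush or close ships it. Worked numbers (sizes illustrative):

    int minPacketSize = 40;
    int dataLen = 100;
    int fullPackets = dataLen / minPacketSize;  // 2 packets of 40 bytes each
    int leftover = dataLen % minPacketSize;     // 20 bytes stay in currentBuffer
    // the 20-byte tail is only written out by handleFlush()/close()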
From d4bedad0e10afe488fd2e087591196ef7b814d13 Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Thu, 9 Dec 2021 00:57:37 +0530
Subject: [PATCH 05/16] fix few tests

---
 .../scm/storage/BlockDataStreamOutput.java    | 26 +++++++++++--------
 .../io/BlockDataStreamOutputEntryPool.java    |  2 +-
 .../client/rpc/TestBlockDataStreamOutput.java |  2 +-
 3 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index e7979125d24b..f2959a9dd104 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -177,7 +177,6 @@ public BlockDataStreamOutput(
     checksum = new Checksum(config.getChecksumType(),
         config.getBytesPerChecksum());
     metrics = XceiverClientManager.getXceiverClientMetrics();
-    allocateNewBufferIfNeeded();
   }
 
   private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
@@ -279,11 +278,18 @@ public void write(ByteBuffer b, int off, int len) throws IOException {
 
   private void writeChunkIfNeeded() throws IOException {
     if (currentBufferRemaining==0) {
-      bufferList.add(currentBuffer);
-      writeChunkToContainer(currentBuffer.duplicate());
+      writeChunk(currentBuffer);
     }
   }
 
+  private void writeChunk(StreamBuffer sb) throws IOException {
+    bufferList.add(sb);
+    ByteBuffer dup = sb.duplicate();
+    dup.position(0);
+    dup.limit(sb.getBuffer().position());
+    writeChunkToContainer(dup);
+  }
+
   private void allocateNewBufferIfNeeded() {
     if(currentBufferRemaining==0){
       currentBuffer = StreamBuffer.allocate(config.getDataStreamMinPacketSize());
@@ -296,7 +302,7 @@ private void doFlushIfNeeded() throws IOException {
         .getDataStreamMaxBufferSize());
     long boundary = config.getDataStreamBufferFlushSize() / config
         .getDataStreamMaxBufferSize();
-    if (bufferList.size() % boundary == 0) {
+    if (!bufferList.isEmpty() && bufferList.size() % boundary == 0) {
       updateFlushLength();
       executePutBlock(false, false);
     }
@@ -327,11 +333,10 @@ public void writeOnRetry(long len) throws IOException {
     int count = 0;
     while (len > 0) {
       final StreamBuffer buf = bufferList.get(count);
-      final long writeLen = Math.min(buf.length(), len);
+      final long writeLen = Math.min(buf.getBuffer().position(), len);
       final ByteBuffer duplicated = buf.duplicate();
-      if (writeLen != buf.length()) {
-        duplicated.limit(Math.toIntExact(len));
-      }
+      duplicated.position(0);
+      duplicated.limit(buf.getBuffer().position());
       writeChunkToContainer(duplicated);
       len -= writeLen;
       count++;
@@ -469,9 +474,8 @@ private void handleFlush(boolean close)
       // here, we just limit this buffer to the current position. So that next
      // write will happen in new buffer
-
-      if (currentBuffer.hasRemaining()) {
-        bufferList.add(currentBuffer);
-        writeChunkToContainer(currentBuffer.duplicate());
+      if (currentBuffer!=null) {
+        writeChunk(currentBuffer);
       }
       updateFlushLength();
       executePutBlock(close, false);

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index e49b0b79adf6..c7d8ebde4b5d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -309,7 +309,7 @@ boolean isEmpty() {
   long computeBufferData() {
     long totalDataLen =0;
     for (StreamBuffer b : bufferList){
-      totalDataLen += b.length();
+      totalDataLen += b.getBuffer().position();
     }
     return totalDataLen;
   }

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 4e93067d1c41..f8b15d175923 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -136,7 +136,7 @@ public static void shutdown() {
 
   @Test
   public void testHalfChunkWrite() throws Exception {
-    testWrite(chunkSize / 2);
+    testWrite(chunkSize/2);
     testWriteWithFailure(chunkSize/2);
   }
 
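The guarded flush above also pins down the PutBlock cadence: boundary counts whole buffers, not bytes. Assuming the usual wiring of these integration tests (chunk/buffer size 100 bytes, stream flush size 400 bytes, packet floor 40 bytes; these values are inferred, not spelled out in the diff), the arithmetic reproduces the figures quoted in the next patch's test comment:

    long flushSize = 400;                        // dataStreamBufferFlushSize
    long maxBufferSize = 100;                    // dataStreamMaxBufferSize
    long boundary = flushSize / maxBufferSize;   // = 4 buffers
    // executePutBlock runs when bufferList.size() is a non-zero multiple of 4;
    // with 40-byte packets that is every 160 bytes, matching the
    // "first putBlock will be at 160" comment in testPutBlockAtBoundary.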
From c0b76a91cdb44e8787dccac1c5bde977344c94fe Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Thu, 9 Dec 2021 11:52:33 +0530
Subject: [PATCH 06/16] fix TestBlockDataOutputStream

---
 .../hadoop/hdds/scm/storage/BlockDataStreamOutput.java | 2 ++
 .../ozone/client/rpc/TestBlockDataStreamOutput.java    | 8 ++++----
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index f2959a9dd104..8dc63585c92b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -279,6 +279,7 @@ public void write(ByteBuffer b, int off, int len) throws IOException {
   private void writeChunkIfNeeded() throws IOException {
     if (currentBufferRemaining==0) {
       writeChunk(currentBuffer);
+      currentBuffer = null;
     }
   }
 
@@ -476,6 +477,7 @@ private void handleFlush(boolean close)
       if (currentBuffer!=null) {
         writeChunk(currentBuffer);
+        currentBuffer = null;
       }
       updateFlushLength();
       executePutBlock(close, false);

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index f8b15d175923..0d774ab03777 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -142,7 +142,6 @@ public void testHalfChunkWrite() throws Exception {
 
   @Test
   public void testSingleChunkWrite() throws Exception {
-    testWrite(chunkSize);
     testWriteWithFailure(chunkSize);
   }
 
@@ -194,7 +193,7 @@ private void testWriteWithFailure(int dataLength) throws Exception {
 
   @Test
   public void testPutBlockAtBoundary() throws Exception {
-    int dataLength = 500;
+    int dataLength = 200;
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
     long putBlockCount = metrics.getContainerOpCountMetrics(
@@ -208,12 +207,13 @@ public void testPutBlockAtBoundary() throws Exception {
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
     key.write(ByteBuffer.wrap(data));
+    long a = metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
     Assert.assertTrue(
         metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
             <= pendingPutBlockCount + 1);
     key.close();
-    // Since data length is 500 , first putBlock will be at 400(flush boundary)
-    // and the other at 500
+    // Since data length is 200, the first putBlock will be at 160 (the flush
+    // boundary) and the other at 200
     Assert.assertTrue(
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
             == putBlockCount + 2);
From 2d85b61e5d655ddb5b5b834f5c9338f858232381 Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Thu, 9 Dec 2021 11:58:46 +0530
Subject: [PATCH 07/16] cleanup

---
 .../java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java |  4 ++--
 .../hadoop/hdds/scm/storage/BlockDataStreamOutput.java     |  5 +++--
 .../org/apache/hadoop/ozone/common/utils/BufferUtils.java  |  4 ----
 .../hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java |  4 ++--
 4 files changed, 7 insertions(+), 10 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 157a20352874..0df37437c746 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -70,12 +70,12 @@ public class OzoneClientConfig {
   private long dataStreamBufferFlushSize = 16 * 1024 * 1024;
 
   @Config(key = "datastream.min.packet.size",
-      defaultValue = "64KB",
+      defaultValue = "1MB",
       type = ConfigType.SIZE,
       description = "The minimum size of the ByteBuffer packet "
          + "sent via ratis streaming",
       tags = ConfigTag.CLIENT)
-  private int dataStreamMinPacketSize = 64 * 1024;
+  private int dataStreamMinPacketSize = 1024 * 1024;
 
   @Config(key = "stream.buffer.increment",
       defaultValue = "0B",
       type = ConfigType.SIZE,

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 8dc63585c92b..4250a7eeb1ec 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -292,8 +292,9 @@ private void writeChunk(StreamBuffer sb) throws IOException {
   }
 
   private void allocateNewBufferIfNeeded() {
-    if(currentBufferRemaining==0){
-      currentBuffer = StreamBuffer.allocate(config.getDataStreamMinPacketSize());
+    if (currentBufferRemaining == 0) {
+      currentBuffer =
+          StreamBuffer.allocate(config.getDataStreamMinPacketSize());
       currentBufferRemaining = config.getDataStreamMinPacketSize();
     }
   }

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
index 9f79f3d2647d..383d3880733a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
@@ -133,8 +133,4 @@ public static void clearBuffers(ByteBuffer[] byteBuffers) {
       buffer.clear();
     }
   }
-
-  public static boolean isEmpty(ByteBuffer buf) {
-    return buf == null || buf.position() == 0;
-  }
 }

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 0d774ab03777..c9242df8b1a4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -136,12 +136,13 @@ public static void shutdown() {
 
   @Test
   public void testHalfChunkWrite() throws Exception {
-    testWrite(chunkSize/2);
+    testWrite(chunkSize / 2);
     testWriteWithFailure(chunkSize/2);
   }
 
   @Test
   public void testSingleChunkWrite() throws Exception {
+    testWrite(chunkSize);
     testWriteWithFailure(chunkSize);
   }
 
@@ -207,7 +208,6 @@ public void testPutBlockAtBoundary() throws Exception {
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
     key.write(ByteBuffer.wrap(data));
-    long a = metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
     Assert.assertTrue(
         metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
             <= pendingPutBlockCount + 1);

From 494d9cbc86f9750eff4c4e214b9fae9be0cac65c Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Thu, 9 Dec 2021 13:36:54 +0530
Subject: [PATCH 08/16] trigger new CI check

From 2383a2888bc4580d755ab4df54d94952f772a8fd Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Thu, 9 Dec 2021 21:10:02 +0530
Subject: [PATCH 09/16] fix test

---
 .../hadoop/ozone/client/rpc/TestContainerStateMachineStream.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
index 3b174503767a..f4c756bccd14 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
@@ -119,6 +119,7 @@ public void setup() throws Exception {
     conf.setQuietMode(false);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
+            .setDataStreamMinPacketSize(1024)
             .build();
     cluster.waitForClusterToBeReady();
     cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);

From f791369e3a97c2afd554d2c9be78f266aec17a35 Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Thu, 9 Dec 2021 22:25:25 +0530
Subject: [PATCH 10/16] trigger new CI check

From 30388bcd823378ba5ab44029f9c3df0bc9c0ff7b Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Fri, 10 Dec 2021 01:10:04 +0530
Subject: [PATCH 11/16] trigger new CI check

From 0aab6696d02806d351a9dd0db448c4a448c0830a Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Fri, 10 Dec 2021 11:45:31 +0530
Subject: [PATCH 12/16] trigger new CI check

From 2feac650831d683b6593106a2001f49712de8487 Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Fri, 10 Dec 2021 14:20:20 +0530
Subject: [PATCH 13/16] fix test

---
 .../hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 222e352154eb..89d7b50d49ac 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -191,6 +191,7 @@ static void startCluster(OzoneConfiguration conf) throws Exception {
         .setTotalPipelineNumLimit(10)
         .setScmId(scmId)
         .setClusterId(clusterId)
+        .setDataStreamMinPacketSize(1024)
         .build();
     cluster.waitForClusterToBeReady();
     ozClient = OzoneClientFactory.getRpcClient(conf);
From 3623c99904f823b87755fcc93e76fd7f2723429d Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Fri, 10 Dec 2021 20:32:39 +0530
Subject: [PATCH 14/16] address comments

---
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 15 ++++++---------
 .../hadoop/hdds/scm/storage/StreamBuffer.java      | 11 ++++-------
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |  2 +-
 .../common/src/main/resources/ozone-default.xml    |  2 +-
 .../client/io/BlockDataStreamOutputEntryPool.java  |  2 +-
 5 files changed, 13 insertions(+), 19 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 4250a7eeb1ec..9fb1340527a3 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -128,7 +128,6 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   private final long syncSize = 0; // TODO: disk sync is disabled for now
   private long syncPosition = 0;
   private StreamBuffer currentBuffer;
-  private int currentBufferRemaining;
   private XceiverClientMetrics metrics;
   /**
    * Creates a new BlockDataStreamOutput.
@@ -264,10 +263,9 @@ public void write(ByteBuffer b, int off, int len) throws IOException {
     }
     while (len > 0) {
       allocateNewBufferIfNeeded();
-      int writeLen = Math.min(len, currentBufferRemaining);
+      int writeLen = Math.min(len, currentBuffer.length());
       final StreamBuffer buf = new StreamBuffer(b, off, writeLen);
       currentBuffer.put(buf);
-      currentBufferRemaining -= writeLen;
       writeChunkIfNeeded();
       off += writeLen;
       writtenDataLength += writeLen;
@@ -277,7 +275,7 @@ public void write(ByteBuffer b, int off, int len) throws IOException {
   }
 
   private void writeChunkIfNeeded() throws IOException {
-    if (currentBufferRemaining==0) {
+    if (currentBuffer.length()==0) {
       writeChunk(currentBuffer);
       currentBuffer = null;
     }
@@ -287,15 +285,14 @@ private void writeChunk(StreamBuffer sb) throws IOException {
     bufferList.add(sb);
     ByteBuffer dup = sb.duplicate();
     dup.position(0);
-    dup.limit(sb.getBuffer().position());
+    dup.limit(sb.position());
     writeChunkToContainer(dup);
   }
 
   private void allocateNewBufferIfNeeded() {
-    if (currentBufferRemaining == 0) {
+    if (currentBuffer==null) {
       currentBuffer =
           StreamBuffer.allocate(config.getDataStreamMinPacketSize());
-      currentBufferRemaining = config.getDataStreamMinPacketSize();
     }
   }
 
@@ -335,10 +332,10 @@ public void writeOnRetry(long len) throws IOException {
     int count = 0;
     while (len > 0) {
       final StreamBuffer buf = bufferList.get(count);
-      final long writeLen = Math.min(buf.getBuffer().position(), len);
+      final long writeLen = Math.min(buf.position(), len);
       final ByteBuffer duplicated = buf.duplicate();
       duplicated.position(0);
-      duplicated.limit(buf.getBuffer().position());
+      duplicated.limit(buf.position());
       writeChunkToContainer(duplicated);
       len -= writeLen;
       count++;

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
index 90218c87b834..5118ea5ead3a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
@@ -43,20 +43,17 @@ public int length() {
     return buffer.limit() - buffer.position();
   }
 
-  public ByteBuffer getBuffer() {
-    return buffer;
+  public int position() {
+    return buffer.position();
   }
 
+
   public void put(StreamBuffer sb){
-    buffer.put(sb.getBuffer());
+    buffer.put(sb.buffer);
   }
 
   public static StreamBuffer allocate(int size){
     return new StreamBuffer(ByteBuffer.allocate(size));
   }
 
-  public boolean hasRemaining(){
-    return buffer.remaining() > 0;
-  }
-
 }
\ No newline at end of file

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8d694582939c..233725807389 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -102,7 +102,7 @@ public final class OzoneConfigKeys {
    */
   public static final String OZONE_FS_DATASTREAM_ENABLE =
       "ozone.fs.datastream.enable";
-  public static final boolean OZONE_FS_DATASTREAM_ENABLE_DEFAULT = false;
+  public static final boolean OZONE_FS_DATASTREAM_ENABLE_DEFAULT = true;
 
   /**
    * When set to true, allocate a random free port for ozone container, so that

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 38a9e5f605a2..678c08e945e0 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3010,7 +3010,7 @@
   <property>
     <name>ozone.fs.datastream.enable</name>
-    <value>false</value>
+    <value>true</value>
     <tag>OZONE, DATANODE</tag>
     <description>
       To enable/disable filesystem write via ratis streaming.
     </description>

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index c7d8ebde4b5d..24a046f62395 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -309,7 +309,7 @@ boolean isEmpty() {
   long computeBufferData() {
     long totalDataLen =0;
     for (StreamBuffer b : bufferList){
-      totalDataLen += b.getBuffer().position();
+      totalDataLen += b.position();
     }
     return totalDataLen;
   }
From 5f7951ac5eb99abff254c57d1fd72b067b903b4a Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Mon, 13 Dec 2021 10:42:36 +0530
Subject: [PATCH 15/16] fix commitwatcher logic

---
 .../org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index 3a59d0757105..9ae604e95111 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -178,7 +178,7 @@ private long releaseBuffers(List<Long> indexes) {
       Preconditions.checkState(commitIndexMap.containsKey(index));
       final List<StreamBuffer> buffers = commitIndexMap.remove(index);
       final long length =
-          buffers.stream().mapToLong(StreamBuffer::length).sum();
+          buffers.stream().mapToLong(StreamBuffer::position).sum();
       totalAckDataLength += length;
       // clear the future object from the future Map
       final CompletableFuture remove =
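The one-liner above matters because, since the fixed-size-chunk change, StreamBuffer.length() reports remaining capacity while position() reports bytes actually written; acking by length() would credit free space instead of data. A JDK-level illustration of the two quantities (the 1MB capacity mirrors the new default packet size):

    ByteBuffer buf = ByteBuffer.allocate(1024 * 1024); // one packet buffer
    buf.put(new byte[300]);                            // 300 bytes streamed
    long filled = buf.position();                      // 300: what the ack must count
    long free = buf.limit() - buf.position();          // 1048276: old length() value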
From 93afa56d22f1d2e28f5e4a6cb72721d7a2778f59 Mon Sep 17 00:00:00 2001
From: Sadanand Shenoy
Date: Mon, 20 Dec 2021 13:41:31 +0530
Subject: [PATCH 16/16] change default of ozone.fs.datastream.enable

---
 .../src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java | 2 +-
 hadoop-hdds/common/src/main/resources/ozone-default.xml        | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 233725807389..8d694582939c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -102,7 +102,7 @@ public final class OzoneConfigKeys {
    */
   public static final String OZONE_FS_DATASTREAM_ENABLE =
       "ozone.fs.datastream.enable";
-  public static final boolean OZONE_FS_DATASTREAM_ENABLE_DEFAULT = true;
+  public static final boolean OZONE_FS_DATASTREAM_ENABLE_DEFAULT = false;
 
   /**
    * When set to true, allocate a random free port for ozone container, so that

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 678c08e945e0..38a9e5f605a2 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3010,7 +3010,7 @@
   <property>
     <name>ozone.fs.datastream.enable</name>
-    <value>true</value>
+    <value>false</value>
     <tag>OZONE, DATANODE</tag>
     <description>
       To enable/disable filesystem write via ratis streaming.
     </description>
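With the default flipped back to off, streaming writes end the series as opt-in. Enabling them programmatically only needs the constant this series touches (setting ozone.fs.datastream.enable in ozone-site.xml works equally well):

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean(OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLE, true);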