From 733cbb378b71f707612600949e0a52ee14186d49 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Sat, 2 Oct 2021 06:01:38 +0530 Subject: [PATCH 1/8] HDDS-5674.[Ozone-Streaming] Handle client retries on exception (cherry picked from commit 2dc98b9839f22625ec78f3905b941cb444c82024) --- .../scm/storage/BlockDataStreamOutput.java | 79 +++++++++++++++-- .../hdds/scm/storage/StreamCommitWatcher.java | 84 +++++++++++++++++-- .../client/io/BlockDataStreamOutputEntry.java | 32 ++++++- .../io/BlockDataStreamOutputEntryPool.java | 14 +++- .../ozone/client/io/KeyDataStreamOutput.java | 12 ++- .../client/rpc/TestBlockDataStreamOutput.java | 30 +++++++ .../hadoop/ozone/container/TestHelper.java | 20 +++++ 7 files changed, 248 insertions(+), 23 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 2ae0ba75252e..bb460e2e2738 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -92,6 +92,19 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { private int chunkIndex; private final AtomicLong chunkOffset = new AtomicLong(); + + // Similar to 'BufferPool' but this list maintains only references + // to the ByteBuffers. + private List bufferPool; + + // List containing buffers for which the putBlock call will + // update the length in the datanodes. This list will just maintain + // references to the buffers in the BufferPool which will be cleared + // when the watchForCommit acknowledges a putBlock logIndex has been + // committed on all datanodes. This list will be a place holder for buffers + // which got written between successive putBlock calls. + private List bufferList; + // The IOException will be set by response handling thread in case there is an // exception received in the response. If the exception is set, the next // request will fail upfront. @@ -133,7 +146,8 @@ public BlockDataStreamOutput( XceiverClientFactory xceiverClientManager, Pipeline pipeline, OzoneClientConfig config, - Token token + Token token, + List bufferPool ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; @@ -148,7 +162,7 @@ public BlockDataStreamOutput( // Alternatively, stream setup can be delayed till the first chunk write. 
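
Before the stream wiring continues below, it helps to see the retention scheme this patch sets up in isolation: every write() records a read-only view of the caller's bytes in the shared list, and writeOnRetry() later replays those views against a fresh block. A minimal stand-alone sketch of that idea using only java.nio (the class and method names here are illustrative, not Ozone APIs):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    class RetainedBuffers {
      private final List<ByteBuffer> views = new ArrayList<>();

      // Record a zero-copy, read-only view of src[off, off + len).
      ByteBuffer retain(ByteBuffer src, int off, int len) {
        ByteBuffer view = (ByteBuffer) src.asReadOnlyBuffer()
            .position(off).limit(off + len);
        views.add(view);
        return view;
      }

      // Replay every retained view, e.g. against a newly allocated block.
      long replay(Consumer<ByteBuffer> writer) {
        long total = 0;
        for (ByteBuffer b : views) {
          writer.accept(b.duplicate()); // keeps the stored view's bounds intact
          total += b.remaining();
        }
        return total;
      }
    }
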
this.out = setupStream(pipeline); this.token = token; - + this.bufferPool = bufferPool; flushPeriod = (int) (config.getStreamBufferFlushSize() / config .getStreamBufferSize()); @@ -159,7 +173,7 @@ public BlockDataStreamOutput( // A single thread executor handle the responses of async requests responseExecutor = Executors.newSingleThreadExecutor(); - commitWatcher = new StreamCommitWatcher(xceiverClient); + commitWatcher = new StreamCommitWatcher(xceiverClient, bufferPool); totalDataFlushedLength = 0; writtenDataLength = 0; failedServers = new ArrayList<>(0); @@ -242,6 +256,11 @@ public IOException getIoException() { return ioException.get(); } + + public List getBufferPool() { + return bufferPool; + } + @Override public void write(ByteBuffer b, int off, int len) throws IOException { checkOpen(); @@ -251,8 +270,11 @@ public void write(ByteBuffer b, int off, int len) throws IOException { if (len == 0) { return; } - writeChunkToContainer( - (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len)); + ByteBuffer buf = + (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len); + bufferPool.add(buf); + + writeChunkToContainer(buf); writtenDataLength += len; } @@ -261,6 +283,10 @@ private void updateFlushLength() { totalDataFlushedLength = writtenDataLength; } + @VisibleForTesting + public long getTotalDataFlushedLength() { + return totalDataFlushedLength; + } /** * Will be called on the retryPath in case closedContainerException/ * TimeoutException. @@ -268,8 +294,23 @@ private void updateFlushLength() { * @throws IOException if error occurred */ - // TODO: We need add new retry policy without depend on bufferPool. public void writeOnRetry(long len) throws IOException { + if (len == 0) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Retrying write length {} for blockID {}", len, blockID); + } + int count = 0; + while (len > 0) { + ByteBuffer buf = bufferPool.get(count); + long writeLen = Math.min(buf.limit() - buf.position(), len); + writeChunkToContainer(buf); + len -= writeLen; + count++; + writtenDataLength += writeLen; + } + } @@ -314,6 +355,15 @@ ContainerCommandResponseProto> executePutBlock(boolean close, boolean force) throws IOException { checkOpen(); long flushPos = totalDataFlushedLength; + final List byteBufferList; + if (!force) { + Preconditions.checkNotNull(bufferList); + byteBufferList = bufferList; + bufferList = null; + Preconditions.checkNotNull(byteBufferList); + } else { + byteBufferList = null; + } flush(); if (close) { dataStreamCloseReply = out.closeAsync(); @@ -348,8 +398,8 @@ ContainerCommandResponseProto> executePutBlock(boolean close, + flushPos + " blockID " + blockID); } // for standalone protocol, logIndex will always be 0. 
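
The call below hands the batch of buffers covered by this putBlock to the commit watcher, keyed by the reply's Ratis log index. A rough stand-alone sketch of that bookkeeping (a hypothetical tracker class, not the actual StreamCommitWatcher):

    import java.nio.ByteBuffer;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    class CommitIndexTracker {
      private final ConcurrentSkipListMap<Long, List<ByteBuffer>> pending =
          new ConcurrentSkipListMap<>();
      private long ackedBytes;

      void record(long logIndex, List<ByteBuffer> batch) {
        pending.computeIfAbsent(logIndex, k -> new LinkedList<>())
            .addAll(batch);
      }

      // Release every batch replicated up to (and including) commitIndex;
      // this plays the role of the adjustBuffers/releaseBuffers pair below.
      long ackUpTo(long commitIndex) {
        ConcurrentNavigableMap<Long, List<ByteBuffer>> done =
            pending.headMap(commitIndex, true);
        for (List<ByteBuffer> batch : done.values()) {
          for (ByteBuffer b : batch) {
            ackedBytes += b.remaining();
          }
        }
        done.clear(); // headMap is a live view, so this drops the entries
        return ackedBytes;
      }
    }
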
- commitWatcher.updateCommitInfoSet( - asyncReply.getLogIndex()); + commitWatcher + .updateCommitInfoSet(asyncReply.getLogIndex(), byteBufferList); } return e; }, responseExecutor).exceptionally(e -> { @@ -421,6 +471,7 @@ public void close() throws IOException { Thread.currentThread().interrupt(); handleInterruptedException(ex, true); } finally { + cleanup(false); } @@ -471,6 +522,10 @@ public void cleanup(boolean invalidateClient) { if (xceiverClientFactory != null) { xceiverClientFactory.releaseClient(xceiverClient, invalidateClient); } + if (bufferList != null) { + bufferList.clear(); + } + bufferList = null; xceiverClientFactory = null; xceiverClient = null; commitWatcher.cleanup(); @@ -517,6 +572,10 @@ private boolean needSync(long position) { */ private void writeChunkToContainer(ByteBuffer buf) throws IOException { + if (bufferList == null) { + bufferList = new ArrayList<>(); + } + bufferList.add(buf); final int effectiveChunkSize = buf.remaining(); final long offset = chunkOffset.getAndAdd(effectiveChunkSize); ChecksumData checksumData = checksum.computeChecksum( @@ -589,4 +648,8 @@ private void handleExecutionException(Exception ex) throws IOException { setIoException(ex); throw getIoException(); } + + public long getTotalAckDataLength() { + return commitWatcher.getTotalAckDataLength(); + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java index c187ffe902ba..80bab4e6b355 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java @@ -24,6 +24,7 @@ */ package org.apache.hadoop.hdds.scm.storage; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; @@ -31,13 +32,17 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Set; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; /** * This class executes watchForCommit on ratis pipeline and releases @@ -48,7 +53,13 @@ public class StreamCommitWatcher { private static final Logger LOG = LoggerFactory.getLogger(StreamCommitWatcher.class); - private Set commitIndexSet; + private Map> commitIndexSet; + + private List bufferPool; + + // total data which has been successfully flushed and acknowledged + // by all servers + private long totalAckDataLength; // future Map to hold up all putBlock futures private ConcurrentHashMap bufferPool) { this.xceiverClient = xceiverClient; - commitIndexSet = new ConcurrentSkipListSet(); + commitIndexSet = new ConcurrentSkipListMap<>(); futureMap = new ConcurrentHashMap<>(); + this.bufferPool = bufferPool; + totalAckDataLength = 0; } - public void updateCommitInfoSet(long index) { - commitIndexSet.add(index); + public void updateCommitInfoSet(long index, List buffers) { + 
commitIndexSet.computeIfAbsent(index, k -> new LinkedList<>()) + .addAll(buffers); } int getCommitInfoSetSize() { @@ -83,7 +98,7 @@ public XceiverClientReply streamWatchOnFirstIndex() throws IOException { // to get committed to all or majority of nodes in case timeout // happens. long index = - commitIndexSet.stream().mapToLong(v -> v).min() + commitIndexSet.keySet().stream().mapToLong(v -> v).min() .getAsLong(); if (LOG.isDebugEnabled()) { LOG.debug("waiting for first index {} to catch up", index); @@ -107,7 +122,7 @@ public XceiverClientReply streamWatchOnLastIndex() // to get committed to all or majority of nodes in case timeout // happens. long index = - commitIndexSet.stream().mapToLong(v -> v).max() + commitIndexSet.keySet().stream().mapToLong(v -> v).max() .getAsLong(); if (LOG.isDebugEnabled()) { LOG.debug("waiting for last flush Index {} to catch up", index); @@ -127,9 +142,16 @@ public XceiverClientReply streamWatchOnLastIndex() */ public XceiverClientReply streamWatchForCommit(long commitIndex) throws IOException { + long index; try { XceiverClientReply reply = xceiverClient.watchForCommit(commitIndex); + if (reply == null) { + index = 0; + } else { + index = reply.getLogIndex(); + } + adjustBuffers(index); return reply; } catch (InterruptedException e) { // Re-interrupt the thread while catching InterruptedException @@ -140,11 +162,55 @@ public XceiverClientReply streamWatchForCommit(long commitIndex) } } + void releaseBuffersOnException() { + adjustBuffers(xceiverClient.getReplicatedMinCommitIndex()); + } + + private void adjustBuffers(long commitIndex) { + List keyList = commitIndexSet.keySet().stream() + .filter(p -> p <= commitIndex).collect(Collectors.toList()); + if (!keyList.isEmpty()) { + releaseBuffers(keyList); + } + } + + private long releaseBuffers(List indexes) { + Preconditions.checkArgument(!commitIndexSet.isEmpty()); + for (long index : indexes) { + Preconditions.checkState(commitIndexSet.containsKey(index)); + final List buffers + = commitIndexSet.remove(index); + long length = + buffers.stream().mapToLong(buf -> (buf.limit() - buf.position())) + .sum(); + totalAckDataLength += length; + // clear the future object from the future Map + final CompletableFuture remove = + futureMap.remove(totalAckDataLength); + if (remove == null) { + LOG.error("Couldn't find required future for " + totalAckDataLength); + for (Long key : futureMap.keySet()) { + LOG.error("Existing acknowledged data: " + key); + } + } + Preconditions.checkNotNull(remove); + for (ByteBuffer byteBuffer : buffers) { + bufferPool.remove(byteBuffer); + } + } + return totalAckDataLength; + } + + public long getTotalAckDataLength() { + return totalAckDataLength; + } + private IOException getIOExceptionForWatchForCommit(long commitIndex, Exception e) { LOG.warn("watchForCommit failed for index {}", commitIndex, e); IOException ioException = new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); + releaseBuffersOnException(); return ioException; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java index f0c3a43e891e..b0056d4857a5 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java @@ -32,6 +32,7 @@ import java.nio.ByteBuffer; import java.util.Collection; 
import java.util.Collections; +import java.util.List; /** * Helper class used inside {@link BlockDataStreamOutput}. @@ -50,6 +51,7 @@ public final class BlockDataStreamOutputEntry // the current position of this stream 0 <= currentPosition < length private long currentPosition; private final Token token; + private List bufferPool; @SuppressWarnings({"parameternumber", "squid:S00107"}) private BlockDataStreamOutputEntry( @@ -58,7 +60,8 @@ private BlockDataStreamOutputEntry( Pipeline pipeline, long length, Token token, - OzoneClientConfig config + OzoneClientConfig config, + List bufferPool ) { this.config = config; this.byteBufferStreamOutput = null; @@ -69,6 +72,7 @@ private BlockDataStreamOutputEntry( this.token = token; this.length = length; this.currentPosition = 0; + this.bufferPool = bufferPool; } long getLength() { @@ -92,8 +96,8 @@ long getRemaining() { private void checkStream() throws IOException { if (this.byteBufferStreamOutput == null) { this.byteBufferStreamOutput = - new BlockDataStreamOutput(blockID, xceiverClientManager, - pipeline, config, token); + new BlockDataStreamOutput(blockID, xceiverClientManager, pipeline, + config, token, bufferPool); } } @@ -151,6 +155,20 @@ long getWrittenDataLength() { } } + long getTotalAckDataLength() { + if (byteBufferStreamOutput != null) { + BlockDataStreamOutput out = + (BlockDataStreamOutput) this.byteBufferStreamOutput; + blockID = out.getBlockID(); + return out.getTotalAckDataLength(); + } else { + // For a pre allocated block for which no write has been initiated, + // the OutputStream will be null here. + // In such cases, the default blockCommitSequenceId will be 0 + return 0; + } + } + void cleanup(boolean invalidateClient) throws IOException { checkStream(); BlockDataStreamOutput out = @@ -180,6 +198,7 @@ public static class Builder { private long length; private Token token; private OzoneClientConfig config; + private List bufferPool; public Builder setBlockID(BlockID bID) { this.blockID = bID; @@ -219,13 +238,18 @@ public Builder setToken(Token bToken) { return this; } + public Builder setBufferPool(List bPool) { + this.bufferPool = bPool; + return this; + } + public BlockDataStreamOutputEntry build() { return new BlockDataStreamOutputEntry(blockID, key, xceiverClientManager, pipeline, length, - token, config); + token, config, bufferPool); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java index 4bc55de262f1..bc1fcf353b37 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; @@ -59,6 +60,7 @@ public class BlockDataStreamOutputEntryPool { private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private final long openID; private final ExcludeList excludeList; + private List bufferPool; @SuppressWarnings({"parameternumber", "squid:S00107"}) public BlockDataStreamOutputEntryPool( @@ -83,6 +85,7 @@ public BlockDataStreamOutputEntryPool( this.requestID = requestId; this.openID = openID; this.excludeList = new ExcludeList(); + this.bufferPool = new ArrayList<>(); } /** @@ -142,7 +145,8 @@ private 
void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) { .setPipeline(subKeyInfo.getPipeline()) .setConfig(config) .setLength(subKeyInfo.getLength()) - .setToken(subKeyInfo.getToken()); + .setToken(subKeyInfo.getToken()) + .setBufferPool(bufferPool); streamEntries.add(builder.build()); } @@ -301,4 +305,12 @@ public ExcludeList getExcludeList() { boolean isEmpty() { return streamEntries.isEmpty(); } + + long computeBufferData() { + long totalDataLen =0; + for (ByteBuffer b:bufferPool){ + totalDataLen+=(b.limit()-b.position()); + } + return totalDataLen; + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java index 9bba89d0a8a0..2540e42e24ae 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -278,11 +278,14 @@ private void handleException(BlockDataStreamOutputEntry streamEntry, } Pipeline pipeline = streamEntry.getPipeline(); PipelineID pipelineId = pipeline.getId(); - + long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength(); + //set the correct length for the current stream + streamEntry.setCurrentPosition(totalSuccessfulFlushedData); long containerId = streamEntry.getBlockID().getContainerID(); Collection failedServers = streamEntry.getFailedServers(); Preconditions.checkNotNull(failedServers); ExcludeList excludeList = blockDataStreamOutputEntryPool.getExcludeList(); + long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData(); if (!failedServers.isEmpty()) { excludeList.addDatanodes(failedServers); } @@ -316,6 +319,13 @@ private void handleException(BlockDataStreamOutputEntry streamEntry, blockDataStreamOutputEntryPool .discardPreallocatedBlocks(-1, pipelineId); } + if (bufferedDataLen > 0) { + // If the data is still cached in the underlying stream, we need to + // allocate new block and write this data in the datanode. 
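
The bufferedDataLen used below comes from computeBufferData(), which simply sums whatever is still parked in the shared list; because the watcher removes buffers as soon as their log index is acknowledged, what remains is exactly the unacknowledged tail that must be replayed into a new block. A condensed sketch of that accounting (plain ByteBuffers rather than the pool's own list):

    import java.nio.ByteBuffer;
    import java.util.List;

    final class BufferAccounting {
      private BufferAccounting() { }

      static long computeBufferData(List<ByteBuffer> buffers) {
        long total = 0;
        for (ByteBuffer b : buffers) {
          // remaining() == limit - position, the same arithmetic as the patch
          total += b.remaining();
        }
        return total;
      }
    }
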
+ handleRetry(exception, bufferedDataLen); + // reset the retryCount after handling the exception + retryCount = 0; + } } private void markStreamClosed() { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index d3b2d22577e9..05a101951b80 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -21,15 +21,19 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput; +import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput; import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -127,21 +131,25 @@ public static void shutdown() { @Test public void testHalfChunkWrite() throws Exception { testWrite(chunkSize / 2); + testWriteWithFailure(chunkSize/2); } @Test public void testSingleChunkWrite() throws Exception { testWrite(chunkSize); + testWriteWithFailure(chunkSize); } @Test public void testMultiChunkWrite() throws Exception { testWrite(chunkSize + 50); + testWriteWithFailure(chunkSize + 50); } @Test public void testMultiBlockWrite() throws Exception { testWrite(blockSize + 50); + testWriteWithFailure(blockSize + 50); } private void testWrite(int dataLength) throws Exception { @@ -156,6 +164,28 @@ private void testWrite(int dataLength) throws Exception { key.close(); validateData(keyName, data); } + + private void testWriteWithFailure(int dataLength) throws Exception { + String keyName = getKeyName(); + OzoneDataStreamOutput key = createKey( + keyName, ReplicationType.RATIS, 0); + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + ByteBuffer b = ByteBuffer.wrap(data); + key.write(b); + KeyDataStreamOutput keyDataStreamOutput = + (KeyDataStreamOutput) key.getByteBufStreamOutput(); + ByteBufferStreamOutput stream = + keyDataStreamOutput.getStreamEntries().get(0).getByteBufStreamOutput(); + Assert.assertTrue(stream instanceof BlockDataStreamOutput); + TestHelper.waitForContainerClose(key, cluster); + key.write(b); + key.close(); + String dataString = new String(data, UTF_8); + validateData(keyName, dataString.concat(dataString).getBytes(UTF_8)); + } + private OzoneDataStreamOutput createKey(String keyName, ReplicationType type, long size) throws Exception { return TestHelper.createStreamKey( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java index 0e48dd9d2d67..82fff089cea3 100644 --- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java @@ -40,7 +40,9 @@ import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.io.BlockDataStreamOutputEntry; import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry; +import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.client.io.OzoneInputStream; @@ -189,6 +191,24 @@ public static void waitForContainerClose(OzoneOutputStream outputStream, waitForContainerClose(cluster, containerIdList.toArray(new Long[0])); } + + public static void waitForContainerClose(OzoneDataStreamOutput outputStream, + MiniOzoneCluster cluster) throws Exception { + KeyDataStreamOutput keyOutputStream = + (KeyDataStreamOutput) outputStream.getByteBufStreamOutput(); + List streamEntryList = + keyOutputStream.getStreamEntries(); + List containerIdList = new ArrayList<>(); + for (BlockDataStreamOutputEntry entry : streamEntryList) { + long id = entry.getBlockID().getContainerID(); + if (!containerIdList.contains(id)) { + containerIdList.add(id); + } + } + Assert.assertTrue(!containerIdList.isEmpty()); + waitForContainerClose(cluster, containerIdList.toArray(new Long[0])); + } + public static void waitForPipelineClose(OzoneOutputStream outputStream, MiniOzoneCluster cluster, boolean waitForContainerCreation) throws Exception { From 0f53f8e87ab4b3b52425686b2a73f6e4072840dc Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Mon, 11 Oct 2021 19:18:10 +0530 Subject: [PATCH 2/8] Use only one list (cherry picked from commit cb50698b13fb2c53a10a9d44b607d19e3806a9d0) --- .../scm/storage/BlockDataStreamOutput.java | 34 ++++--------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index bb460e2e2738..690ad81d3219 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -95,14 +95,6 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { // Similar to 'BufferPool' but this list maintains only references // to the ByteBuffers. - private List bufferPool; - - // List containing buffers for which the putBlock call will - // update the length in the datanodes. This list will just maintain - // references to the buffers in the BufferPool which will be cleared - // when the watchForCommit acknowledges a putBlock logIndex has been - // committed on all datanodes. This list will be a place holder for buffers - // which got written between successive putBlock calls. 
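
This second patch collapses the two lists into a single bufferList and, just as importantly, starts handing writeChunkToContainer a duplicate() of each retained buffer. duplicate() shares the underlying bytes but not the position/limit, so the asynchronous chunk write can drain its own copy while the retained view stays replayable. A tiny stand-alone demonstration (plain JDK, made-up payload):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class DuplicateDemo {
      public static void main(String[] args) {
        ByteBuffer stored = ByteBuffer.wrap(
            "chunk-payload".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
        ByteBuffer inFlight = stored.duplicate(); // shares bytes, not position
        inFlight.position(inFlight.limit());      // simulate the write draining it
        // The retained view is untouched, so a retry can still replay everything.
        System.out.println(stored.remaining());   // 13
        System.out.println(inFlight.remaining()); // 0
      }
    }
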
private List bufferList; // The IOException will be set by response handling thread in case there is an @@ -147,7 +139,7 @@ public BlockDataStreamOutput( Pipeline pipeline, OzoneClientConfig config, Token token, - List bufferPool + List bufferList ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; @@ -162,7 +154,7 @@ public BlockDataStreamOutput( // Alternatively, stream setup can be delayed till the first chunk write. this.out = setupStream(pipeline); this.token = token; - this.bufferPool = bufferPool; + this.bufferList = bufferList; flushPeriod = (int) (config.getStreamBufferFlushSize() / config .getStreamBufferSize()); @@ -173,7 +165,7 @@ public BlockDataStreamOutput( // A single thread executor handle the responses of async requests responseExecutor = Executors.newSingleThreadExecutor(); - commitWatcher = new StreamCommitWatcher(xceiverClient, bufferPool); + commitWatcher = new StreamCommitWatcher(xceiverClient, bufferList); totalDataFlushedLength = 0; writtenDataLength = 0; failedServers = new ArrayList<>(0); @@ -257,9 +249,6 @@ public IOException getIoException() { } - public List getBufferPool() { - return bufferPool; - } @Override public void write(ByteBuffer b, int off, int len) throws IOException { @@ -272,9 +261,9 @@ public void write(ByteBuffer b, int off, int len) throws IOException { } ByteBuffer buf = (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len); - bufferPool.add(buf); + bufferList.add(buf); - writeChunkToContainer(buf); + writeChunkToContainer(buf.duplicate()); writtenDataLength += len; } @@ -303,9 +292,9 @@ public void writeOnRetry(long len) throws IOException { } int count = 0; while (len > 0) { - ByteBuffer buf = bufferPool.get(count); + ByteBuffer buf = bufferList.get(count); long writeLen = Math.min(buf.limit() - buf.position(), len); - writeChunkToContainer(buf); + writeChunkToContainer(buf.duplicate()); len -= writeLen; count++; writtenDataLength += writeLen; @@ -359,7 +348,6 @@ ContainerCommandResponseProto> executePutBlock(boolean close, if (!force) { Preconditions.checkNotNull(bufferList); byteBufferList = bufferList; - bufferList = null; Preconditions.checkNotNull(byteBufferList); } else { byteBufferList = null; @@ -522,10 +510,6 @@ public void cleanup(boolean invalidateClient) { if (xceiverClientFactory != null) { xceiverClientFactory.releaseClient(xceiverClient, invalidateClient); } - if (bufferList != null) { - bufferList.clear(); - } - bufferList = null; xceiverClientFactory = null; xceiverClient = null; commitWatcher.cleanup(); @@ -572,10 +556,6 @@ private boolean needSync(long position) { */ private void writeChunkToContainer(ByteBuffer buf) throws IOException { - if (bufferList == null) { - bufferList = new ArrayList<>(); - } - bufferList.add(buf); final int effectiveChunkSize = buf.remaining(); final long offset = chunkOffset.getAndAdd(effectiveChunkSize); ChecksumData checksumData = checksum.computeChecksum( From 0d0440f0bce6eccbf41c2c8500747c8908fb216d Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Mon, 11 Oct 2021 19:22:46 +0530 Subject: [PATCH 3/8] Use only one list (cherry picked from commit 50cacaee7e829f693648c6701e33d7f912a7eced) --- .../ozone/client/io/BlockDataStreamOutputEntryPool.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java index 
bc1fcf353b37..403869d25f1c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java @@ -60,7 +60,7 @@ public class BlockDataStreamOutputEntryPool { private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private final long openID; private final ExcludeList excludeList; - private List bufferPool; + private List bufferList; @SuppressWarnings({"parameternumber", "squid:S00107"}) public BlockDataStreamOutputEntryPool( @@ -85,7 +85,7 @@ public BlockDataStreamOutputEntryPool( this.requestID = requestId; this.openID = openID; this.excludeList = new ExcludeList(); - this.bufferPool = new ArrayList<>(); + this.bufferList = new ArrayList<>(); } /** @@ -146,7 +146,7 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) { .setConfig(config) .setLength(subKeyInfo.getLength()) .setToken(subKeyInfo.getToken()) - .setBufferPool(bufferPool); + .setBufferPool(bufferList); streamEntries.add(builder.build()); } @@ -308,7 +308,7 @@ boolean isEmpty() { long computeBufferData() { long totalDataLen =0; - for (ByteBuffer b:bufferPool){ + for (ByteBuffer b: bufferList){ totalDataLen+=(b.limit()-b.position()); } return totalDataLen; From 8daf208bbd15e0706d880819ce7473424d99ef8d Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Mon, 11 Oct 2021 19:46:10 +0530 Subject: [PATCH 4/8] Rename variable (cherry picked from commit 22fe01e98d7cfceefce36d27f4b5941478c7a3b3) --- .../hdds/scm/storage/StreamCommitWatcher.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java index 80bab4e6b355..8bd3cad8b014 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java @@ -53,7 +53,7 @@ public class StreamCommitWatcher { private static final Logger LOG = LoggerFactory.getLogger(StreamCommitWatcher.class); - private Map> commitIndexSet; + private Map> commitIndexMap; private List bufferPool; @@ -71,19 +71,19 @@ public class StreamCommitWatcher { public StreamCommitWatcher(XceiverClientSpi xceiverClient, List bufferPool) { this.xceiverClient = xceiverClient; - commitIndexSet = new ConcurrentSkipListMap<>(); + commitIndexMap = new ConcurrentSkipListMap<>(); futureMap = new ConcurrentHashMap<>(); this.bufferPool = bufferPool; totalAckDataLength = 0; } public void updateCommitInfoSet(long index, List buffers) { - commitIndexSet.computeIfAbsent(index, k -> new LinkedList<>()) + commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>()) .addAll(buffers); } int getCommitInfoSetSize() { - return commitIndexSet.size(); + return commitIndexMap.size(); } /** @@ -93,12 +93,12 @@ int getCommitInfoSetSize() { * @throws IOException in case watchForCommit fails */ public XceiverClientReply streamWatchOnFirstIndex() throws IOException { - if (!commitIndexSet.isEmpty()) { + if (!commitIndexMap.isEmpty()) { // wait for the first commit index in the commitIndex2flushedDataMap // to get committed to all or majority of nodes in case timeout // happens. 
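
Because the renamed commitIndexMap is a ConcurrentSkipListMap, the min/max scans over keySet() in the two watch methods are equivalent to the sorted map's firstKey()/lastKey() lookups. A small illustration of the equivalence (made-up log indexes):

    import java.util.concurrent.ConcurrentSkipListMap;

    public class FirstLastDemo {
      public static void main(String[] args) {
        ConcurrentSkipListMap<Long, String> m = new ConcurrentSkipListMap<>();
        m.put(7L, "putBlock@7");
        m.put(3L, "putBlock@3");
        m.put(9L, "putBlock@9");
        long min = m.keySet().stream().mapToLong(v -> v).min().getAsLong();
        long max = m.keySet().stream().mapToLong(v -> v).max().getAsLong();
        System.out.println(min == m.firstKey()); // true: both are 3
        System.out.println(max == m.lastKey());  // true: both are 9
      }
    }
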
long index = - commitIndexSet.keySet().stream().mapToLong(v -> v).min() + commitIndexMap.keySet().stream().mapToLong(v -> v).min() .getAsLong(); if (LOG.isDebugEnabled()) { LOG.debug("waiting for first index {} to catch up", index); @@ -117,12 +117,12 @@ public XceiverClientReply streamWatchOnFirstIndex() throws IOException { */ public XceiverClientReply streamWatchOnLastIndex() throws IOException { - if (!commitIndexSet.isEmpty()) { + if (!commitIndexMap.isEmpty()) { // wait for the commit index in the commitIndex2flushedDataMap // to get committed to all or majority of nodes in case timeout // happens. long index = - commitIndexSet.keySet().stream().mapToLong(v -> v).max() + commitIndexMap.keySet().stream().mapToLong(v -> v).max() .getAsLong(); if (LOG.isDebugEnabled()) { LOG.debug("waiting for last flush Index {} to catch up", index); @@ -167,7 +167,7 @@ void releaseBuffersOnException() { } private void adjustBuffers(long commitIndex) { - List keyList = commitIndexSet.keySet().stream() + List keyList = commitIndexMap.keySet().stream() .filter(p -> p <= commitIndex).collect(Collectors.toList()); if (!keyList.isEmpty()) { releaseBuffers(keyList); @@ -175,11 +175,11 @@ private void adjustBuffers(long commitIndex) { } private long releaseBuffers(List indexes) { - Preconditions.checkArgument(!commitIndexSet.isEmpty()); + Preconditions.checkArgument(!commitIndexMap.isEmpty()); for (long index : indexes) { - Preconditions.checkState(commitIndexSet.containsKey(index)); + Preconditions.checkState(commitIndexMap.containsKey(index)); final List buffers - = commitIndexSet.remove(index); + = commitIndexMap.remove(index); long length = buffers.stream().mapToLong(buf -> (buf.limit() - buf.position())) .sum(); @@ -221,12 +221,12 @@ ContainerCommandResponseProto>> getFutureMap() { } public void cleanup() { - if (commitIndexSet != null) { - commitIndexSet.clear(); + if (commitIndexMap != null) { + commitIndexMap.clear(); } if (futureMap != null) { futureMap.clear(); } - commitIndexSet = null; + commitIndexMap = null; } } From 10974f2c1e856fac44ad7330c6403a78d4c703de Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Wed, 13 Oct 2021 15:43:52 +0530 Subject: [PATCH 5/8] Use StreamBuffer --- .../scm/storage/BlockDataStreamOutput.java | 20 ++++----- .../hadoop/hdds/scm/storage/StreamBuffer.java | 45 +++++++++++++++++++ .../hdds/scm/storage/StreamCommitWatcher.java | 29 +++++------- .../client/io/BlockDataStreamOutputEntry.java | 17 +++---- .../io/BlockDataStreamOutputEntryPool.java | 10 ++--- 5 files changed, 81 insertions(+), 40 deletions(-) create mode 100644 hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 690ad81d3219..44bcc6576141 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -95,7 +95,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { // Similar to 'BufferPool' but this list maintains only references // to the ByteBuffers. - private List bufferList; + private List bufferList; // The IOException will be set by response handling thread in case there is an // exception received in the response. 
If the exception is set, the next @@ -139,7 +139,7 @@ public BlockDataStreamOutput( Pipeline pipeline, OzoneClientConfig config, Token token, - List bufferList + List bufferList ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; @@ -259,9 +259,9 @@ public void write(ByteBuffer b, int off, int len) throws IOException { if (len == 0) { return; } - ByteBuffer buf = - (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len); - bufferList.add(buf); + + final StreamBuffer buf = new StreamBuffer(b, off, len); + bufferList.add(new StreamBuffer(b, off, len)); writeChunkToContainer(buf.duplicate()); @@ -292,8 +292,8 @@ public void writeOnRetry(long len) throws IOException { } int count = 0; while (len > 0) { - ByteBuffer buf = bufferList.get(count); - long writeLen = Math.min(buf.limit() - buf.position(), len); + StreamBuffer buf = bufferList.get(count); + long writeLen = Math.min(buf.length(), len); writeChunkToContainer(buf.duplicate()); len -= writeLen; count++; @@ -344,7 +344,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close, boolean force) throws IOException { checkOpen(); long flushPos = totalDataFlushedLength; - final List byteBufferList; + final List byteBufferList; if (!force) { Preconditions.checkNotNull(bufferList); byteBufferList = bufferList; @@ -382,12 +382,12 @@ ContainerCommandResponseProto> executePutBlock(boolean close, if (LOG.isDebugEnabled()) { LOG.debug( "Adding index " + asyncReply.getLogIndex() + " commitMap size " - + commitWatcher.getCommitInfoSetSize() + " flushLength " + + commitWatcher.getCommitInfoMapSize() + " flushLength " + flushPos + " blockID " + blockID); } // for standalone protocol, logIndex will always be 0. commitWatcher - .updateCommitInfoSet(asyncReply.getLogIndex(), byteBufferList); + .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList); } return e; }, responseExecutor).exceptionally(e -> { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java new file mode 100644 index 000000000000..0ef9c8891c97 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + +import java.nio.ByteBuffer; + +/** + * Used for streaming write. 
+ */ +public class StreamBuffer { + private final ByteBuffer buffer; + + public StreamBuffer(ByteBuffer buffer) { + this.buffer = buffer.asReadOnlyBuffer(); + } + + public StreamBuffer(ByteBuffer buffer, int offset, int length) { + this((ByteBuffer) buffer.asReadOnlyBuffer().position(offset) + .limit(offset + length)); + } + + public ByteBuffer duplicate() { + return buffer.duplicate(); + } + + public int length() { + return buffer.limit() - buffer.position(); + } +} \ No newline at end of file diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java index 8bd3cad8b014..3a59d0757105 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java @@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -53,9 +52,8 @@ public class StreamCommitWatcher { private static final Logger LOG = LoggerFactory.getLogger(StreamCommitWatcher.class); - private Map> commitIndexMap; - - private List bufferPool; + private Map> commitIndexMap; + private List bufferList; // total data which has been successfully flushed and acknowledged // by all servers @@ -69,20 +67,20 @@ public class StreamCommitWatcher { private XceiverClientSpi xceiverClient; public StreamCommitWatcher(XceiverClientSpi xceiverClient, - List bufferPool) { + List bufferList) { this.xceiverClient = xceiverClient; commitIndexMap = new ConcurrentSkipListMap<>(); futureMap = new ConcurrentHashMap<>(); - this.bufferPool = bufferPool; + this.bufferList = bufferList; totalAckDataLength = 0; } - public void updateCommitInfoSet(long index, List buffers) { + public void updateCommitInfoMap(long index, List buffers) { commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>()) .addAll(buffers); } - int getCommitInfoSetSize() { + int getCommitInfoMapSize() { return commitIndexMap.size(); } @@ -142,7 +140,7 @@ public XceiverClientReply streamWatchOnLastIndex() */ public XceiverClientReply streamWatchForCommit(long commitIndex) throws IOException { - long index; + final long index; try { XceiverClientReply reply = xceiverClient.watchForCommit(commitIndex); @@ -178,11 +176,9 @@ private long releaseBuffers(List indexes) { Preconditions.checkArgument(!commitIndexMap.isEmpty()); for (long index : indexes) { Preconditions.checkState(commitIndexMap.containsKey(index)); - final List buffers - = commitIndexMap.remove(index); - long length = - buffers.stream().mapToLong(buf -> (buf.limit() - buf.position())) - .sum(); + final List buffers = commitIndexMap.remove(index); + final long length = + buffers.stream().mapToLong(StreamBuffer::length).sum(); totalAckDataLength += length; // clear the future object from the future Map final CompletableFuture remove = @@ -193,9 +189,8 @@ private long releaseBuffers(List indexes) { LOG.error("Existing acknowledged data: " + key); } } - Preconditions.checkNotNull(remove); - for (ByteBuffer byteBuffer : buffers) { - bufferPool.remove(byteBuffer); + for (StreamBuffer byteBuffer : buffers) { + bufferList.remove(byteBuffer); } } return totalAckDataLength; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java index b0056d4857a5..2cd5630549c3 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput; import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; +import org.apache.hadoop.hdds.scm.storage.StreamBuffer; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.security.token.Token; @@ -51,7 +52,7 @@ public final class BlockDataStreamOutputEntry // the current position of this stream 0 <= currentPosition < length private long currentPosition; private final Token token; - private List bufferPool; + private List bufferList; @SuppressWarnings({"parameternumber", "squid:S00107"}) private BlockDataStreamOutputEntry( @@ -61,7 +62,7 @@ private BlockDataStreamOutputEntry( long length, Token token, OzoneClientConfig config, - List bufferPool + List bufferList ) { this.config = config; this.byteBufferStreamOutput = null; @@ -72,7 +73,7 @@ private BlockDataStreamOutputEntry( this.token = token; this.length = length; this.currentPosition = 0; - this.bufferPool = bufferPool; + this.bufferList = bufferList; } long getLength() { @@ -97,7 +98,7 @@ private void checkStream() throws IOException { if (this.byteBufferStreamOutput == null) { this.byteBufferStreamOutput = new BlockDataStreamOutput(blockID, xceiverClientManager, pipeline, - config, token, bufferPool); + config, token, bufferList); } } @@ -198,7 +199,7 @@ public static class Builder { private long length; private Token token; private OzoneClientConfig config; - private List bufferPool; + private List bufferList; public Builder setBlockID(BlockID bID) { this.blockID = bID; @@ -238,8 +239,8 @@ public Builder setToken(Token bToken) { return this; } - public Builder setBufferPool(List bPool) { - this.bufferPool = bPool; + public Builder setBufferList(List bList) { + this.bufferList = bList; return this; } @@ -249,7 +250,7 @@ public BlockDataStreamOutputEntry build() { xceiverClientManager, pipeline, length, - token, config, bufferPool); + token, config, bufferList); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java index 403869d25f1c..e49b0b79adf6 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.StreamBuffer; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; @@ -60,7 +60,7 @@ public class BlockDataStreamOutputEntryPool { 
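
A quick usage sketch of the StreamBuffer type this patch introduces (the wrap/length/duplicate behaviour is as defined in the new class above; the sample bytes and offsets are made up):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdds.scm.storage.StreamBuffer;

    public class StreamBufferUsage {
      public static void main(String[] args) {
        byte[] data = new byte[64];
        // Wrap bytes [16, 48) without copying; the view is read-only.
        StreamBuffer sb = new StreamBuffer(ByteBuffer.wrap(data), 16, 32);
        System.out.println(sb.length());      // 32
        ByteBuffer forWrite = sb.duplicate(); // safe for the write path to drain
        forWrite.position(forWrite.limit());
        System.out.println(sb.length());      // still 32, retained view untouched
      }
    }
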
private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private final long openID; private final ExcludeList excludeList; - private List bufferList; + private List bufferList; @SuppressWarnings({"parameternumber", "squid:S00107"}) public BlockDataStreamOutputEntryPool( @@ -146,7 +146,7 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) { .setConfig(config) .setLength(subKeyInfo.getLength()) .setToken(subKeyInfo.getToken()) - .setBufferPool(bufferList); + .setBufferList(bufferList); streamEntries.add(builder.build()); } @@ -308,8 +308,8 @@ boolean isEmpty() { long computeBufferData() { long totalDataLen =0; - for (ByteBuffer b: bufferList){ - totalDataLen+=(b.limit()-b.position()); + for (StreamBuffer b : bufferList){ + totalDataLen += b.length(); } return totalDataLen; } From 62e14a6abc365bef8df34e1284cb7af0edd596bb Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Wed, 20 Oct 2021 10:18:52 +0530 Subject: [PATCH 6/8] remove empty line --- .../apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 44bcc6576141..40ed78dfa8f3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -248,8 +248,6 @@ public IOException getIoException() { return ioException.get(); } - - @Override public void write(ByteBuffer b, int off, int len) throws IOException { checkOpen(); @@ -459,7 +457,6 @@ public void close() throws IOException { Thread.currentThread().interrupt(); handleInterruptedException(ex, true); } finally { - cleanup(false); } From 82568f051b3c2ca080f4b5567058fb11fcde5d45 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Wed, 20 Oct 2021 13:36:45 +0530 Subject: [PATCH 7/8] address comment --- .../apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java | 3 +++ .../java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 40ed78dfa8f3..4be92167823b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -292,6 +292,9 @@ public void writeOnRetry(long len) throws IOException { while (len > 0) { StreamBuffer buf = bufferList.get(count); long writeLen = Math.min(buf.length(), len); + if (writeLen == len){ + buf.setLimit((int) len); + } writeChunkToContainer(buf.duplicate()); len -= writeLen; count++; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java index 0ef9c8891c97..40b0308fdf66 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java @@ -42,4 +42,8 @@ public ByteBuffer duplicate() { public int length() { return buffer.limit() - buffer.position(); } + + public void setLimit(int limit) { + 
buffer.limit(limit); + } } \ No newline at end of file From 52f97c6af065b87f5f7f1292fc5451e387163062 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Thu, 21 Oct 2021 10:10:47 +0530 Subject: [PATCH 8/8] address comments --- .../hdds/scm/storage/BlockDataStreamOutput.java | 13 +++++++------ .../hadoop/hdds/scm/storage/StreamBuffer.java | 3 --- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 4be92167823b..aada48e2f596 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -259,7 +259,7 @@ public void write(ByteBuffer b, int off, int len) throws IOException { } final StreamBuffer buf = new StreamBuffer(b, off, len); - bufferList.add(new StreamBuffer(b, off, len)); + bufferList.add(buf); writeChunkToContainer(buf.duplicate()); @@ -290,12 +290,13 @@ public void writeOnRetry(long len) throws IOException { } int count = 0; while (len > 0) { - StreamBuffer buf = bufferList.get(count); - long writeLen = Math.min(buf.length(), len); - if (writeLen == len){ - buf.setLimit((int) len); + final StreamBuffer buf = bufferList.get(count); + final long writeLen = Math.min(buf.length(), len); + final ByteBuffer duplicated = buf.duplicate(); + if (writeLen != buf.length()) { + duplicated.limit(Math.toIntExact(len)); } - writeChunkToContainer(buf.duplicate()); + writeChunkToContainer(duplicated); len -= writeLen; count++; writtenDataLength += writeLen; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java index 40b0308fdf66..f36019e2aeb8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java @@ -43,7 +43,4 @@ public int length() { return buffer.limit() - buffer.position(); } - public void setLimit(int limit) { - buffer.limit(limit); - } } \ No newline at end of file
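
Taken together, the series leaves writeOnRetry() replaying the retained buffers in order and trimming only the last, partially unflushed one. A stand-alone rendering of that final loop with plain ByteBuffers (writeChunk stands in for writeChunkToContainer; the trim here is done relative to the view's current position, the general form when retained views need not start at offset zero):

    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.function.Consumer;

    final class RetryReplay {
      private RetryReplay() { }

      static void writeOnRetry(List<ByteBuffer> retained, long len,
          Consumer<ByteBuffer> writeChunk) {
        int count = 0;
        while (len > 0) {
          ByteBuffer buf = retained.get(count);
          long writeLen = Math.min(buf.remaining(), len);
          ByteBuffer duplicated = buf.duplicate();
          if (writeLen != buf.remaining()) {
            // Only the head of this buffer is unacknowledged; trim the copy.
            duplicated.limit(duplicated.position() + Math.toIntExact(len));
          }
          writeChunk.accept(duplicated);
          len -= writeLen;
          count++;
        }
      }
    }
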