From b1f2c9a664756dcd549b5f13b4efc803a1ced962 Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Sun, 12 Jun 2022 22:23:04 -0700 Subject: [PATCH 1/8] HDDS-6794: EC: Analyze and add putBlock even on non writing node in the case of partial single stripe. --- .../hdds/scm/storage/BlockOutputStream.java | 6 +++++- .../hdds/scm/storage/ECBlockOutputStream.java | 1 + .../client/io/ECBlockOutputStreamEntry.java | 19 +++++++++++-------- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index b86464c0976a..ade96dd8c0c9 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -268,11 +268,15 @@ public void write(byte[] b, int off, int len) throws IOException { writeChunkIfNeeded(); off += writeLen; len -= writeLen; - writtenDataLength += writeLen; + updateWrittenDataLength(writeLen); doFlushOrWatchIfNeeded(); } } + public void updateWrittenDataLength(int writeLen) { + writtenDataLength += writeLen; + } + private void doFlushOrWatchIfNeeded() throws IOException { if (currentBufferRemaining == 0) { if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index 174e507829b7..9f6bf059506c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -78,6 +78,7 @@ public ECBlockOutputStream( public void write(byte[] b, int off, int len) throws IOException { this.currentChunkRspFuture = 
writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len))); + updateWrittenDataLength(len); } public CompletableFuture write( diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java index 99cebf8217ad..91eb154560df 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java @@ -100,14 +100,12 @@ void checkStream() throws IOException { void createOutputStream() throws IOException { Pipeline ecPipeline = getPipeline(); List nodes = getPipeline().getNodes(); - blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream( - getBlockID(), - getXceiverClientManager(), - createSingleECBlockPipeline( - ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1), - getBufferPool(), - getConf(), - getToken()); + for (int i = currentStreamIdx; i < nodes.size(); i++) { + blockOutputStreams[i] = + new ECBlockOutputStream(getBlockID(), getXceiverClientManager(), + createSingleECBlockPipeline(ecPipeline, nodes.get(i), i + 1), + getBufferPool(), getConf(), getToken()); + } } @Override @@ -318,6 +316,11 @@ private List getFailedStreams(boolean forPutBlock) { List failedStreams = new ArrayList<>(); while (iter.hasNext()) { final ECBlockOutputStream stream = iter.next(); + if (stream.getWrittenDataLength() <= 0) { + // If we did not write any data to this stream yet, let's not consider + // for failure checking. 
+ continue; + } CompletableFuture responseFuture = null; if (forPutBlock) { From cd8032287abc57c81bb9034ab6cc5636a52ac1ad Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Mon, 13 Jun 2022 09:35:57 -0700 Subject: [PATCH 2/8] Fixed the container scanner check --- .../ozone/container/keyvalue/KeyValueContainerCheck.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java index c560aabbe630..d7aaf78f6c97 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java @@ -250,9 +250,12 @@ private void scanData(DataTransferThrottler throttler, Canceler canceler) BlockData bdata = db.getStore() .getBlockDataTable() .get(blockKey); + // In EC, client may write empty putBlock in padding block nodes. + // So, we need to make sure, chunk length > 0, before declaring + // the missing chunk file. if (bdata != null) { - throw new IOException("Missing chunk file " - + chunkFile.getAbsolutePath()); + throw new IOException( + "Missing chunk file " + chunkFile.getAbsolutePath()); } } else if (chunk.getChecksumData().getType() != ContainerProtos.ChecksumType.NONE) { From 069554132b1bbb676de533b10ee94cde5ba09d68 Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Wed, 15 Jun 2022 00:50:58 -0700 Subject: [PATCH 3/8] Container scanner check added to handle empty putBlock containers. 
--- .../ozone/container/keyvalue/KeyValueContainerCheck.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java index d7aaf78f6c97..90c392bc01a5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java @@ -253,7 +253,8 @@ private void scanData(DataTransferThrottler throttler, Canceler canceler) // In EC, client may write empty putBlock in padding block nodes. // So, we need to make sure, chunk length > 0, before declaring // the missing chunk file. - if (bdata != null) { + if (bdata != null && bdata.getChunks().size() > 0 && bdata .getChunks().get(0).getLen() > 0) { throw new IOException( "Missing chunk file " + chunkFile.getAbsolutePath()); } From 4e4bac09a95f2c4c564635bc9537af7032b7e65e Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Mon, 20 Jun 2022 00:25:39 -0700 Subject: [PATCH 4/8] createContainer while creating ECBlockOutputStream --- .../scm/storage/ContainerProtocolCalls.java | 2 +- .../container/keyvalue/KeyValueHandler.java | 11 +++++-- .../client/io/ECBlockOutputStreamEntry.java | 30 +++++++++++++++++-- .../ozone/client/MockXceiverClientSpi.java | 4 +++ 4 files changed, 42 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 16bef6971215..63ccbd776f53 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -456,7 +456,7 @@ public static void createContainer(XceiverClientSpi client, long containerID, * @param replicaIndex - index position of the container replica * @throws IOException */ - private static void createContainerInternal(XceiverClientSpi client, + public static void createContainerInternal(XceiverClientSpi client, long containerID, String encodedToken, ContainerProtos.ContainerDataProto.State state, int replicaIndex) throws IOException { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 95907116506a..f1606cccb2a3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -84,6 +84,7 @@ import com.google.common.base.Preconditions; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_NON_EMPTY_CONTAINER; @@ -94,12 +95,14 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse; import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse; +import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getContainerCommandResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse; +import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponseBuilder; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest; @@ -258,7 +261,7 @@ public BlockManager getBlockManager() { * ContainerSet and sends an ICR to the SCM. */ ContainerCommandResponseProto handleCreateContainer( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + ContainerCommandRequestProto request, KeyValueContainer kvContainer){ if (!request.hasCreateContainer()) { if (LOG.isDebugEnabled()) { LOG.debug("Malformed Create Container request. trace ID: {}", @@ -268,7 +271,11 @@ ContainerCommandResponseProto handleCreateContainer( } // Create Container request should be passed a null container as the // container would be created here. 
- Preconditions.checkArgument(kvContainer == null); + if (kvContainer != null) { + return getSuccessResponseBuilder(request) + .setMessage("Container already exist.").build(); + } + //Preconditions.checkArgument(kvContainer == null); long containerID = request.getContainerID(); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java index 91eb154560df..07549ba9cc2d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.BufferPool; +import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.security.token.Token; @@ -90,6 +91,23 @@ void checkStream() throws IOException { if (!isInitialized()) { blockOutputStreams = new ECBlockOutputStream[replicationConfig.getRequiredNodes()]; + for (int i = 0; i < replicationConfig.getRequiredNodes(); i++) { + if (blockOutputStreams[i] != null) { + continue; + } + blockOutputStreams[i] = + new ECBlockOutputStream(getBlockID(), getXceiverClientManager(), + createSingleECBlockPipeline(getPipeline(), + getPipeline().getNodes().get(i), i + 1), getBufferPool(), + getConf(), getToken()); + System.out.println( + "Creating container from client: " + getBlockID().getContainerID()); + ContainerProtocolCalls + .createContainerInternal(blockOutputStreams[i].getXceiverClient(), + getBlockID().getContainerID(), + getToken() != null ? 
getToken().encodeToUrlString() : null, + null, i + 1); + } } if (blockOutputStreams[currentStreamIdx] == null) { createOutputStream(); @@ -101,10 +119,18 @@ void createOutputStream() throws IOException { Pipeline ecPipeline = getPipeline(); List nodes = getPipeline().getNodes(); for (int i = currentStreamIdx; i < nodes.size(); i++) { - blockOutputStreams[i] = + if (blockOutputStreams[i] != null) { + continue; + } + /*blockOutputStreams[i] = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(), createSingleECBlockPipeline(ecPipeline, nodes.get(i), i + 1), getBufferPool(), getConf(), getToken()); + ContainerProtocolCalls + .createContainerInternal(blockOutputStreams[i].getXceiverClient(), + getBlockID().getContainerID(), + getToken() != null ? getToken().encodeToUrlString() : null, null, + i + 1);*/ } } @@ -272,7 +298,7 @@ void executePutBlock(boolean isClose, long blockGroupLength) { return; } for (ECBlockOutputStream stream : blockOutputStreams) { - if (stream == null) { + if (stream == null || stream.getWrittenDataLength() <= 0) { continue; } try { diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java index 49eee44f6e33..d94181abb428 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java @@ -94,6 +94,10 @@ public XceiverClientReply sendCommandAsync( return r.setResult(Result.IO_EXCEPTION); } }); + case CreateContainer: + return result(request, + r -> r.setCreateContainer( + ContainerProtos.CreateContainerResponseProto.newBuilder().build())); case ReadChunk: return result(request, r -> r.setReadChunk(readChunk(request.getReadChunk()))); From da45b921d1dfd2a03376c07b767167a3a656942a Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Thu, 30 Jun 2022 21:20:32 -0700 
Subject: [PATCH 5/8] Added conditions at DN to make sure container is created on putBlock if it does not exist. --- .../hdds/scm/storage/BlockOutputStream.java | 13 +++--- .../hdds/scm/storage/ECBlockOutputStream.java | 2 + .../scm/storage/ContainerProtocolCalls.java | 6 +-- .../container/common/impl/HddsDispatcher.java | 19 +++++++- .../container/keyvalue/KeyValueHandler.java | 14 +++--- .../client/io/ECBlockOutputStreamEntry.java | 45 +++---------------- .../ozone/client/TestOzoneECClient.java | 3 ++ .../client/rpc/TestECKeyOutputStream.java | 7 +-- 8 files changed, 51 insertions(+), 58 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index ade96dd8c0c9..635fab978c7c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -140,17 +140,19 @@ public BlockOutputStream( this.xceiverClientFactory = xceiverClientManager; this.config = config; this.blockID = new AtomicReference<>(blockID); + replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode()); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); - this.containerBlockData = - BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) - .addMetadata(keyValue); + this.containerBlockData = BlockData.newBuilder().setBlockID( ContainerProtos.DatanodeBlockID.newBuilder() .setContainerID(blockID.getContainerID()) .setLocalID(blockID.getLocalID()) .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId()) .setReplicaIndex(replicationIndex).build()).addMetadata(keyValue); this.xceiverClient = xceiverClientManager.acquireClient(pipeline); this.bufferPool = bufferPool; this.token = token; - replicationIndex = 
pipeline.getReplicaIndex(pipeline.getClosestNode()); - //number of buffers used before doing a flush refreshCurrentBuffer(); flushPeriod = (int) (config.getStreamBufferFlushSize() / config @@ -615,6 +617,7 @@ public void setIoException(Exception e) { if (ioe == null) { IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e); ioException.compareAndSet(null, exception); + LOG.debug("Exception: for block ID: " + blockID, e); } else { LOG.debug("Previous request had already failed with {} " + "so subsequent request also encounters " + diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index 9f6bf059506c..d0afa43d7ef2 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -52,6 +52,7 @@ public class ECBlockOutputStream extends BlockOutputStream { private CompletableFuture putBlkRspFuture = null; + private int replicaIndex = 0; /** * Creates a new ECBlockOutputStream. * @@ -72,6 +73,7 @@ public ECBlockOutputStream( pipeline, bufferPool, config, token); // In EC stream, there will be only one node in pipeline. 
this.datanodeDetails = pipeline.getClosestNode(); + this.replicaIndex = pipeline.getReplicaIndex(this.datanodeDetails); } @Override diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 63ccbd776f53..e024d79b9a7d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -229,8 +229,8 @@ public static ContainerProtos.PutBlockResponseProto putBlock( * @throws InterruptedException * @throws ExecutionException */ - public static XceiverClientReply putBlockAsync( - XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof, + public static XceiverClientReply putBlockAsync(XceiverClientSpi xceiverClient, + BlockData containerBlockData, boolean eof, Token token) throws IOException, InterruptedException, ExecutionException { PutBlockRequestProto.Builder createBlockRequest = @@ -456,7 +456,7 @@ public static void createContainer(XceiverClientSpi client, long containerID, * @param replicaIndex - index position of the container replica * @throws IOException */ - public static void createContainerInternal(XceiverClientSpi client, + private static void createContainerInternal(XceiverClientSpi client, long containerID, String encodedToken, ContainerProtos.ContainerDataProto.State state, int replicaIndex) throws IOException { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 802104a17140..8f2af71ead61 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -247,12 +247,20 @@ private ContainerCommandResponseProto dispatchRequest( } if (cmdType != Type.CreateContainer) { + int replicaIndexFromPutBlock = msg.hasPutBlock() ? + msg.getPutBlock().getBlockData().getBlockID().getReplicaIndex() : + 0; /** * Create Container should happen only as part of Write_Data phase of * writeChunk. + * In EC, we are doing empty putBlock. In the partial stripe writes, if + * file size is less than chunkSize*(ECData-1), we are making empty block + * to get the container created in non writing nodes. If replica index is + * >0 then we know it's for ec container. */ if (container == null && ((isWriteStage || isCombinedStage) - || cmdType == Type.PutSmallFile)) { + || cmdType == Type.PutSmallFile + || (cmdType == Type.PutBlock && replicaIndexFromPutBlock > 0))) { // If container does not exist, create one for WriteChunk and // PutSmallFile request responseProto = createContainer(msg); @@ -264,7 +272,8 @@ private ContainerCommandResponseProto dispatchRequest( return ContainerUtils.logAndReturnError(LOG, sce, msg); } Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null - || dispatcherContext == null); + || dispatcherContext == null || + (cmdType == Type.PutBlock && replicaIndexFromPutBlock > 0)); if (container2BCSIDMap != null) { // adds this container to list of containers created in the pipeline // with initial BCSID recorded as 0. 
@@ -416,6 +425,12 @@ ContainerCommandResponseProto createContainer( containerRequest.getWriteChunk().getBlockID().getReplicaIndex()); } + if (containerRequest.hasPutBlock()) { + createRequest.setReplicaIndex( + containerRequest.getPutBlock().getBlockData().getBlockID() + .getReplicaIndex()); + } + ContainerCommandRequestProto.Builder requestBuilder = ContainerCommandRequestProto.newBuilder() .setCmdType(Type.CreateContainer) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index f1606cccb2a3..aeb32a9dbb31 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -261,7 +261,7 @@ public BlockManager getBlockManager() { * ContainerSet and sends an ICR to the SCM. */ ContainerCommandResponseProto handleCreateContainer( - ContainerCommandRequestProto request, KeyValueContainer kvContainer){ + ContainerCommandRequestProto request, KeyValueContainer kvContainer) { if (!request.hasCreateContainer()) { if (LOG.isDebugEnabled()) { LOG.debug("Malformed Create Container request. trace ID: {}", @@ -271,11 +271,7 @@ ContainerCommandResponseProto handleCreateContainer( } // Create Container request should be passed a null container as the // container would be created here. 
- if (kvContainer != null) { - return getSuccessResponseBuilder(request) - .setMessage("Container already exist.").build(); - } - //Preconditions.checkArgument(kvContainer == null); + Preconditions.checkArgument(kvContainer == null); long containerID = request.getContainerID(); @@ -472,7 +468,11 @@ ContainerCommandResponseProto handlePutBlock( boolean endOfBlock = false; if (!request.getPutBlock().hasEof() || request.getPutBlock().getEof()) { - chunkManager.finishWriteChunks(kvContainer, blockData); + // in EC, we will be doing empty put block. So, there may not be dat + // a available. So, let's flush only when data size is > 0. + if (request.getPutBlock().getBlockData().getSize() > 0) { + chunkManager.finishWriteChunks(kvContainer, blockData); + } endOfBlock = true; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java index 07549ba9cc2d..89f09802da23 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java @@ -91,47 +91,16 @@ void checkStream() throws IOException { if (!isInitialized()) { blockOutputStreams = new ECBlockOutputStream[replicationConfig.getRequiredNodes()]; - for (int i = 0; i < replicationConfig.getRequiredNodes(); i++) { - if (blockOutputStreams[i] != null) { - continue; - } + for (int i = currentStreamIdx; i < replicationConfig + .getRequiredNodes(); i++) { + Pipeline ecPipeline = getPipeline(); + List nodes = getPipeline().getNodes(); blockOutputStreams[i] = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(), - createSingleECBlockPipeline(getPipeline(), - getPipeline().getNodes().get(i), i + 1), getBufferPool(), - getConf(), getToken()); - System.out.println( - "Creating container from client: " + 
getBlockID().getContainerID()); - ContainerProtocolCalls - .createContainerInternal(blockOutputStreams[i].getXceiverClient(), - getBlockID().getContainerID(), - getToken() != null ? getToken().encodeToUrlString() : null, - null, i + 1); + createSingleECBlockPipeline(ecPipeline, nodes.get(i), i + 1), + getBufferPool(), getConf(), getToken()); } } - if (blockOutputStreams[currentStreamIdx] == null) { - createOutputStream(); - } - } - - @Override - void createOutputStream() throws IOException { - Pipeline ecPipeline = getPipeline(); - List nodes = getPipeline().getNodes(); - for (int i = currentStreamIdx; i < nodes.size(); i++) { - if (blockOutputStreams[i] != null) { - continue; - } - /*blockOutputStreams[i] = - new ECBlockOutputStream(getBlockID(), getXceiverClientManager(), - createSingleECBlockPipeline(ecPipeline, nodes.get(i), i + 1), - getBufferPool(), getConf(), getToken()); - ContainerProtocolCalls - .createContainerInternal(blockOutputStreams[i].getXceiverClient(), - getBlockID().getContainerID(), - getToken() != null ? 
getToken().encodeToUrlString() : null, null, - i + 1);*/ - } } @Override @@ -298,7 +267,7 @@ void executePutBlock(boolean isClose, long blockGroupLength) { return; } for (ECBlockOutputStream stream : blockOutputStreams) { - if (stream == null || stream.getWrittenDataLength() <= 0) { + if (stream == null) { continue; } try { diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java index 3438e3f2f98b..98f014a7b14c 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java @@ -560,6 +560,9 @@ public void test10D4PConfigWithPartialStripe() OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT); int dataBlks = 10; int parityBlks = 4; + MultiNodePipelineBlockAllocator blkAllocator = + new MultiNodePipelineBlockAllocator(conf, dataBlks + parityBlks, 14); + createNewClient(conf, blkAllocator); store.createVolume(volumeName); OzoneVolume volume = store.getVolume(volumeName); volume.createBucket(bucketName); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java index 948c48143540..4f0a09177173 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java @@ -292,7 +292,7 @@ public void testMultipleChunksInSingleWriteOp(int numChunks) } @Test - public void testECContainerKeysCount() + public void testECContainerKeysCountAndNumContainerReplicas() throws IOException, InterruptedException, TimeoutException { byte[] inputData = getInputBytes(1); final OzoneBucket bucket = 
getOzoneBucket(); @@ -320,8 +320,9 @@ public void testECContainerKeysCount() GenericTestUtils.waitFor(() -> { try { - return containerOperationClient.getContainer(currentKeyContainerID) - .getNumberOfKeys() == 1; + return (containerOperationClient.getContainer(currentKeyContainerID) + .getNumberOfKeys() == 1) && (containerOperationClient + .getContainerReplicas(currentKeyContainerID).size() == 5); } catch (IOException exception) { Assert.fail("Unexpected exception " + exception); return false; From d2343c0837e3697c5189667ac16673ca0cc461e9 Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Thu, 30 Jun 2022 23:25:53 -0700 Subject: [PATCH 6/8] Fixed the checkstyle --- .../hadoop/hdds/scm/storage/BlockOutputStream.java | 9 +++------ .../hadoop/hdds/scm/storage/ECBlockOutputStream.java | 2 -- .../ozone/container/common/impl/HddsDispatcher.java | 9 +++------ .../hadoop/ozone/container/keyvalue/KeyValueHandler.java | 3 --- .../hadoop/ozone/client/io/ECBlockOutputStreamEntry.java | 1 - .../apache/hadoop/ozone/client/MockXceiverClientSpi.java | 4 ---- 6 files changed, 6 insertions(+), 22 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 635fab978c7c..14071b975434 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -143,12 +143,9 @@ public BlockOutputStream( replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode()); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); - this.containerBlockData = BlockData.newBuilder().setBlockID( - ContainerProtos.DatanodeBlockID.newBuilder() - .setContainerID(blockID.getContainerID()) - .setLocalID(blockID.getLocalID()) - 
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId()) - .setReplicaIndex(replicationIndex).build()).addMetadata(keyValue); + this.containerBlockData = + BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .addMetadata(keyValue); this.xceiverClient = xceiverClientManager.acquireClient(pipeline); this.bufferPool = bufferPool; this.token = token; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index d0afa43d7ef2..9f6bf059506c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -52,7 +52,6 @@ public class ECBlockOutputStream extends BlockOutputStream { private CompletableFuture putBlkRspFuture = null; - private int replicaIndex = 0; /** * Creates a new ECBlockOutputStream. * @@ -73,7 +72,6 @@ public ECBlockOutputStream( pipeline, bufferPool, config, token); // In EC stream, there will be only one node in pipeline. this.datanodeDetails = pipeline.getClosestNode(); - this.replicaIndex = pipeline.getReplicaIndex(this.datanodeDetails); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 8f2af71ead61..60bb8e59bf89 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -247,9 +247,6 @@ private ContainerCommandResponseProto dispatchRequest( } if (cmdType != Type.CreateContainer) { - int replicaIndexFromPutBlock = msg.hasPutBlock() ? 
- msg.getPutBlock().getBlockData().getBlockID().getReplicaIndex() : - 0; /** * Create Container should happen only as part of Write_Data phase of * writeChunk. @@ -260,7 +257,7 @@ private ContainerCommandResponseProto dispatchRequest( */ if (container == null && ((isWriteStage || isCombinedStage) || cmdType == Type.PutSmallFile - || (cmdType == Type.PutBlock && replicaIndexFromPutBlock > 0))) { + || cmdType == Type.PutBlock)) { // If container does not exist, create one for WriteChunk and // PutSmallFile request responseProto = createContainer(msg); @@ -272,8 +269,8 @@ private ContainerCommandResponseProto dispatchRequest( return ContainerUtils.logAndReturnError(LOG, sce, msg); } Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null - || dispatcherContext == null || - (cmdType == Type.PutBlock && replicaIndexFromPutBlock > 0)); + || dispatcherContext == null + || cmdType == Type.PutBlock); if (container2BCSIDMap != null) { // adds this container to list of containers created in the pipeline // with initial BCSID recorded as 0. 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index aeb32a9dbb31..1c79683326e2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -84,7 +84,6 @@ import com.google.common.base.Preconditions; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_NON_EMPTY_CONTAINER; @@ -95,14 +94,12 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse; -import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getContainerCommandResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess; import 
static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse; -import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponseBuilder; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java index 89f09802da23..77e8a01ae296 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.BufferPool; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.security.token.Token; diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java index d94181abb428..49eee44f6e33 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java +++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java @@ -94,10 +94,6 @@ public XceiverClientReply sendCommandAsync( return r.setResult(Result.IO_EXCEPTION); } }); - case CreateContainer: - return result(request, - r -> r.setCreateContainer( - ContainerProtos.CreateContainerResponseProto.newBuilder().build())); case ReadChunk: return result(request, r -> r.setReadChunk(readChunk(request.getReadChunk()))); From 6a2c9724293e9819a11f9739a2b34e277f849e17 Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Fri, 1 Jul 2022 19:16:28 -0700 Subject: [PATCH 7/8] Fixed the issue of replication indexes not present in putBlock flow. --- .../hadoop/hdds/scm/storage/BlockInputStream.java | 11 ++++++++--- .../hadoop/hdds/scm/storage/BlockOutputStream.java | 9 ++++++--- .../hadoop/ozone/client/io/ECBlockInputStream.java | 4 +++- .../algorithms/SCMContainerPlacementRackScatter.java | 2 ++ .../replication/ECContainerReplicaCount.java | 3 ++- .../apache/hadoop/ozone/client/TestOzoneECClient.java | 4 +++- .../hdds/scm/storage/TestContainerCommandsEC.java | 7 +++++-- 7 files changed, 29 insertions(+), 11 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 07a444a2486e..e042168850cf 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; @@ -228,9 +229,13 @@ protected List getChunkInfos() throws IOException { LOG.debug("Initializing BlockInputStream for get key to access {}", blockID.getContainerID()); } - - DatanodeBlockID datanodeBlockID = blockID - .getDatanodeBlockIDProtobuf(); + DatanodeBlockID datanodeBlockID = + ContainerProtos.DatanodeBlockID.newBuilder() + .setContainerID(blockID.getContainerID()) + .setLocalID(blockID.getLocalID()) + .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId()) + .setReplicaIndex( + pipeline.getReplicaIndex(pipeline.getClosestNode())).build(); GetBlockResponseProto response = ContainerProtocolCalls .getBlock(xceiverClient, datanodeBlockID, token); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 14071b975434..635fab978c7c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -143,9 +143,12 @@ public BlockOutputStream( replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode()); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); - this.containerBlockData = - BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) - .addMetadata(keyValue); + this.containerBlockData = BlockData.newBuilder().setBlockID( + ContainerProtos.DatanodeBlockID.newBuilder() + .setContainerID(blockID.getContainerID()) + .setLocalID(blockID.getLocalID()) + .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId()) + .setReplicaIndex(replicationIndex).build()).addMetadata(keyValue); this.xceiverClient = xceiverClientManager.acquireClient(pipeline); 
this.bufferPool = bufferPool; this.token = token; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java index 89be7839ac0f..40d454a0a19b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.client.io; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ECReplicationConfig; @@ -167,7 +168,8 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) { .setReplicationConfig(StandaloneReplicationConfig.getInstance( HddsProtos.ReplicationFactor.ONE)) .setNodes(Arrays.asList(dataLocations[locationIndex])) - .setId(PipelineID.randomId()) + .setId(PipelineID.randomId()).setReplicaIndexes( + ImmutableMap.of(dataLocations[locationIndex], locationIndex + 1)) .setState(Pipeline.PipelineState.CLOSED) .build(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java index 235bcd59658a..104109244739 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java @@ -107,6 +107,8 @@ public List chooseDatanodes( int excludedNodesCount = excludedNodes == null ? 
0 : excludedNodes.size(); List availableNodes = networkTopology.getNodes( networkTopology.getMaxLevel()); + + int totalNodesCount = availableNodes.size(); if (excludedNodes != null) { availableNodes.removeAll(excludedNodes); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java index ff0a75053a32..f62afc8b777e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java @@ -423,7 +423,8 @@ private void ensureIndexWithinBounds(Integer index, String setName) { if (index < 1 || index > repConfig.getRequiredNodes()) { throw new IllegalArgumentException("Replica Index in " + setName + " for containerID " + containerInfo.getContainerID() - + "must be between 1 and " + repConfig.getRequiredNodes()); + + "must be between 1 and " + repConfig.getRequiredNodes() + + ". 
But the given index is: " + index); } } } diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java index 98f014a7b14c..5003f8e5e632 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java @@ -384,7 +384,9 @@ public void testPutBlockHasBlockGroupLen() throws IOException { keyDetails.getOzoneKeyLocations().get(0).getContainerID()) .setLocalID( keyDetails.getOzoneKeyLocations().get(0).getLocalID()) - .setBlockCommitSequenceId(1).build()); + .setBlockCommitSequenceId(1).setReplicaIndex( + blockList.getKeyLocations(0).getPipeline() + .getMemberReplicaIndexes(i)).build()); List metadataList = block.getMetadataList().stream().filter(kv -> kv.getKey() diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java index e57ec400cc9d..0b037af8c162 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java @@ -230,9 +230,12 @@ public void testListBlock() throws Exception { continue; } ListBlockResponseProto response = ContainerProtocolCalls - .listBlock(clients.get(i), containerID, null, numExpectedBlocks + 1, + .listBlock(clients.get(i), containerID, null, Integer.MAX_VALUE, containerToken); - Assertions.assertEquals(numExpectedBlocks, response.getBlockDataCount(), + Assertions.assertEquals(numExpectedBlocks, + response.getBlockDataList().stream().filter( + k -> k.getChunksCount() > 0 && k.getChunks(0).getLen() > 0) + .collect(Collectors.toList()).size(), "blocks count 
doesn't match on DN " + i); Assertions.assertEquals(numExpectedChunks, response.getBlockDataList().stream() From e26dd7ec05a73e7847fd785d6e1e5d12617205dd Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Sun, 3 Jul 2022 22:48:17 -0700 Subject: [PATCH 8/8] Some additional cleanups. --- .../hdds/scm/storage/BlockInputStream.java | 18 ++++++++++-------- .../hdds/scm/storage/BlockOutputStream.java | 11 ++++++++--- .../SCMContainerPlacementRackScatter.java | 2 -- .../client/io/ECBlockOutputStreamEntry.java | 9 +++++---- 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index e042168850cf..a0a210dd5857 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; @@ -229,15 +228,18 @@ protected List getChunkInfos() throws IOException { LOG.debug("Initializing BlockInputStream for get key to access {}", blockID.getContainerID()); } - DatanodeBlockID datanodeBlockID = - ContainerProtos.DatanodeBlockID.newBuilder() - .setContainerID(blockID.getContainerID()) + + DatanodeBlockID.Builder blkIDBuilder = + DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID()) .setLocalID(blockID.getLocalID()) - 
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId()) - .setReplicaIndex( - pipeline.getReplicaIndex(pipeline.getClosestNode())).build(); + .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId()); + + int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode()); + if (replicaIndex > 0) { + blkIDBuilder.setReplicaIndex(replicaIndex); + } GetBlockResponseProto response = ContainerProtocolCalls - .getBlock(xceiverClient, datanodeBlockID, token); + .getBlock(xceiverClient, blkIDBuilder.build(), token); chunks = response.getBlockData().getChunksList(); success = true; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 635fab978c7c..036b1007d7b6 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -143,12 +143,17 @@ public BlockOutputStream( replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode()); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); - this.containerBlockData = BlockData.newBuilder().setBlockID( + + ContainerProtos.DatanodeBlockID.Builder blkIDBuilder = ContainerProtos.DatanodeBlockID.newBuilder() .setContainerID(blockID.getContainerID()) .setLocalID(blockID.getLocalID()) - .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId()) - .setReplicaIndex(replicationIndex).build()).addMetadata(keyValue); + .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId()); + if (replicationIndex > 0) { + blkIDBuilder.setReplicaIndex(replicationIndex); + } + this.containerBlockData = BlockData.newBuilder().setBlockID( + blkIDBuilder.build()).addMetadata(keyValue); this.xceiverClient = xceiverClientManager.acquireClient(pipeline); this.bufferPool = bufferPool; this.token = token; diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java index 104109244739..235bcd59658a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java @@ -107,8 +107,6 @@ public List chooseDatanodes( int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size(); List availableNodes = networkTopology.getNodes( networkTopology.getMaxLevel()); - - int totalNodesCount = availableNodes.size(); if (excludedNodes != null) { availableNodes.removeAll(excludedNodes); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java index 77e8a01ae296..13c6a8647ac7 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java @@ -92,11 +92,10 @@ void checkStream() throws IOException { new ECBlockOutputStream[replicationConfig.getRequiredNodes()]; for (int i = currentStreamIdx; i < replicationConfig .getRequiredNodes(); i++) { - Pipeline ecPipeline = getPipeline(); List nodes = getPipeline().getNodes(); blockOutputStreams[i] = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(), - createSingleECBlockPipeline(ecPipeline, nodes.get(i), i + 1), + createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1), getBufferPool(), getConf(), getToken()); } } @@ -310,9 +309,11 @@ private List getFailedStreams(boolean forPutBlock) { List 
failedStreams = new ArrayList<>(); while (iter.hasNext()) { final ECBlockOutputStream stream = iter.next(); - if (stream.getWrittenDataLength() <= 0) { + if (!forPutBlock && stream.getWrittenDataLength() <= 0) { // If we did not write any data to this stream yet, let's not consider - // for failure checking. + // for failure checking. But we should do failure checking for putBlock + // though. In the case of padding stripes, we do send empty put blocks + // for creating empty containers at DNs ( Refer: HDDS-6794). continue; } CompletableFuture