Reconstructed unified diff. Production change: during EC reconstruction, a missing
stripeChecksum (data written by a pre-stripeChecksum Ozone version) is logged instead
of failing executePutBlock with an IOException, preserving backward compatibility.
Test change: adds a regression test asserting executePutBlock does not throw when no
chunk in the stripe carries stripeChecksum.

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index adecc3e4c1e2..3c545b776ff9 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -115,17 +115,28 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
     }
 
     BlockData checksumBlockData = null;
+    BlockID blockID = null;
     //Reverse Traversal as all parity will have checksumBytes
     for (int i = blockData.length - 1; i >= 0; i--) {
       BlockData bd = blockData[i];
       if (bd == null) {
         continue;
       }
+      if (blockID == null) {
+        // store the BlockID for logging
+        blockID = bd.getBlockID();
+      }
       List<ChunkInfo> chunks = bd.getChunks();
-      if (chunks != null && chunks.size() > 0 && chunks.get(0)
-          .hasStripeChecksum()) {
-        checksumBlockData = bd;
-        break;
+      if (chunks != null && chunks.size() > 0) {
+        if (chunks.get(0).hasStripeChecksum()) {
+          checksumBlockData = bd;
+          break;
+        } else {
+          ChunkInfo chunk = chunks.get(0);
+          LOG.debug("The first chunk in block with index {} does not have stripeChecksum. BlockID: {}, Block " +
+              "size: {}. Chunk length: {}, Chunk offset: {}, hasChecksumData: {}, chunks size: {}.", i,
+              bd.getBlockID(), bd.getSize(), chunk.getLen(), chunk.getOffset(), chunk.hasChecksumData(), chunks.size());
+        }
       }
     }
 
@@ -158,9 +169,8 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
       getContainerBlockData().clearChunks();
       getContainerBlockData().addAllChunks(newChunkList);
     } else {
-      throw new IOException("None of the block data have checksum " +
-          "which means " + parity + "(parity)+1 blocks are " +
-          "not present");
+      LOG.warn("Could not find checksum data in any index for blockData with BlockID {}, length {} and " +
+          "blockGroupLength {}.", blockID, blockData.length, blockGroupLength);
     }
 
     return executePutBlock(close, force, blockGroupLength);
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index d06c9cf684f4..bf4830c6fcb5 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -23,9 +23,13 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -42,8 +46,13 @@
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -83,6 +92,66 @@ void test(final int writeSize) throws IOException {
     }
   }
 
+  /**
+   * Tests an EC offline reconstruction scenario in which none of the ChunkInfo in an EC stripe have stripeChecksum.
+   * Such ChunkInfo will exist for any EC data that was written in a version in which the ChunkInfo protobuf message did
+   * not have the stripeChecksum field. Here, we assert that executePutBlock during reconstruction does not throw an
+   * exception because of missing stripeChecksum. This essentially tests compatibility between an Ozone version that
+   * did not have stripeChecksum and a version that has stripeChecksum.
+   */
+  @Test
+  public void testMissingStripeChecksumDoesNotMakeExecutePutBlockFailDuringECReconstruction() throws IOException {
+    // setup some parameters required for creating ECBlockOutputStream
+    OzoneClientConfig config = new OzoneClientConfig();
+    ECReplicationConfig replicationConfig = new ECReplicationConfig(3, 2);
+    BlockID blockID = new BlockID(1, 1);
+    DatanodeDetails datanodeDetails = MockDatanodeDetails.randomDatanodeDetails();
+    Pipeline pipeline = Pipeline.newBuilder()
+        .setId(PipelineID.valueOf(datanodeDetails.getUuid()))
+        .setReplicationConfig(replicationConfig)
+        .setNodes(ImmutableList.of(datanodeDetails))
+        .setState(Pipeline.PipelineState.CLOSED)
+        // we'll executePutBlock for the parity index 5 because stripeChecksum is written to either the first or the
+        // parity indexes
+        .setReplicaIndexes(ImmutableMap.of(datanodeDetails, 5)).build();
+
+    BlockLocationInfo locationInfo = new BlockLocationInfo.Builder()
+        .setBlockID(blockID)
+        .setOffset(1)
+        .setLength(10)
+        .setPipeline(pipeline).build();
+
+    /*
+    The array of BlockData contains metadata about blocks and their chunks, and is read in executePutBlock. In
+    this test, we deliberately don't write stripeChecksum to any chunk. The expectation is that executePutBlock
+    should not throw an exception because of missing stripeChecksum.
+    */
+    BlockData[] blockData = createBlockDataWithoutStripeChecksum(blockID, replicationConfig);
+    try (ECBlockOutputStream ecBlockOutputStream = createECBlockOutputStream(config, replicationConfig, blockID,
+        pipeline)) {
+      Assertions.assertDoesNotThrow(() -> ecBlockOutputStream.executePutBlock(true, true, locationInfo.getLength(),
+          blockData));
+    }
+  }
+
+  /**
+   * Creates a BlockData array with {@link ECReplicationConfig#getRequiredNodes()} number of elements.
+   */
+  private BlockData[] createBlockDataWithoutStripeChecksum(BlockID blockID, ECReplicationConfig replicationConfig) {
+    int requiredNodes = replicationConfig.getRequiredNodes();
+    BlockData[] blockDataArray = new BlockData[requiredNodes];
+
+    // add just one ChunkInfo to each BlockData.
+    for (int i = 0; i < requiredNodes; i++) {
+      BlockData data = new BlockData(blockID);
+      // create a ChunkInfo with no stripeChecksum
+      ChunkInfo chunkInfo = new ChunkInfo("abc", 0, 10);
+      data.addChunk(chunkInfo.getProtoBufMessage());
+      blockDataArray[i] = data;
+    }
+    return blockDataArray;
+  }
+
   private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
       throws IOException {
 
@@ -114,6 +183,20 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
         () -> newFixedThreadPool(10));
   }
 
+  private ECBlockOutputStream createECBlockOutputStream(OzoneClientConfig clientConfig,
+      ECReplicationConfig repConfig, BlockID blockID, Pipeline pipeline) throws IOException {
+    final XceiverClientManager xcm = mock(XceiverClientManager.class);
+    when(xcm.acquireClient(any()))
+        .thenReturn(new MockXceiverClientSpi(pipeline));
+
+    ContainerClientMetrics clientMetrics = ContainerClientMetrics.acquire();
+    StreamBufferArgs streamBufferArgs =
+        StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, clientConfig);
+
+    return new ECBlockOutputStream(blockID, xcm, pipeline, BufferPool.empty(), clientConfig, null,
+        clientMetrics, streamBufferArgs, () -> newFixedThreadPool(2));
+  }
+
   /**
    * XCeiverClient which simulates responses.
    */