From 6f8f4478290c4f894a15f622f1b7381448fe2183 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Tue, 29 Oct 2024 11:50:22 +0530 Subject: [PATCH 1/7] HDDS-11475. Validate EC reconstruction on DN --- .../hadoop/hdds/scm/OzoneClientConfig.java | 16 ++++++++++++++++ .../ECReconstructionCoordinator.java | 5 +++++ 2 files changed, 21 insertions(+) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 5426bbc49817..965dfe9b6e77 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -225,6 +225,14 @@ public enum ChecksumCombineMode { tags = ConfigTag.CLIENT) private int ecReconstructStripeWritePoolLimit = 10 * 3; + @Config(key="ec.reconstruction.validation", + defaultValue = "false", + description = "Flag to enable validation for EC reconstruction tasks" + + " to reconstruct target containers correctly. Reconstruction tasks" + + " will fail if validation fails when enabled.", + tags = ConfigTag.CLIENT) + private boolean ecReconstructionValidation = false; + @Config(key = "checksum.combine.mode", defaultValue = "COMPOSITE_CRC", description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] " @@ -509,6 +517,14 @@ public int getEcReconstructStripeWritePoolLimit() { return ecReconstructStripeWritePoolLimit; } + public void setEcReconstructionValidation(boolean validationEnabled) { + this.ecReconstructionValidation = validationEnabled; + } + + public boolean getEcReconstructionValidation() { + return ecReconstructionValidation; + } + public void setFsDefaultBucketLayout(String bucketLayout) { if (!bucketLayout.isEmpty()) { this.fsDefaultBucketLayout = bucketLayout; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 4273ed2b7163..e87c5ad86a45 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.security.token.Token; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.MemoizedSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -273,6 +274,8 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, ECBlockOutputStream[] emptyBlockStreams = new ECBlockOutputStream[notReconstructIndexes.size()]; ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()]; + List checksums = new ArrayList<>(toReconstructIndexes.size()); + try { // Create streams and buffers for all indexes that need reconstructed for (int i = 0; i < toReconstructIndexes.size(); i++) { @@ -328,6 +331,8 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, CompletableFuture future = targetBlockStreams[i].write(bufs[i]); checkFailures(targetBlockStreams[i], future); + // Store the recreated checksum + } bufs[i].clear(); } From 
8322bbb36f41edc69ba85031991ec7f5a58b2ca7 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Tue, 5 Nov 2024 14:03:12 +0530 Subject: [PATCH 2/7] Initial ECValidator --- ...ECBlockReconstructedStripeInputStream.java | 4 +++ .../ECReconstructionCoordinator.java | 7 ++-- .../ec/reconstruction/ECValidator.java | 34 +++++++++++++++++++ 3 files changed, 40 insertions(+), 5 deletions(-) create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java index 229cc3f3e36e..06443291dade 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java @@ -826,4 +826,8 @@ private static SortedSet setOfRange( .boxed().collect(toCollection(TreeSet::new)); } + public boolean validateChecksum(ByteBuffer[] buf) { + + } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index e87c5ad86a45..16bf512be5bf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -46,10 +46,10 @@ import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl; import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy; import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream; +import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.security.token.Token; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.MemoizedSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -274,8 +274,6 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, ECBlockOutputStream[] emptyBlockStreams = new ECBlockOutputStream[notReconstructIndexes.size()]; ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()]; - List checksums = new ArrayList<>(toReconstructIndexes.size()); - try { // Create streams and buffers for all indexes that need reconstructed for (int i = 0; i < toReconstructIndexes.size(); i++) { @@ -302,6 +300,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, try { readLen = sis.recoverChunks(bufs); Set failedIndexes = sis.getFailedIndexes(); + if (!failedIndexes.isEmpty()) { // There was a problem reading some of the block indexes, but we // did not get an exception as there must have been spare indexes @@ -331,8 +330,6 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, CompletableFuture future = targetBlockStreams[i].write(bufs[i]); checkFailures(targetBlockStreams[i], future); - // Store the recreated checksum - } bufs[i].clear(); } diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java new file mode 100644 index 000000000000..72784529aa10 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java @@ -0,0 +1,34 @@ +package org.apache.hadoop.ozone.container.ec.reconstruction; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChunkBuffer; +import org.apache.hadoop.ozone.common.OzoneChecksumException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; + +public class ECValidator { + + private static final Logger LOG = + LoggerFactory.getLogger(ECValidator.class); + private Checksum checksum = null; + + ECValidator(OzoneClientConfig config) { + this.checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum()); + } + + public boolean validateBuffer(ByteBuffer buf, ECBlockOutputStream ecBlockOutputStream) + throws OzoneChecksumException{ + try (ChunkBuffer chunk = ChunkBuffer.wrap(buf)) { + //Checksum will be stored in the 1st chunk and parity chunks + ContainerProtos.ChecksumData stripeChecksum = ecBlockOutputStream.getContainerBlockData() + .getChunks(0).getChecksumData(); + ContainerProtos.ChecksumData chunkChecksum = checksum.computeChecksum(chunk).getProtoBufMessage(); + LOG.info("Chunk Checksum: {}, Stripe Checksum: {}", chunkChecksum, stripeChecksum); + } + } +} From e7eba94b311ab66a3f587310a8c833e42ac4f1ee Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Tue, 5 Nov 2024 23:32:56 +0530 Subject: [PATCH 3/7] Added intial validator call --- .../ec/reconstruction/ECReconstructionCoordinator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 16bf512be5bf..1adbf031cc68 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -300,7 +300,6 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, try { readLen = sis.recoverChunks(bufs); Set failedIndexes = sis.getFailedIndexes(); - if (!failedIndexes.isEmpty()) { // There was a problem reading some of the block indexes, but we // did not get an exception as there must have been spare indexes @@ -331,6 +330,8 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, future = targetBlockStreams[i].write(bufs[i]); checkFailures(targetBlockStreams[i], future); } + ECValidator validator = new ECValidator(ozoneClientConfig); + validator.validateBuffer(bufs[i], targetBlockStreams[i]); bufs[i].clear(); } length -= readLen; From 5516cce9346d7d0106536a0dc16b9ade25512ebc Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Thu, 7 Nov 2024 03:10:06 +0530 Subject: [PATCH 4/7] Implement the validator --- 
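Note: the first cut of ECValidator above recomputes a checksum over each reconstructed buffer (via Ozone's Checksum and ChunkBuffer helpers) and compares it with the checksum already recorded for the target block. A minimal, self-contained sketch of that idea follows, using plain java.util.zip.CRC32 and illustrative class/method names rather than the Ozone API; it is a model of the technique, not the patch's implementation.

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Simplified model of the per-buffer validation idea: recompute checksum words over the
// reconstructed bytes and compare them with the words recorded for the original block.
// Class and method names here are illustrative; the patch uses Ozone's Checksum/ChunkBuffer.
public final class ReconstructionCheckDemo {

  // One CRC32 word per bytesPerChecksum slice of the buffer.
  static long[] checksumWords(ByteBuffer buf, int bytesPerChecksum) {
    ByteBuffer view = buf.duplicate();
    int words = (view.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
    long[] out = new long[words];
    byte[] slice = new byte[bytesPerChecksum];
    for (int i = 0; i < words; i++) {
      int len = Math.min(bytesPerChecksum, view.remaining());
      view.get(slice, 0, len);
      CRC32 crc = new CRC32();
      crc.update(slice, 0, len);
      out[i] = crc.getValue();
    }
    return out;
  }

  // Fail if the recreated buffer does not reproduce the expected checksum words.
  static void validate(ByteBuffer recreated, long[] expected, int bytesPerChecksum) {
    long[] actual = checksumWords(recreated, bytesPerChecksum);
    for (int i = 0; i < expected.length; i++) {
      if (actual[i] != expected[i]) {
        throw new IllegalStateException("Checksum mismatch at slice " + i);
      }
    }
  }

  public static void main(String[] args) {
    byte[] data = new byte[1 << 20];                         // stand-in for a reconstructed chunk
    long[] expected = checksumWords(ByteBuffer.wrap(data), 256 * 1024);
    validate(ByteBuffer.wrap(data), expected, 256 * 1024);   // passes: same bytes, same checksums
  }
}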
...ECBlockReconstructedStripeInputStream.java | 5 --- .../ECReconstructionCoordinator.java | 5 +-- .../ec/reconstruction/ECValidator.java | 34 +++++++++++++++---- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java index 06443291dade..204bc2396e4b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java @@ -825,9 +825,4 @@ private static SortedSet setOfRange( return range(startInclusive, endExclusive) .boxed().collect(toCollection(TreeSet::new)); } - - public boolean validateChecksum(ByteBuffer[] buf) { - - } - } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 1adbf031cc68..6dde8a744c80 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -116,6 +116,7 @@ public class ECReconstructionCoordinator implements Closeable { private final ECReconstructionMetrics metrics; private final StateContext context; private final OzoneClientConfig ozoneClientConfig; + private final ECValidator ecValidator; public ECReconstructionCoordinator( ConfigurationSource conf, CertificateClient certificateClient, @@ -141,6 +142,7 @@ public ECReconstructionCoordinator( tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient); this.clientMetrics = ContainerClientMetrics.acquire(); this.metrics = metrics; + ecValidator = new ECValidator(ozoneClientConfig); } public void reconstructECContainerGroup(long containerID, @@ -330,8 +332,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, future = targetBlockStreams[i].write(bufs[i]); checkFailures(targetBlockStreams[i], future); } - ECValidator validator = new ECValidator(ozoneClientConfig); - validator.validateBuffer(bufs[i], targetBlockStreams[i]); + ecValidator.validateBuffer(bufs[i], targetBlockStreams[i], i); bufs[i].clear(); } length -= readLen; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java index 72784529aa10..40b6d395186e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java @@ -4,8 +4,10 @@ import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream; import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.common.OzoneChecksumException; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,19 +18,37 @@ public class ECValidator { private static final Logger LOG = LoggerFactory.getLogger(ECValidator.class); private Checksum checksum = null; + private final boolean isValidationEnabled; ECValidator(OzoneClientConfig config) { this.checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum()); + // We fetch the configuration value beforehand to avoid re-fetching on every validation call + isValidationEnabled = config.getEcReconstructionValidation(); } - public boolean validateBuffer(ByteBuffer buf, ECBlockOutputStream ecBlockOutputStream) + /** + * Helper function to validate the checksum between recreated data and + * @param buf A {@link ByteBuffer} instance that stores the chunk + * @param ecBlockOutputStream A {@link ECBlockOutputStream} instance that stores + * the reconstructed index ECBlockOutputStream + * @param idx Used to store the index at which data was recreated + * @throws OzoneChecksumException if the recreated checksum and the block checksum doesn't match + */ + public void validateBuffer(ByteBuffer buf, ECBlockOutputStream ecBlockOutputStream, int idx) throws OzoneChecksumException{ - try (ChunkBuffer chunk = ChunkBuffer.wrap(buf)) { - //Checksum will be stored in the 1st chunk and parity chunks - ContainerProtos.ChecksumData stripeChecksum = ecBlockOutputStream.getContainerBlockData() - .getChunks(0).getChecksumData(); - ContainerProtos.ChecksumData chunkChecksum = checksum.computeChecksum(chunk).getProtoBufMessage(); - LOG.info("Chunk Checksum: {}, Stripe Checksum: {}", chunkChecksum, stripeChecksum); + if (isValidationEnabled) { + try (ChunkBuffer chunk = ChunkBuffer.wrap(buf)) { + //Checksum will be stored in the 1st chunk and parity chunks + ContainerProtos.ChecksumData stripeChecksum = ecBlockOutputStream.getContainerBlockData() + .getChunks(0).getChecksumData(); + ContainerProtos.ChecksumData chunkChecksum = checksum.computeChecksum(chunk).getProtoBufMessage(); + if (stripeChecksum.getChecksums(idx) != chunkChecksum.getChecksums(0)) { + LOG.info("Checksum mismatched between recreated chunk and re-created chunk"); + throw new OzoneChecksumException("Inconsistent checksum for re-created chunk and original chunk"); + } + } + } else { + LOG.debug("Checksum validation was disabled, skipping check"); } } } From db7b00d84c0e1eeea9e99de6aadd5de7185d942f Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Thu, 14 Nov 2024 14:01:26 +0530 Subject: [PATCH 5/7] Todo reconstructor --- .../ECReconstructionCoordinator.java | 9 ++- .../ec/reconstruction/ECValidator.java | 78 +++++++++++++++---- 2 files changed, 69 insertions(+), 18 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 6dde8a744c80..723ac81281d1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -270,6 +270,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, this.blockInputStreamFactory, byteBufferPool, this.ecReconstructReadExecutor, clientConfig)) { + ecValidator.setBlockLength(blockLocationInfo.getLength()); ECBlockOutputStream[] targetBlockStreams = 
new ECBlockOutputStream[toReconstructIndexes.size()]; @@ -294,8 +295,10 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, } if (toReconstructIndexes.size() > 0) { - sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1)) - .collect(Collectors.toSet())); + Set recoveryIndexes = toReconstructIndexes.stream().map(i -> (i - 1)) + .collect(Collectors.toSet()); + sis.setRecoveryIndexes(recoveryIndexes); + ecValidator.setReconstructionIndexes(recoveryIndexes); long length = safeBlockGroupLength; while (length > 0) { int readLen; @@ -332,7 +335,6 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, future = targetBlockStreams[i].write(bufs[i]); checkFailures(targetBlockStreams[i], future); } - ecValidator.validateBuffer(bufs[i], targetBlockStreams[i], i); bufs[i].clear(); } length -= readLen; @@ -341,6 +343,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, List allStreams = new ArrayList<>(Arrays.asList(targetBlockStreams)); allStreams.addAll(Arrays.asList(emptyBlockStreams)); for (ECBlockOutputStream targetStream : allStreams) { + ecValidator.validateBuffer(targetStream); targetStream.executePutBlock(true, true, blockLocationInfo.getLength(), blockDataGroup); checkFailures(targetStream, targetStream.getCurrentPutBlkResponseFuture()); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java index 40b6d395186e..f3453a84d117 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java @@ -1,5 +1,6 @@ package org.apache.hadoop.ozone.container.ec.reconstruction; +import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream; @@ -7,46 +8,93 @@ import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.common.OzoneChecksumException; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.util.*; +import java.util.stream.Collectors; public class ECValidator { private static final Logger LOG = LoggerFactory.getLogger(ECValidator.class); - private Checksum checksum = null; private final boolean isValidationEnabled; + private Collection reconstructionIndexes; + private final int parityCount; + private long blockLength; + private final ECReplicationConfig ecReplicationConfig; - ECValidator(OzoneClientConfig config) { - this.checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum()); + ECValidator(OzoneClientConfig config, ECReplicationConfig ecReplConfig) { // We fetch the configuration value beforehand to avoid re-fetching on every validation call isValidationEnabled = config.getEcReconstructionValidation(); + ecReplicationConfig = ecReplConfig; + parityCount = 
ecReplConfig.getParity(); + } + + public void setReconstructionIndexes(Collection reconstructionIndexes) { + this.reconstructionIndexes = reconstructionIndexes; + } + + public void setBlockLength(long blockLength) { + this.blockLength = blockLength; + } + + private boolean validateChecksumInStripe(ContainerProtos.ChecksumData checksumData, ByteString stripeChecksum) + throws OzoneChecksumException{ + + int bytesPerChecksum = checksumData.getBytesPerChecksum(); + ByteString checksum = stripeChecksum.substring(); + } + + private BlockData getChecksumBlockData(BlockData[] blockDataGroup) { + BlockData checksumBlockData = null; + // Reverse traversal as all parity bits will have checmsumBytes + for (int i = blockDataGroup.length - 1; i >= 0; i--) { + BlockData blockData = blockDataGroup[i]; + if (null == blockData) { + continue; + } + + List chunks = blockData.getChunks(); + if (null != chunks && !(chunks.isEmpty())) { + if (chunks.get(0).hasStripeChecksum()) { + checksumBlockData = blockData; + break; + } + } + } + + return checksumBlockData; } /** * Helper function to validate the checksum between recreated data and - * @param buf A {@link ByteBuffer} instance that stores the chunk * @param ecBlockOutputStream A {@link ECBlockOutputStream} instance that stores * the reconstructed index ECBlockOutputStream - * @param idx Used to store the index at which data was recreated * @throws OzoneChecksumException if the recreated checksum and the block checksum doesn't match */ - public void validateBuffer(ByteBuffer buf, ECBlockOutputStream ecBlockOutputStream, int idx) + public void validateChecksum(ECBlockOutputStream ecBlockOutputStream, BlockData[] blockDataGroup) throws OzoneChecksumException{ if (isValidationEnabled) { - try (ChunkBuffer chunk = ChunkBuffer.wrap(buf)) { - //Checksum will be stored in the 1st chunk and parity chunks - ContainerProtos.ChecksumData stripeChecksum = ecBlockOutputStream.getContainerBlockData() - .getChunks(0).getChecksumData(); - ContainerProtos.ChecksumData chunkChecksum = checksum.computeChecksum(chunk).getProtoBufMessage(); - if (stripeChecksum.getChecksums(idx) != chunkChecksum.getChecksums(0)) { - LOG.info("Checksum mismatched between recreated chunk and re-created chunk"); - throw new OzoneChecksumException("Inconsistent checksum for re-created chunk and original chunk"); - } + + //Checksum will be stored in the 1st chunk and parity chunks + List recreatedChunks = ecBlockOutputStream.getContainerBlockData().getChunksList(); + BlockData checksumBlockData = getChecksumBlockData(blockDataGroup); + if (null == checksumBlockData) { + throw new OzoneChecksumException("Could not find checksum data in any index for blockDataGroup while validating"); + } + List checksumBlockChunks = checksumBlockData.getChunks(); + + for (int i = 0; i < recreatedChunks.size(); i++) { + validateChecksumInStripe(recreatedChunks.get(i).getChecksumData(), checksumBlockChunks.get(i).getStripeChecksum()); } + + } else { LOG.debug("Checksum validation was disabled, skipping check"); } From 21146d32d932c84afd568485fd820277fdbf0518 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Mon, 18 Nov 2024 13:26:24 +0530 Subject: [PATCH 6/7] Final Validator implementation --- .../ECReconstructionCoordinator.java | 2 +- .../ec/reconstruction/ECValidator.java | 26 ++++++++++++++----- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 723ac81281d1..4af70445152c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -343,7 +343,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, List allStreams = new ArrayList<>(Arrays.asList(targetBlockStreams)); allStreams.addAll(Arrays.asList(emptyBlockStreams)); for (ECBlockOutputStream targetStream : allStreams) { - ecValidator.validateBuffer(targetStream); + ecValidator.validateChecksum(targetStream, blockDataGroup); targetStream.executePutBlock(true, true, blockLocationInfo.getLength(), blockDataGroup); checkFailures(targetStream, targetStream.getCurrentPutBlkResponseFuture()); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java index f3453a84d117..1415b6fc38f6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java @@ -44,16 +44,27 @@ public void setBlockLength(long blockLength) { this.blockLength = blockLength; } - private boolean validateChecksumInStripe(ContainerProtos.ChecksumData checksumData, ByteString stripeChecksum) - throws OzoneChecksumException{ + private void validateChecksumInStripe(ContainerProtos.ChecksumData checksumData, + ByteString stripeChecksum, int chunkIndex) + throws OzoneChecksumException { + // If we have say 100 bytes per checksum, in the stripe the first 100 bytes should + // correspond to the fist chunk checksum, next 100 should be the second chunk checksum + // and so on. 
So the checksum should range from (numOfBytes * index of chunk) to ((numOfBytes * index of chunk) + numOfBytes) int bytesPerChecksum = checksumData.getBytesPerChecksum(); - ByteString checksum = stripeChecksum.substring(); + + int checksumIdxStart = (bytesPerChecksum * chunkIndex); + ByteString expectedChecksum = stripeChecksum.substring(checksumIdxStart, + (checksumIdxStart + bytesPerChecksum)); + if (!checksumData.getChecksums(0).equals(expectedChecksum)) { + throw new OzoneChecksumException(String.format("Mismatch in checksum for recreated data: %s and existing stripe checksum: %s", + checksumData.getChecksums(0), expectedChecksum)); + } } private BlockData getChecksumBlockData(BlockData[] blockDataGroup) { BlockData checksumBlockData = null; - // Reverse traversal as all parity bits will have checmsumBytes + // Reverse traversal as all parity bits will have checksumBytes for (int i = blockDataGroup.length - 1; i >= 0; i--) { BlockData blockData = blockDataGroup[i]; if (null == blockData) { @@ -91,10 +102,11 @@ public void validateChecksum(ECBlockOutputStream ecBlockOutputStream, BlockData[ List checksumBlockChunks = checksumBlockData.getChunks(); for (int i = 0; i < recreatedChunks.size(); i++) { - validateChecksumInStripe(recreatedChunks.get(i).getChecksumData(), checksumBlockChunks.get(i).getStripeChecksum()); + validateChecksumInStripe( + recreatedChunks.get(i).getChecksumData(), + checksumBlockChunks.get(i).getStripeChecksum(), i + ); } - - } else { LOG.debug("Checksum validation was disabled, skipping check"); } From f4b2ceab5be902e07549dd98e509f05e50666dc0 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Sun, 8 Dec 2024 04:59:06 +0530 Subject: [PATCH 7/7] Fixed validator implementation for comparison of checksums --- .../ec/reconstruction/ECValidator.java | 70 ++++++++++++++----- 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java index 1415b6fc38f6..9f52ca6b93ac 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java @@ -15,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.*; import java.util.stream.Collectors; @@ -28,12 +29,14 @@ public class ECValidator { private final int parityCount; private long blockLength; private final ECReplicationConfig ecReplicationConfig; + private int ecChunkSize; ECValidator(OzoneClientConfig config, ECReplicationConfig ecReplConfig) { // We fetch the configuration value beforehand to avoid re-fetching on every validation call isValidationEnabled = config.getEcReconstructionValidation(); ecReplicationConfig = ecReplConfig; parityCount = ecReplConfig.getParity(); + ecChunkSize = ecReplConfig.getEcChunkSize(); } public void setReconstructionIndexes(Collection reconstructionIndexes) { @@ -44,24 +47,58 @@ public void setBlockLength(long blockLength) { this.blockLength = blockLength; } - private void validateChecksumInStripe(ContainerProtos.ChecksumData checksumData, - ByteString stripeChecksum, int chunkIndex) + /** + * Validate the expected checksum data for a chunk with the corresponding checksum in original stripe checksum + 
* Note: The stripe checksum is a combination of all the checksums of all the chunks in the stripe + * @param recreatedChunkChecksum Stores the {@link ContainerProtos.ChecksumData} of the recreated chunk to verify + * @param stripeChecksum Stores the {@link ByteBuffer} of stripe checksum + * @param chunkIndex Stores the index of the recreated chunk we are comparing + * @param checksumSize Stores the length of the stripe checksum + * @throws OzoneChecksumException If there is a mismatch in the recreated chunk vs stripe checksum, or if there is any + * internal error while performing {@link ByteBuffer} operations + */ + private void validateChecksumInStripe(ContainerProtos.ChecksumData recreatedChunkChecksum, + ByteBuffer stripeChecksum, int chunkIndex, int checksumSize) throws OzoneChecksumException { - // If we have say 100 bytes per checksum, in the stripe the first 100 bytes should - // correspond to the fist chunk checksum, next 100 should be the second chunk checksum - // and so on. So the checksum should range from (numOfBytes * index of chunk) to ((numOfBytes * index of chunk) + numOfBytes) - int bytesPerChecksum = checksumData.getBytesPerChecksum(); - - int checksumIdxStart = (bytesPerChecksum * chunkIndex); - ByteString expectedChecksum = stripeChecksum.substring(checksumIdxStart, - (checksumIdxStart + bytesPerChecksum)); - if (!checksumData.getChecksums(0).equals(expectedChecksum)) { - throw new OzoneChecksumException(String.format("Mismatch in checksum for recreated data: %s and existing stripe checksum: %s", - checksumData.getChecksums(0), expectedChecksum)); + int bytesPerChecksum = recreatedChunkChecksum.getBytesPerChecksum(); + int parityLength = (int) (Math.ceil((double)ecChunkSize / bytesPerChecksum) * 4L * parityCount); + // Ignore the parity bits + stripeChecksum.limit(checksumSize - parityLength); + + // If we have a 100 bytes per checksum, and a chunk of size 1000 bytes, it means there are total 10 checksums + // for each chunk that is present. So the 1st chunk will have 10 checksums together to form a single chunk checksum. 
+ // For each chunk we will have: + // Checksum of length = (chunkIdx * numOfChecksumPerChunk) + // Number of Checksums per Chunk = (chunkSize / bytesPerChecksum) + // So the checksum should start from (numOfBytesPerChecksum * (chunkIdx * numOfChecksumPerChunk) + + int checksumIdxStart = (ecChunkSize * chunkIndex); + + stripeChecksum.position(checksumIdxStart); + ByteBuffer chunkChecksum = recreatedChunkChecksum.getChecksums(0).asReadOnlyByteBuffer(); + while (chunkChecksum.hasRemaining()) { + try { + int recreatedChunkChecksumByte = chunkChecksum.getInt(); + int expectedStripeChecksumByte = stripeChecksum.getInt(); + if (recreatedChunkChecksumByte != expectedStripeChecksumByte) { + throw new OzoneChecksumException( + String.format("Mismatch in checksum for recreated data: %s and existing stripe checksum: %s", + recreatedChunkChecksumByte, expectedStripeChecksumByte)); + } + } catch (BufferUnderflowException bue) { + throw new OzoneChecksumException( + String.format("No more data to fetch from the stripe checksum at position: %s", + stripeChecksum.position())); + } } } + /** + * Get the block from the BlockData which contains the checksum information + * @param blockDataGroup An array of {@link BlockData} which contains all the blocks in a Datanode + * @return The block which contains the checksum information + */ private BlockData getChecksumBlockData(BlockData[] blockDataGroup) { BlockData checksumBlockData = null; // Reverse traversal as all parity bits will have checksumBytes @@ -101,10 +138,11 @@ public void validateChecksum(ECBlockOutputStream ecBlockOutputStream, BlockData[ } List checksumBlockChunks = checksumBlockData.getChunks(); - for (int i = 0; i < recreatedChunks.size(); i++) { + for (int chunkIdx = 0; chunkIdx < recreatedChunks.size(); chunkIdx++) { + ByteString stripeChecksum = checksumBlockChunks.get(chunkIdx).getStripeChecksum(); validateChecksumInStripe( - recreatedChunks.get(i).getChecksumData(), - checksumBlockChunks.get(i).getStripeChecksum(), i + recreatedChunks.get(chunkIdx).getChecksumData(), + stripeChecksum.asReadOnlyByteBuffer(), stripeChecksum.size(), chunkIdx ); } } else {
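Note: the final layout logic above trims the trailing parity words with parityLength = ceil(ecChunkSize / bytesPerChecksum) * 4 * parityCount (each CRC32/CRC32C word being 4 bytes) and then walks the recreated chunk's checksum words against the stripe checksum. The sketch below is a rough standalone illustration of that arithmetic: an rs-3-2 policy, 1 MB EC chunks, and 256 KB bytes-per-checksum are assumed purely for the numbers, and the chunk's start offset is taken as chunkIndex * checksumsPerChunk * 4 following the layout described in the comments (the patch itself positions at ecChunkSize * chunkIndex), so it models the idea rather than reproducing the patch's indexing.

import java.nio.ByteBuffer;

// Toy model of matching one recreated chunk's CRC words against the stripe checksum.
// All figures below are assumptions for illustration, not values taken from the patch.
public final class StripeChecksumLayoutDemo {

  static final int EC_CHUNK_SIZE = 1 << 20;        // 1 MB data chunk (assumed)
  static final int BYTES_PER_CHECKSUM = 256 << 10; // 256 KB per CRC word (assumed)
  static final int PARITY_COUNT = 2;               // rs-3-2 (assumed)
  static final int CRC_BYTES = 4;                  // a CRC32/CRC32C word is 4 bytes

  static void validateChunk(ByteBuffer chunkCrcWords, ByteBuffer stripeChecksum, int chunkIndex) {
    // 4 checksum words per 1 MB chunk; 4 * 4 * 2 = 32 trailing parity bytes to ignore.
    int wordsPerChunk = (EC_CHUNK_SIZE + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
    int parityLength = wordsPerChunk * CRC_BYTES * PARITY_COUNT;

    ByteBuffer stripe = stripeChecksum.duplicate();
    stripe.limit(stripe.limit() - parityLength);             // drop the parity segment
    stripe.position(chunkIndex * wordsPerChunk * CRC_BYTES); // seek to this chunk's segment

    ByteBuffer recreated = chunkCrcWords.duplicate();
    while (recreated.hasRemaining()) {
      if (recreated.getInt() != stripe.getInt()) {
        throw new IllegalStateException("Stripe checksum mismatch for chunk " + chunkIndex);
      }
    }
  }

  public static void main(String[] args) {
    // Stripe checksum for 3 data + 2 parity chunks: (3 + 2) * 4 words * 4 bytes = 80 bytes.
    ByteBuffer stripe = ByteBuffer.allocate(80);
    ByteBuffer chunk1 = ByteBuffer.allocate(16);  // the 4 CRC words of data chunk index 1
    validateChunk(chunk1, stripe, 1);             // passes: both buffers are all zeroes
  }
}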