From 0a08fb1b5cebbc0c79c1902b0284e4a4ec400e59 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Wed, 11 Sep 2024 16:17:54 -0700 Subject: [PATCH 01/12] HDDS-10411. Support incremental ChunkBuffer checksum calculation --- .../hdds/scm/storage/BlockOutputStream.java | 1 + .../apache/hadoop/ozone/common/Checksum.java | 62 ++++++++-- .../hadoop/ozone/common/ChecksumCache.java | 115 ++++++++++++++++++ .../hadoop/ozone/common/TestChecksum.java | 2 +- 4 files changed, 171 insertions(+), 9 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 59795dd0f051..e06ba8912965 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -1040,6 +1040,7 @@ private void updateBlockDataForWriteChunk(ChunkBuffer chunk) lastChunkBuffer.capacity() - lastChunkBuffer.position(); appendLastChunkBuffer(chunk, 0, remainingBufferSize); updateBlockDataWithLastChunkBuffer(); + checksum.clearChecksumCache(); // New chunk, clear the checksum cache appendLastChunkBuffer(chunk, remainingBufferSize, chunk.remaining() - remainingBufferSize); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java index f8b3febfeca8..597b0eb966fa 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java @@ -33,6 +33,8 @@ import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class to compute and verify checksums for chunks. @@ -40,6 +42,8 @@ * This class is not thread safe. */ public class Checksum { + public static final Logger LOG = LoggerFactory.getLogger(Checksum.class); + private static Function newMessageDigestFunction( String algorithm) { final MessageDigest md; @@ -63,7 +67,7 @@ public static ByteString int2ByteString(int n) { private static Function newChecksumByteBufferFunction( Supplier constructor) { final ChecksumByteBuffer algorithm = constructor.get(); - return data -> { + return data -> { algorithm.reset(); algorithm.update(data); return int2ByteString((int)algorithm.getValue()); @@ -97,6 +101,22 @@ Function newChecksumFunction() { private final ChecksumType checksumType; private final int bytesPerChecksum; + /** + * TODO: Make sure to clear this cache when a new block chunk is started. + */ + private final ChecksumCache checksumCache; + + /** + * BlockOutputStream needs to call this method to clear the checksum cache + * whenever a block chunk has been established. + */ + public boolean clearChecksumCache() { + if (checksumCache != null) { + checksumCache.clear(); + return true; + } + return false; + } /** * Constructs a Checksum object. 
@@ -106,6 +126,24 @@ Function newChecksumFunction() { public Checksum(ChecksumType type, int bytesPerChecksum) { this.checksumType = type; this.bytesPerChecksum = bytesPerChecksum; + this.checksumCache = null; + } + + /** + * Constructs a Checksum object. + * @param type type of Checksum + * @param bytesPerChecksum number of bytes of data per checksum + * @param useChecksumCache true to enable checksum cache + */ + public Checksum(ChecksumType type, int bytesPerChecksum, boolean useChecksumCache) { + this.checksumType = type; + this.bytesPerChecksum = bytesPerChecksum; + LOG.debug("useChecksumCache = {}", useChecksumCache); + if (useChecksumCache) { + this.checksumCache = new ChecksumCache(bytesPerChecksum); + } else { + this.checksumCache = null; + } } /** @@ -168,12 +206,20 @@ public ChecksumData computeChecksum(ChunkBuffer data) throw new OzoneChecksumException(checksumType); } - // Checksum is computed for each bytesPerChecksum number of bytes of data - // starting at offset 0. The last checksum might be computed for the - // remaining data with length less than bytesPerChecksum. - final List checksumList = new ArrayList<>(); - for (ByteBuffer b : data.iterate(bytesPerChecksum)) { - checksumList.add(computeChecksum(b, function, bytesPerChecksum)); + final List checksumList; + if (checksumCache == null) { + // When checksumCache is not enabled: + // Checksum is computed for each bytesPerChecksum number of bytes of data + // starting at offset 0. The last checksum might be computed for the + // remaining data with length less than bytesPerChecksum. + checksumList = new ArrayList<>(); + for (ByteBuffer b : data.iterate(bytesPerChecksum)) { + checksumList.add(computeChecksum(b, function, bytesPerChecksum)); // merge this? + } + } else { + // When checksumCache is enabled: + // We only need to update the last checksum in the cache, then pass it along. + checksumList = checksumCache.computeChecksum(data, function); } return new ChecksumData(checksumType, bytesPerChecksum, checksumList); } @@ -185,7 +231,7 @@ public ChecksumData computeChecksum(ChunkBuffer data) * @param maxLength the max length of data * @return computed checksum ByteString */ - private static ByteString computeChecksum(ByteBuffer data, + protected static ByteString computeChecksum(ByteBuffer data, Function function, int maxLength) { final int limit = data.limit(); try { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java new file mode 100644 index 000000000000..d584a5243ccb --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
<p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
<p>
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.common; + +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +/** + * Cache previous checksums to avoid recomputing them. + * This is a stop-gap solution to reduce checksum calc overhead inside critical section + * without having to do a major refactoring/overhaul over protobuf and interfaces. + * This is only supposed to be used by BlockOutputStream, for now. + *
<p>
+ * Each BlockOutputStream has its own Checksum instance. + * Each block chunk (4 MB default) is divided into 16 KB (default) each for checksum calculation. + * For CRC32/CRC32C, each checksum takes 4 bytes. Thus each block chunk has 4 MB / 16 KB * 4 B = 1 KB of checksum data. + */ +public class ChecksumCache { + public static final Logger LOG = LoggerFactory.getLogger(ChecksumCache.class); + + private final int bytesPerChecksum; + private final List checksums; + // Chunk length last time the checksum is computed + private int prevChunkLength; + // This only serves as a hint for array list initial allocation. The array list will still grow as needed. + private static final int BLOCK_CHUNK_SIZE = 4 * 1024 * 1024; // 4 MB + + public ChecksumCache(int bytesPerChecksum) { + this.prevChunkLength = 0; + this.bytesPerChecksum = bytesPerChecksum; + // Set initialCapacity to avoid costly resizes + this.checksums = new ArrayList<>(BLOCK_CHUNK_SIZE / bytesPerChecksum); + } + + /** + * Clear cached checksums. And reset the written index. + */ + public void clear() { + prevChunkLength = 0; + checksums.clear(); + } + + public List getChecksums() { + return checksums; + } + + public List computeChecksum(ChunkBuffer data, Function function) { + // Indicates how much data the current chunk buffer holds + final int currChunkLength = data.limit(); + + // Sanity check + if (currChunkLength <= prevChunkLength) { + // If currChunkLength <= lastChunkLength, it indicates a bug that needs to be addressed. + // It means BOS has not properly clear()ed the cache when a new chunk is started in that code path. + throw new IllegalArgumentException("ChunkBuffer data limit must be larger than the last time"); + } + + // One or more checksums need to be computed + + // Start of the checksum index that need to be (re)computed + final int ciStart = prevChunkLength / bytesPerChecksum; + final int ciEnd = currChunkLength / bytesPerChecksum; + int i = 0; + for (ByteBuffer b : data.iterate(bytesPerChecksum)) { + if (i < ciStart) { + i++; + continue; + } + + // i can either point to: + // 1. the last element in the list -- in which case the checksum needs to be updated + // 2. one after the last element -- in which case a new checksum needs to be added + assert i == checksums.size() - 1 || i == checksums.size(); + + // TODO: Furthermore for CRC32/CRC32C, it can be even more efficient by updating the last checksum byte-by-byte. 
+ final ByteString checksum = Checksum.computeChecksum(b, function, bytesPerChecksum); + if (i == checksums.size()) { + checksums.add(checksum); + } else { + checksums.set(i, checksum); + } + + i++; + } + + // Sanity check + if (i - 1 != ciEnd) { + throw new IllegalStateException("Checksum index end does not match expectation"); + } + + // Update last written index + prevChunkLength = currChunkLength; + return checksums; + } +} diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java index 829f4bb150c9..a1004ddc438b 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java @@ -39,7 +39,7 @@ private Checksum getChecksum(ContainerProtos.ChecksumType type) { if (type == null) { type = CHECKSUM_TYPE_DEFAULT; } - return new Checksum(type, BYTES_PER_CHECKSUM); + return new Checksum(type, BYTES_PER_CHECKSUM, true); } /** From edb081898dbfb8a2931c3df9fb71594fd19b7d78 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Wed, 30 Oct 2024 03:53:54 -0700 Subject: [PATCH 02/12] Use checksum cache on `lastChunkBuffer` checksum calc in PutBlock. --- .../hadoop/hdds/scm/OzoneClientConfig.java | 2 + .../hdds/scm/storage/BlockOutputStream.java | 12 +++++- .../apache/hadoop/ozone/common/Checksum.java | 38 +++++++++++++++---- .../hadoop/ozone/common/TestChecksum.java | 2 + .../container/keyvalue/KeyValueHandler.java | 3 ++ 5 files changed, 48 insertions(+), 9 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 5426bbc49817..8082e233a3a3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -178,6 +178,8 @@ public enum ChecksumCombineMode { tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE }) private int bytesPerChecksum = 16 * 1024; + // TODO: Add block chunk checksum cache client option + @Config(key = "verify.checksum", defaultValue = "true", description = "Ozone client to verify checksum of the checksum " diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index e06ba8912965..c82de28b0612 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -841,6 +841,8 @@ public synchronized void cleanup(boolean invalidateClient) { if (lastChunkBuffer != null) { DIRECT_BUFFER_POOL.returnBuffer(lastChunkBuffer); lastChunkBuffer = null; + // Clear checksum cache + checksum.clearChecksumCache(); } } @@ -890,7 +892,10 @@ private CompletableFuture writeChunkToContainer( final long offset = chunkOffset.getAndAdd(effectiveChunkSize); final ByteString data = chunk.toByteString( bufferPool.byteStringConversion()); - ChecksumData checksumData = checksum.computeChecksum(chunk); + // chunk is incremental, don't cache its checksum + ChecksumData checksumData = checksum.computeChecksum(chunk, false); + // side note: checksum object is shared with PutBlock's (blockData) checksum 
calc, + // current impl does not support caching both ChunkInfo chunkInfo = ChunkInfo.newBuilder() .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex) .setOffset(offset) @@ -1041,6 +1046,8 @@ private void updateBlockDataForWriteChunk(ChunkBuffer chunk) appendLastChunkBuffer(chunk, 0, remainingBufferSize); updateBlockDataWithLastChunkBuffer(); checksum.clearChecksumCache(); // New chunk, clear the checksum cache + // TODO: Can make the cache impl a bit more robust by associating ChecksumCache with ChunkBuffer/ByteBuffer rather + // than the Checksum object. appendLastChunkBuffer(chunk, remainingBufferSize, chunk.remaining() - remainingBufferSize); } @@ -1125,7 +1132,8 @@ private ChunkInfo createChunkInfo(long lastPartialChunkOffset) int revisedChunkSize = lastChunkBuffer.remaining(); // create the chunk info to be sent in PutBlock. ChecksumData revisedChecksumData = - checksum.computeChecksum(lastChunkBuffer); + checksum.computeChecksum(lastChunkBuffer, true); + // Cache checksum here. This checksum is stored in blockData (and later transferred in PutBlock) long chunkID = lastPartialChunkOffset / config.getStreamBufferSize(); ChunkInfo.Builder revisedChunkInfo = ChunkInfo.newBuilder() diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java index 597b0eb966fa..6639bf933b38 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java @@ -133,13 +133,13 @@ public Checksum(ChecksumType type, int bytesPerChecksum) { * Constructs a Checksum object. * @param type type of Checksum * @param bytesPerChecksum number of bytes of data per checksum - * @param useChecksumCache true to enable checksum cache + * @param allowChecksumCache true to enable checksum cache */ - public Checksum(ChecksumType type, int bytesPerChecksum, boolean useChecksumCache) { + public Checksum(ChecksumType type, int bytesPerChecksum, boolean allowChecksumCache) { this.checksumType = type; this.bytesPerChecksum = bytesPerChecksum; - LOG.debug("useChecksumCache = {}", useChecksumCache); - if (useChecksumCache) { + LOG.debug("allowChecksumCache = {}", allowChecksumCache); + if (allowChecksumCache) { this.checksumCache = new ChecksumCache(bytesPerChecksum); } else { this.checksumCache = null; @@ -166,13 +166,25 @@ public ChecksumData computeChecksum(byte[] data) return computeChecksum(ByteBuffer.wrap(data)); } + /** + * The default implementation of computeChecksum(ByteBuffer) that does not use cache, even if cache is initialized. + * This is a stop-gap solution before the protocol change. + * @param data ByteBuffer + * @return ChecksumData + * @throws OzoneChecksumException + */ + public ChecksumData computeChecksum(ByteBuffer data) + throws OzoneChecksumException { + return computeChecksum(data, false); + } + /** * Computes checksum for give data. * @param data input data. * @return ChecksumData computed for input data. * @throws OzoneChecksumException thrown when ChecksumType is not recognized */ - public ChecksumData computeChecksum(ByteBuffer data) + public ChecksumData computeChecksum(ByteBuffer data, boolean useChecksumCache) throws OzoneChecksumException { // If type is set to NONE, we do not need to compute the checksums. We also // need to avoid unnecessary conversions. 
@@ -182,7 +194,7 @@ public ChecksumData computeChecksum(ByteBuffer data) if (!data.isReadOnly()) { data = data.asReadOnlyBuffer(); } - return computeChecksum(ChunkBuffer.wrap(data)); + return computeChecksum(ChunkBuffer.wrap(data), useChecksumCache); } public ChecksumData computeChecksum(List byteStrings) @@ -192,8 +204,20 @@ public ChecksumData computeChecksum(List byteStrings) return computeChecksum(ChunkBuffer.wrap(buffers)); } + /** + * The default implementation of computeChecksum(ChunkBuffer) that does not use cache, even if cache is initialized. + * This is a stop-gap solution before the protocol change. + * @param data ChunkBuffer + * @return ChecksumData + * @throws OzoneChecksumException + */ public ChecksumData computeChecksum(ChunkBuffer data) throws OzoneChecksumException { + return computeChecksum(data, false); + } + + public ChecksumData computeChecksum(ChunkBuffer data, boolean useCache) + throws OzoneChecksumException { if (checksumType == ChecksumType.NONE) { // Since type is set to NONE, we do not need to compute the checksums return new ChecksumData(checksumType, bytesPerChecksum); @@ -207,7 +231,7 @@ public ChecksumData computeChecksum(ChunkBuffer data) } final List checksumList; - if (checksumCache == null) { + if (checksumCache == null || !useCache) { // When checksumCache is not enabled: // Checksum is computed for each bytesPerChecksum number of bytes of data // starting at offset 0. The last checksum might be computed for the diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java index a1004ddc438b..5df9e74075d6 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java @@ -42,6 +42,8 @@ private Checksum getChecksum(ContainerProtos.ChecksumType type) { return new Checksum(type, BYTES_PER_CHECKSUM, true); } + // TODO: Parameterize all test cases: useChecksumCache = [true, false] + /** * Tests {@link Checksum#verifyChecksum(byte[], ChecksumData)}. */ diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 1bcb64200b22..f189876c7bb0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -560,6 +560,8 @@ ContainerCommandResponseProto handlePutBlock( endOfBlock = true; } + // Note: checksum held inside blockData. But no extra checksum validation here with handlePutBlock. + long bcsId = dispatcherContext == null ? 0 : dispatcherContext.getLogIndex(); blockData.setBlockCommitSequenceId(bcsId); @@ -888,6 +890,7 @@ ContainerCommandResponseProto handleWriteChunk( if (isWrite) { data = ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList()); + // TODO: Can improve checksum validation here. Make this one-shot after protocol change. validateChunkChecksumData(data, chunkInfo); } chunkManager From c0c82d84a50d6e91426036b4a96af5c162e32584 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Thu, 7 Nov 2024 01:03:09 -0800 Subject: [PATCH 03/12] Parameterized existing test cases in TestChecksum. 
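Each case now runs with the checksum cache both enabled and disabled, pinning down the invariant that the two code paths must produce identical results. A minimal sketch of that equivalence check, assuming it lives next to TestChecksum in the org.apache.hadoop.ozone.common package (the class and method names below are illustrative, not part of the patch):

    import static java.nio.charset.StandardCharsets.UTF_8;
    import static org.junit.jupiter.api.Assertions.assertEquals;

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
    import org.junit.jupiter.api.Test;

    class ChecksumCacheEquivalenceSketch {
      @Test
      void cachedPathMatchesUncachedPath() throws Exception {
        byte[] data = "0123456789abcdefghijklmnopqrstuvwxyz".getBytes(UTF_8);
        // bytesPerChecksum of 10 over 36 bytes yields 4 checksum entries.
        Checksum cached = new Checksum(ChecksumType.CRC32, 10, true);
        Checksum uncached = new Checksum(ChecksumType.CRC32, 10, false);
        ChecksumData fromCache =
            cached.computeChecksum(ByteBuffer.wrap(data).asReadOnlyBuffer(), true);
        ChecksumData direct =
            uncached.computeChecksum(ByteBuffer.wrap(data).asReadOnlyBuffer(), false);
        // Both paths must produce byte-identical checksum lists.
        assertEquals(direct, fromCache);
      }
    }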
--- .../hadoop/ozone/common/TestChecksum.java | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java index 5df9e74075d6..7ddb605c0f8f 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java @@ -19,7 +19,10 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.nio.ByteBuffer; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -35,25 +38,25 @@ public class TestChecksum { private static final ContainerProtos.ChecksumType CHECKSUM_TYPE_DEFAULT = ContainerProtos.ChecksumType.SHA256; - private Checksum getChecksum(ContainerProtos.ChecksumType type) { + private Checksum getChecksum(ContainerProtos.ChecksumType type, boolean allowChecksumCache) { if (type == null) { type = CHECKSUM_TYPE_DEFAULT; } - return new Checksum(type, BYTES_PER_CHECKSUM, true); + return new Checksum(type, BYTES_PER_CHECKSUM, allowChecksumCache); } - // TODO: Parameterize all test cases: useChecksumCache = [true, false] - /** * Tests {@link Checksum#verifyChecksum(byte[], ChecksumData)}. */ - @Test - public void testVerifyChecksum() throws Exception { - Checksum checksum = getChecksum(null); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testVerifyChecksum(boolean useChecksumCache) throws Exception { + Checksum checksum = getChecksum(null, useChecksumCache); int dataLen = 55; byte[] data = RandomStringUtils.randomAlphabetic(dataLen).getBytes(UTF_8); + ByteBuffer byteBuffer = ByteBuffer.wrap(data); - ChecksumData checksumData = checksum.computeChecksum(data); + ChecksumData checksumData = checksum.computeChecksum(byteBuffer, useChecksumCache); // A checksum is calculate for each bytesPerChecksum number of bytes in // the data. Since that value is 10 here and the data length is 55, we @@ -67,11 +70,13 @@ public void testVerifyChecksum() throws Exception { /** * Tests that if data is modified, then the checksums should not match. */ - @Test - public void testIncorrectChecksum() throws Exception { - Checksum checksum = getChecksum(null); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testIncorrectChecksum(boolean useChecksumCache) throws Exception { + Checksum checksum = getChecksum(null, useChecksumCache); byte[] data = RandomStringUtils.randomAlphabetic(55).getBytes(UTF_8); - ChecksumData originalChecksumData = checksum.computeChecksum(data); + ByteBuffer byteBuffer = ByteBuffer.wrap(data); + ChecksumData originalChecksumData = checksum.computeChecksum(byteBuffer, useChecksumCache); // Change the data and check if new checksum matches the original checksum. // Modifying one byte of data should be enough for the checksum data to @@ -85,13 +90,14 @@ public void testIncorrectChecksum() throws Exception { * Tests that checksum calculated using two different checksumTypes should * not match. 
*/ - @Test - public void testChecksumMismatchForDifferentChecksumTypes() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testChecksumMismatchForDifferentChecksumTypes(boolean useChecksumCache) { // Checksum1 of type SHA-256 - Checksum checksum1 = getChecksum(null); + Checksum checksum1 = getChecksum(null, useChecksumCache); // Checksum2 of type CRC32 - Checksum checksum2 = getChecksum(ContainerProtos.ChecksumType.CRC32); + Checksum checksum2 = getChecksum(ContainerProtos.ChecksumType.CRC32, useChecksumCache); // The two checksums should not match as they have different types assertNotEquals(checksum1, checksum2, "Checksums should not match for different checksum types"); From 40c753b3e801f12644788bf29a20721dc1cb3abd Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Thu, 7 Nov 2024 01:30:56 -0800 Subject: [PATCH 04/12] Add config `ozone.client.chunk.checksum.cache.enabled`. --- .../hadoop/hdds/scm/OzoneClientConfig.java | 19 +++++++++++++++++-- .../hdds/scm/storage/BlockOutputStream.java | 10 +++++++--- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 8082e233a3a3..a0c62939d078 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -156,7 +156,7 @@ public enum ChecksumCombineMode { description = "Indicates the time duration in seconds a client will wait " + "before retrying a read key request on encountering " - + "a connectivity excepetion from Datanodes . " + + "a connectivity exception from Datanodes. " + "By default the interval is 1 second", tags = ConfigTag.CLIENT) private int readRetryInterval = 1; @@ -178,7 +178,14 @@ public enum ChecksumCombineMode { tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE }) private int bytesPerChecksum = 16 * 1024; - // TODO: Add block chunk checksum cache client option + // Client-side block chunk checksum cache config + // TODO: Re-evaluate if this need to be exposed to end-users at all, e.g. to avoid confusion. + @Config(key = "chunk.checksum.cache.enabled", + defaultValue = "true", // TODO: false by default? 
+ description = "Increase client-side chunk checksum calculation efficiency in certain cases " + + "by caching previously computed checksums in the same block chunk.", + tags = ConfigTag.CLIENT) + private boolean chunkChecksumCacheEnabled = true; @Config(key = "verify.checksum", defaultValue = "true", @@ -451,6 +458,14 @@ public void setBytesPerChecksum(int bytesPerChecksum) { this.bytesPerChecksum = bytesPerChecksum; } + public boolean isChunkChecksumCacheEnabled() { + return chunkChecksumCacheEnabled; + } + + public void setChunkChecksumCacheEnabled(boolean chunkChecksumCacheEnabled) { + this.chunkChecksumCacheEnabled = chunkChecksumCacheEnabled; + } + public boolean isChecksumVerify() { return checksumVerify; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index c82de28b0612..7a74dbb0ee65 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -135,6 +135,7 @@ public class BlockOutputStream extends OutputStream { private List bufferList; private final List failedServers; + private final boolean chunkChecksumCacheEnabled; private final Checksum checksum; //number of buffers used before doing a flush/putBlock. @@ -231,8 +232,9 @@ public BlockOutputStream( writtenDataLength = 0; failedServers = new ArrayList<>(0); ioException = new AtomicReference<>(null); - checksum = new Checksum(config.getChecksumType(), - config.getBytesPerChecksum()); + this.chunkChecksumCacheEnabled = config.isChunkChecksumCacheEnabled(); + this.checksum = new Checksum(config.getChecksumType(), + config.getBytesPerChecksum(), chunkChecksumCacheEnabled); this.clientMetrics = clientMetrics; this.streamBufferArgs = streamBufferArgs; this.allowPutBlockPiggybacking = canEnablePutblockPiggybacking(); @@ -1066,6 +1068,8 @@ private void updateBlockDataWithLastChunkBuffer() addToBlockData(lastChunkInfo); lastChunkBuffer.clear(); + // Clear checksum cache associated with lastChunkBuffer? not the right place. TODO: REMOVE THIS +// checksum.clearChecksumCache(); if (lastChunkSize == config.getStreamBufferSize()) { lastChunkOffset += config.getStreamBufferSize(); } else { @@ -1132,7 +1136,7 @@ private ChunkInfo createChunkInfo(long lastPartialChunkOffset) int revisedChunkSize = lastChunkBuffer.remaining(); // create the chunk info to be sent in PutBlock. ChecksumData revisedChecksumData = - checksum.computeChecksum(lastChunkBuffer, true); + checksum.computeChecksum(lastChunkBuffer, chunkChecksumCacheEnabled); // Cache checksum here. This checksum is stored in blockData (and later transferred in PutBlock) long chunkID = lastPartialChunkOffset / config.getStreamBufferSize(); From 4c3ecd2dfa76ae91338ecb1ab1d57d04a6a1c783 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Thu, 7 Nov 2024 11:32:09 -0800 Subject: [PATCH 05/12] Fix issue where ciEnd boundary is incorrect when currChunkLength is aligned with bytesPerChecksum. 
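The old sanity check compared the post-loop index against a floor division, which only agreed with the iteration count when the chunk length was unaligned. A worked example of the boundary arithmetic (values chosen for illustration):

    int bytesPerChecksum = 16;
    int currChunkLength = 32;   // exactly two segments, i.e. aligned

    // Old bound: floor division.
    int ciEndOld = currChunkLength / bytesPerChecksum;   // 2
    // iterate(16) over 32 bytes visits 2 buffers, leaving i == 2, so the old
    // check "i - 1 != ciEnd" compared 1 against 2 and threw spuriously.

    // Fixed bound: round up only when a partial trailing segment exists.
    int ciEnd = currChunkLength / bytesPerChecksum
        + (currChunkLength % bytesPerChecksum == 0 ? 0 : 1);   // still 2
    // The new check "i != ciEnd" sees 2 == 2 and passes; an unaligned length
    // such as 40 gives ciEnd = 2 + 1 = 3, matching the 3 buffers iterated.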
--- .../java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java | 4 ++-- .../apache/hadoop/hdds/scm/storage/BlockOutputStream.java | 6 +++--- .../java/org/apache/hadoop/ozone/common/ChecksumCache.java | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index a0c62939d078..8f29c7578738 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -182,8 +182,8 @@ public enum ChecksumCombineMode { // TODO: Re-evaluate if this need to be exposed to end-users at all, e.g. to avoid confusion. @Config(key = "chunk.checksum.cache.enabled", defaultValue = "true", // TODO: false by default? - description = "Increase client-side chunk checksum calculation efficiency in certain cases " + - "by caching previously computed checksums in the same block chunk.", + description = "Increase client-side chunk checksum calculation efficiency when incremental chunk list is " + + "enabled by caching previously computed checksums in the same block chunk.", tags = ConfigTag.CLIENT) private boolean chunkChecksumCacheEnabled = true; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 7a74dbb0ee65..d3764901d41f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -1047,7 +1047,7 @@ private void updateBlockDataForWriteChunk(ChunkBuffer chunk) lastChunkBuffer.capacity() - lastChunkBuffer.position(); appendLastChunkBuffer(chunk, 0, remainingBufferSize); updateBlockDataWithLastChunkBuffer(); - checksum.clearChecksumCache(); // New chunk, clear the checksum cache +// checksum.clearChecksumCache(); // New chunk, clear the checksum cache // TODO: Can make the cache impl a bit more robust by associating ChecksumCache with ChunkBuffer/ByteBuffer rather // than the Checksum object. appendLastChunkBuffer(chunk, remainingBufferSize, @@ -1068,8 +1068,8 @@ private void updateBlockDataWithLastChunkBuffer() addToBlockData(lastChunkInfo); lastChunkBuffer.clear(); - // Clear checksum cache associated with lastChunkBuffer? not the right place. TODO: REMOVE THIS -// checksum.clearChecksumCache(); + // Clear checksum cache associated with lastChunkBuffer? TODO: Double check + checksum.clearChecksumCache(); if (lastChunkSize == config.getStreamBufferSize()) { lastChunkOffset += config.getStreamBufferSize(); } else { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java index d584a5243ccb..a1c077302c32 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java @@ -79,7 +79,7 @@ public List computeChecksum(ChunkBuffer data, Function computeChecksum(ChunkBuffer data, Function Date: Thu, 7 Nov 2024 13:40:09 -0800 Subject: [PATCH 06/12] Add TestChecksumCache. 
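The core invariant under test: as the chunk buffer grows, checksums of already-completed bytesPerChecksum segments are served from the cache, and only the trailing (possibly partial) segment is recomputed. A condensed sketch of that invariant, assuming the same imports and package as the new TestChecksumCache below; sizes are illustrative:

    // Sketch method; assumes the import context of TestChecksumCache.
    void completedSegmentsAreReused() throws Exception {
      final int bytesPerChecksum = 16;
      ChecksumCache cache = new ChecksumCache(bytesPerChecksum);
      Function<ByteBuffer, ByteString> fn =
          Algorithm.valueOf(ChecksumType.CRC32).newChecksumFunction();

      byte[] bytes = new byte[30];   // one full 16-byte segment plus a partial one
      List<ByteString> pass1;
      try (ChunkBuffer cb = ChunkBuffer.wrap(ByteBuffer.wrap(bytes, 0, 20).asReadOnlyBuffer())) {
        // Copy the result: computeChecksum hands back the cache's live internal list.
        pass1 = new ArrayList<>(cache.computeChecksum(cb, fn));
      }
      try (ChunkBuffer cb = ChunkBuffer.wrap(ByteBuffer.wrap(bytes, 0, 30).asReadOnlyBuffer())) {
        List<ByteString> pass2 = cache.computeChecksum(cb, fn);   // recomputes only [16, 30)
        Assertions.assertEquals(pass1.get(0), pass2.get(0));      // segment [0, 16) was reused
      }
    }

Copying pass1 matters because the cache returns its internal list rather than a snapshot; the ByteString entries themselves are immutable, so a shallow copy is enough.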
--- .../ozone/common/TestChecksumCache.java | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumCache.java diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumCache.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumCache.java new file mode 100644 index 000000000000..5ae6da6ea7ed --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumCache.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
<p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
<p>
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.common; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.ozone.common.Checksum.Algorithm; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.Function; + +/** + * Test class for {@link ChecksumCache}. + */ +class TestChecksumCache { + public static final Logger LOG = LoggerFactory.getLogger(TestChecksumCache.class); + + @ParameterizedTest + @EnumSource(ChecksumType.class) + void testComputeChecksum(ChecksumType checksumType) throws Exception { + final int bytesPerChecksum = 16; + ChecksumCache checksumCache = new ChecksumCache(bytesPerChecksum); + + final int size = 66; + byte[] byteArray = new byte[size]; + // Fill byteArray with bytes from 0 to 127 for deterministic testing + for (int i = 0; i < size; i++) { + byteArray[i] = (byte) (i % 128); + } + + final Function function = Algorithm.valueOf(checksumType).newChecksumFunction(); + + int i_end = size / bytesPerChecksum + (size % bytesPerChecksum == 0 ? 0 : 1); + List lastRes = null; + for (int i = 0; i < i_end; i++) { + int byteBufferLength = Integer.min(byteArray.length, bytesPerChecksum * (i + 1)); + ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray, 0, byteBufferLength); + + try (ChunkBuffer chunkBuffer = ChunkBuffer.wrap(byteBuffer.asReadOnlyBuffer())) { + List res = checksumCache.computeChecksum(chunkBuffer, function); + System.out.println(res); + // Verify that every entry in the res list except the last one is the same as the lastRes list + if (i > 0) { + for (int j = 0; j < res.size() - 1; j++) { + Assertions.assertEquals(lastRes.get(j), res.get(j)); + } + } + lastRes = res; + } + } + + // Sanity check + checksumCache.clear(); + } +} From 565c604258235750cf76297be8a634fcb0d82f64 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Thu, 7 Nov 2024 16:23:30 -0800 Subject: [PATCH 07/12] Fix `clearChecksumCache()` call location; handle `currChunkLength == prevChunkLength` case. 
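The new equal-length branch covers repeated flushes: a flush that adds no new bytes reaches computeChecksum with the same buffer limit as before, which the old "<=" sanity check rejected. A small sketch of the call pattern the cache must now tolerate, again assuming the TestChecksumCache import context and illustrative sizes:

    // Sketch method; same import context as TestChecksumCache.
    void sameLimitFlushIsIdempotent() throws Exception {
      ChecksumCache cache = new ChecksumCache(16);
      Function<ByteBuffer, ByteString> fn =
          Algorithm.valueOf(ChecksumType.CRC32).newChecksumFunction();
      try (ChunkBuffer cb = ChunkBuffer.wrap(ByteBuffer.allocate(20).asReadOnlyBuffer())) {
        List<ByteString> first = cache.computeChecksum(cb, fn);    // computes two entries
        List<ByteString> second = cache.computeChecksum(cb, fn);   // same limit: early return
        Assertions.assertSame(first, second);   // cached list returned as-is, nothing recomputed
      }
    }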
--- .../hadoop/hdds/scm/storage/BlockOutputStream.java | 12 +++++++----- .../apache/hadoop/ozone/common/ChecksumCache.java | 10 ++++++++-- .../hadoop/ozone/common/TestChecksumCache.java | 2 +- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index d3764901d41f..8c641d0de73e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -1047,9 +1047,11 @@ private void updateBlockDataForWriteChunk(ChunkBuffer chunk) lastChunkBuffer.capacity() - lastChunkBuffer.position(); appendLastChunkBuffer(chunk, 0, remainingBufferSize); updateBlockDataWithLastChunkBuffer(); -// checksum.clearChecksumCache(); // New chunk, clear the checksum cache + + // New chunk, need to clear checksum cache + checksum.clearChecksumCache(); // TODO: Can make the cache impl a bit more robust by associating ChecksumCache with ChunkBuffer/ByteBuffer rather - // than the Checksum object. + // than the Checksum object? appendLastChunkBuffer(chunk, remainingBufferSize, chunk.remaining() - remainingBufferSize); } @@ -1066,10 +1068,10 @@ private void updateBlockDataWithLastChunkBuffer() LOG.debug("lastChunkInfo = {}", lastChunkInfo); long lastChunkSize = lastChunkInfo.getLen(); addToBlockData(lastChunkInfo); - + // This sets ByteBuffer limit to capacity, pos to 0. Does NOT erase data + // TODO: This could be put inside createChunkInfo() to be more clear. lastChunkBuffer.clear(); - // Clear checksum cache associated with lastChunkBuffer? TODO: Double check - checksum.clearChecksumCache(); + if (lastChunkSize == config.getStreamBufferSize()) { lastChunkOffset += config.getStreamBufferSize(); } else { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java index a1c077302c32..ee155c7f4b47 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java @@ -68,11 +68,17 @@ public List computeChecksum(ChunkBuffer data, Function res = checksumCache.computeChecksum(chunkBuffer, function); System.out.println(res); - // Verify that every entry in the res list except the last one is the same as the lastRes list + // Verify that every entry in the res list except the last one is the same as the one in lastRes list if (i > 0) { for (int j = 0; j < res.size() - 1; j++) { Assertions.assertEquals(lastRes.get(j), res.get(j)); From 82ee6c51c871213ef2f3ee083ecbc91ddf058b26 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Thu, 7 Nov 2024 16:24:21 -0800 Subject: [PATCH 08/12] TestBlockOutputStreamIncrementalPutBlock: Add config `ozone.client.hbase.enhancements.allowed = true`. 
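The test populates InMemoryConfiguration with raw string keys. As a reading aid, a sketch of how such keys resolve to typed client config; the @ConfigGroup prefix "ozone.client" and the getObject binding are assumptions inferred from how OzoneClientConfig keys appear elsewhere in this series:

    // Sketch: OzoneClientConfig is assumed to carry @ConfigGroup(prefix = "ozone.client"),
    // so "ozone.client.bytes.per.checksum" binds to its bytesPerChecksum field.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
    conf.setInt("ozone.client.bytes.per.checksum", 8192);
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    assert clientConfig.getBytesPerChecksum() == 8192;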
--- .../client/TestBlockOutputStreamIncrementalPutBlock.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java index 361dcb1fd0a2..0db67441fb57 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java @@ -73,8 +73,12 @@ private void init(boolean incrementalChunkList) throws IOException { ((InMemoryConfiguration) config).setBoolean( OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true); + ((InMemoryConfiguration) config).setBoolean( + "ozone.client.hbase.enhancements.allowed", true); ((InMemoryConfiguration) config).setBoolean( OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true); + ((InMemoryConfiguration) config).setInt( + "ozone.client.bytes.per.checksum", 8192); RpcClient rpcClient = new RpcClient(config, null) { From 3002c136b8d793fa1cb07dc69bcfba83e817037c Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Thu, 7 Nov 2024 16:26:32 -0800 Subject: [PATCH 09/12] checkstyle --- .../org/apache/hadoop/ozone/common/TestChecksumCache.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumCache.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumCache.java index 8a8d6eb134a0..49e0b75127ad 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumCache.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumCache.java @@ -50,9 +50,9 @@ void testComputeChecksum(ChecksumType checksumType) throws Exception { final Function function = Algorithm.valueOf(checksumType).newChecksumFunction(); - int i_end = size / bytesPerChecksum + (size % bytesPerChecksum == 0 ? 0 : 1); + int iEnd = size / bytesPerChecksum + (size % bytesPerChecksum == 0 ? 0 : 1); List lastRes = null; - for (int i = 0; i < i_end; i++) { + for (int i = 0; i < iEnd; i++) { int byteBufferLength = Integer.min(byteArray.length, bytesPerChecksum * (i + 1)); ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray, 0, byteBufferLength); From 13d1561e34293b3cd7344f178357c0270b990f4b Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Thu, 7 Nov 2024 19:01:13 -0800 Subject: [PATCH 10/12] Fix chunk-full condition. 
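With this change the cache is dropped only when the flushed chunk is exactly full, the one case where the next write starts a fresh chunk; clearing on every PutBlock would throw away prefix checksums that are still valid for the continuing partial chunk. A back-of-the-envelope timeline using the defaults from the ChecksumCache javadoc (4 MB chunk, 16 KB per checksum; the flush points are illustrative):

    final int streamBufferSize = 4 * 1024 * 1024;   // chunk size
    final int bytesPerChecksum = 16 * 1024;

    // hsync() after 1 MB written: PutBlock flushes a partial chunk.
    int lastChunkSize = 1024 * 1024;
    int cachedEntries = lastChunkSize / bytesPerChecksum;        // 64 entries stay cached
    boolean startNewChunk = (lastChunkSize == streamBufferSize); // false -> keep the cache

    // Later writes fill the chunk to exactly 4 MB before the next flush.
    lastChunkSize = streamBufferSize;
    startNewChunk = (lastChunkSize == streamBufferSize);         // true -> clearChecksumCache()
    // The following write opens a new chunk whose checksums are cached from scratch.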
--- .../apache/hadoop/hdds/scm/storage/BlockOutputStream.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 8c641d0de73e..ba733008a3fe 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -1047,9 +1047,6 @@ private void updateBlockDataForWriteChunk(ChunkBuffer chunk) lastChunkBuffer.capacity() - lastChunkBuffer.position(); appendLastChunkBuffer(chunk, 0, remainingBufferSize); updateBlockDataWithLastChunkBuffer(); - - // New chunk, need to clear checksum cache - checksum.clearChecksumCache(); // TODO: Can make the cache impl a bit more robust by associating ChecksumCache with ChunkBuffer/ByteBuffer rather // than the Checksum object? appendLastChunkBuffer(chunk, remainingBufferSize, @@ -1074,6 +1071,8 @@ private void updateBlockDataWithLastChunkBuffer() if (lastChunkSize == config.getStreamBufferSize()) { lastChunkOffset += config.getStreamBufferSize(); + // Reached stream buffer size (chunk size), starting new chunk, need to clear checksum cache + checksum.clearChecksumCache(); } else { lastChunkBuffer.position((int) lastChunkSize); } From 856fcfee08cfd1091941b4a834051304569d1dad Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Mon, 25 Nov 2024 10:19:52 -0800 Subject: [PATCH 11/12] Removed user-facing checksum cache config toggle, but kept internal constructor with cache off for testing --- .../hadoop/hdds/scm/OzoneClientConfig.java | 17 ----------------- .../hdds/scm/storage/BlockOutputStream.java | 12 +++++------- .../hadoop/ozone/common/ChecksumCache.java | 3 ++- 3 files changed, 7 insertions(+), 25 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 8f29c7578738..a4b53a80a1e2 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -178,15 +178,6 @@ public enum ChecksumCombineMode { tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE }) private int bytesPerChecksum = 16 * 1024; - // Client-side block chunk checksum cache config - // TODO: Re-evaluate if this need to be exposed to end-users at all, e.g. to avoid confusion. - @Config(key = "chunk.checksum.cache.enabled", - defaultValue = "true", // TODO: false by default? 
- description = "Increase client-side chunk checksum calculation efficiency when incremental chunk list is " + - "enabled by caching previously computed checksums in the same block chunk.", - tags = ConfigTag.CLIENT) - private boolean chunkChecksumCacheEnabled = true; - @Config(key = "verify.checksum", defaultValue = "true", description = "Ozone client to verify checksum of the checksum " @@ -458,14 +449,6 @@ public void setBytesPerChecksum(int bytesPerChecksum) { this.bytesPerChecksum = bytesPerChecksum; } - public boolean isChunkChecksumCacheEnabled() { - return chunkChecksumCacheEnabled; - } - - public void setChunkChecksumCacheEnabled(boolean chunkChecksumCacheEnabled) { - this.chunkChecksumCacheEnabled = chunkChecksumCacheEnabled; - } - public boolean isChecksumVerify() { return checksumVerify; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index ba733008a3fe..d560621da2ef 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -135,7 +135,6 @@ public class BlockOutputStream extends OutputStream { private List bufferList; private final List failedServers; - private final boolean chunkChecksumCacheEnabled; private final Checksum checksum; //number of buffers used before doing a flush/putBlock. @@ -232,9 +231,7 @@ public BlockOutputStream( writtenDataLength = 0; failedServers = new ArrayList<>(0); ioException = new AtomicReference<>(null); - this.chunkChecksumCacheEnabled = config.isChunkChecksumCacheEnabled(); - this.checksum = new Checksum(config.getChecksumType(), - config.getBytesPerChecksum(), chunkChecksumCacheEnabled); + this.checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum(), true); this.clientMetrics = clientMetrics; this.streamBufferArgs = streamBufferArgs; this.allowPutBlockPiggybacking = canEnablePutblockPiggybacking(); @@ -581,6 +578,7 @@ CompletableFuture executePutBlock(boolean close, final CompletableFuture flushFuture; final XceiverClientReply asyncReply; try { + // Note: checksum was previously appended to containerBlockData by WriteChunk BlockData blockData = containerBlockData.build(); LOG.debug("sending PutBlock {} flushPos {}", blockData, flushPos); @@ -1136,9 +1134,9 @@ private ChunkInfo createChunkInfo(long lastPartialChunkOffset) lastChunkBuffer.flip(); int revisedChunkSize = lastChunkBuffer.remaining(); // create the chunk info to be sent in PutBlock. - ChecksumData revisedChecksumData = - checksum.computeChecksum(lastChunkBuffer, chunkChecksumCacheEnabled); - // Cache checksum here. 
This checksum is stored in blockData (and later transferred in PutBlock) + // checksum cache is utilized for this computation + // this checksum is stored in blockData and later transferred in PutBlock + ChecksumData revisedChecksumData = checksum.computeChecksum(lastChunkBuffer, true); long chunkID = lastPartialChunkOffset / config.getStreamBufferSize(); ChunkInfo.Builder revisedChunkInfo = ChunkInfo.newBuilder() diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java index ee155c7f4b47..0f6482919a3f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java @@ -46,6 +46,7 @@ public class ChecksumCache { private static final int BLOCK_CHUNK_SIZE = 4 * 1024 * 1024; // 4 MB public ChecksumCache(int bytesPerChecksum) { + LOG.info("Initializing ChecksumCache with bytesPerChecksum = {}", bytesPerChecksum); this.prevChunkLength = 0; this.bytesPerChecksum = bytesPerChecksum; // Set initialCapacity to avoid costly resizes @@ -93,7 +94,7 @@ public List computeChecksum(ChunkBuffer data, Function Date: Mon, 25 Nov 2024 10:29:18 -0800 Subject: [PATCH 12/12] Clean up TODOs. --- .../apache/hadoop/hdds/scm/storage/BlockOutputStream.java | 6 ++---- .../main/java/org/apache/hadoop/ozone/common/Checksum.java | 3 ++- .../src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java | 3 ++- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index d560621da2ef..6f9c55565059 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -1045,8 +1045,7 @@ private void updateBlockDataForWriteChunk(ChunkBuffer chunk) lastChunkBuffer.capacity() - lastChunkBuffer.position(); appendLastChunkBuffer(chunk, 0, remainingBufferSize); updateBlockDataWithLastChunkBuffer(); - // TODO: Can make the cache impl a bit more robust by associating ChecksumCache with ChunkBuffer/ByteBuffer rather - // than the Checksum object? + // TODO: Optional refactoring: Can attach ChecksumCache to lastChunkBuffer rather than Checksum appendLastChunkBuffer(chunk, remainingBufferSize, chunk.remaining() - remainingBufferSize); } @@ -1063,8 +1062,7 @@ private void updateBlockDataWithLastChunkBuffer() LOG.debug("lastChunkInfo = {}", lastChunkInfo); long lastChunkSize = lastChunkInfo.getLen(); addToBlockData(lastChunkInfo); - // This sets ByteBuffer limit to capacity, pos to 0. Does NOT erase data - // TODO: This could be put inside createChunkInfo() to be more clear. + // Set ByteBuffer limit to capacity, pos to 0. 
Does not erase data lastChunkBuffer.clear(); if (lastChunkSize == config.getStreamBufferSize()) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java index 6639bf933b38..03771915be46 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java @@ -102,7 +102,8 @@ Function newChecksumFunction() { private final ChecksumType checksumType; private final int bytesPerChecksum; /** - * TODO: Make sure to clear this cache when a new block chunk is started. + * Caches computeChecksum() result when requested. + * This must be manually cleared when a new block chunk has been started. */ private final ChecksumCache checksumCache; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index c39e24571a8c..7e0f93e8e350 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -84,6 +84,7 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.common.ChecksumCache; import org.apache.hadoop.ozone.container.TestHelper; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.keyvalue.impl.AbstractTestChunkManager; @@ -225,8 +226,8 @@ public static void init() throws Exception { GenericTestUtils.setLogLevel(BlockOutputStream.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(KeyValueHandler.LOG, Level.DEBUG); - GenericTestUtils.setLogLevel(BufferPool.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(ChecksumCache.LOG, Level.DEBUG); openKeyCleanupService = (OpenKeyCleanupService) cluster.getOzoneManager().getKeyManager().getOpenKeyCleanupService();
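
Taken together, the series leaves Checksum with an opt-in cache that only the PutBlock path consults, while WriteChunk stays one-shot. A condensed usage sketch of the final API; the method and buffer names below are placeholders, not code from the patch:

    // BlockOutputStream now constructs: new Checksum(type, bytesPerChecksum, true)
    static void flushSketch(Checksum checksum, ChunkBuffer lastChunkBuffer,
        ChunkBuffer writeChunkPayload) throws OzoneChecksumException {
      // PutBlock path: incremental recomputation over the growing partial chunk.
      ChecksumData forPutBlock = checksum.computeChecksum(lastChunkBuffer, true);
      // WriteChunk path: one-shot computation; the cache is deliberately bypassed.
      ChecksumData forWriteChunk = checksum.computeChecksum(writeChunkPayload, false);
      // On chunk rollover (lastChunkSize == streamBufferSize) the cache is reset
      // so the next chunk starts clean.
      checksum.clearChecksumCache();
    }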