diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 5426bbc49817..a4b53a80a1e2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -156,7 +156,7 @@ public enum ChecksumCombineMode {
       description = "Indicates the time duration in seconds a client will wait "
           + "before retrying a read key request on encountering "
-          + "a connectivity excepetion from Datanodes . "
+          + "a connectivity exception from Datanodes. "
           + "By default the interval is 1 second",
       tags = ConfigTag.CLIENT)
   private int readRetryInterval = 1;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 59795dd0f051..6f9c55565059 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -231,8 +231,7 @@ public BlockOutputStream(
     writtenDataLength = 0;
     failedServers = new ArrayList<>(0);
     ioException = new AtomicReference<>(null);
-    checksum = new Checksum(config.getChecksumType(),
-        config.getBytesPerChecksum());
+    this.checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum(), true);
     this.clientMetrics = clientMetrics;
     this.streamBufferArgs = streamBufferArgs;
     this.allowPutBlockPiggybacking = canEnablePutblockPiggybacking();
@@ -579,6 +578,7 @@ CompletableFuture executePutBlock(boolean close,
     final CompletableFuture flushFuture;
     final XceiverClientReply asyncReply;
     try {
+      // Note: checksum was previously appended to containerBlockData by WriteChunk
       BlockData blockData = containerBlockData.build();
       LOG.debug("sending PutBlock {} flushPos {}", blockData, flushPos);
 
@@ -841,6 +841,8 @@ public synchronized void cleanup(boolean invalidateClient) {
     if (lastChunkBuffer != null) {
       DIRECT_BUFFER_POOL.returnBuffer(lastChunkBuffer);
       lastChunkBuffer = null;
+      // Clear checksum cache
+      checksum.clearChecksumCache();
     }
   }
 
@@ -890,7 +892,10 @@ private CompletableFuture writeChunkToContainer(
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
     final ByteString data = chunk.toByteString(
         bufferPool.byteStringConversion());
-    ChecksumData checksumData = checksum.computeChecksum(chunk);
+    // chunk is incremental, don't cache its checksum
+    ChecksumData checksumData = checksum.computeChecksum(chunk, false);
+    // side note: checksum object is shared with PutBlock's (blockData) checksum calc,
+    // current impl does not support caching both
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
         .setOffset(offset)
@@ -1040,6 +1045,7 @@ private void updateBlockDataForWriteChunk(ChunkBuffer chunk)
         lastChunkBuffer.capacity() - lastChunkBuffer.position();
     appendLastChunkBuffer(chunk, 0, remainingBufferSize);
     updateBlockDataWithLastChunkBuffer();
+    // TODO: Optional refactoring: Can attach ChecksumCache to lastChunkBuffer rather than Checksum
     appendLastChunkBuffer(chunk, remainingBufferSize,
         chunk.remaining() - remainingBufferSize);
   }
@@ -1056,10 +1062,13 @@ private void updateBlockDataWithLastChunkBuffer()
     LOG.debug("lastChunkInfo = {}", lastChunkInfo);
     long lastChunkSize = lastChunkInfo.getLen();
     addToBlockData(lastChunkInfo);
-
+    // Set ByteBuffer limit to capacity, pos to 0. Does not erase data
     lastChunkBuffer.clear();
+
     if (lastChunkSize == config.getStreamBufferSize()) {
       lastChunkOffset += config.getStreamBufferSize();
+      // Reached stream buffer size (chunk size), starting new chunk, need to clear checksum cache
+      checksum.clearChecksumCache();
     } else {
       lastChunkBuffer.position((int) lastChunkSize);
     }
@@ -1123,8 +1132,9 @@ private ChunkInfo createChunkInfo(long lastPartialChunkOffset)
     lastChunkBuffer.flip();
     int revisedChunkSize = lastChunkBuffer.remaining();
     // create the chunk info to be sent in PutBlock.
-    ChecksumData revisedChecksumData =
-        checksum.computeChecksum(lastChunkBuffer);
+    // checksum cache is utilized for this computation
+    // this checksum is stored in blockData and later transferred in PutBlock
+    ChecksumData revisedChecksumData = checksum.computeChecksum(lastChunkBuffer, true);
     long chunkID = lastPartialChunkOffset / config.getStreamBufferSize();
     ChunkInfo.Builder revisedChunkInfo = ChunkInfo.newBuilder()
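Reviewer note (not part of the patch): the BlockOutputStream hunks above boil down to the cache lifecycle sketched below. The class, buffer sizes and the CRC32C choice are illustrative assumptions; only Checksum, ChunkBuffer, computeChecksum(..., boolean) and clearChecksumCache() come from this change.

import java.nio.ByteBuffer;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;

/** Hypothetical demo class, not part of this patch. */
public class ChecksumCacheLifecycleSketch {
  public static void main(String[] args) throws Exception {
    int bytesPerChecksum = 16 * 1024;       // stands in for config.getBytesPerChecksum()
    int streamBufferSize = 4 * 1024 * 1024; // stands in for config.getStreamBufferSize()
    // Cache-enabled Checksum, as BlockOutputStream now constructs it.
    Checksum checksum = new Checksum(ChecksumType.CRC32C, bytesPerChecksum, true);

    byte[] chunk = new byte[streamBufferSize];

    // PutBlock path (createChunkInfo): lastChunkBuffer always re-covers the chunk from
    // offset 0, so cached checksums of already-complete windows are reused and only the
    // trailing partial window is recomputed.
    ChecksumData afterFirstFlush = checksum.computeChecksum(
        ChunkBuffer.wrap(ByteBuffer.wrap(chunk, 0, 20 * 1024).asReadOnlyBuffer()), true);
    ChecksumData afterSecondFlush = checksum.computeChecksum(
        ChunkBuffer.wrap(ByteBuffer.wrap(chunk, 0, 40 * 1024).asReadOnlyBuffer()), true);

    // WriteChunk path (writeChunkToContainer): the payload is incremental and not anchored
    // at offset 0 of the chunk, so the cache is bypassed.
    ChecksumData writeChunkChecksum = checksum.computeChecksum(
        ChunkBuffer.wrap(ByteBuffer.wrap(chunk, 20 * 1024, 20 * 1024).asReadOnlyBuffer()), false);

    // Chunk boundary reached (or stream cleanup): the next chunk starts at offset 0 again,
    // so the cached entries must be dropped before the Checksum object is reused.
    checksum.clearChecksumCache();
  }
}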
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
index f8b3febfeca8..03771915be46 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -33,6 +33,8 @@
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class to compute and verify checksums for chunks.
@@ -40,6 +42,8 @@
  * This class is not thread safe.
  */
 public class Checksum {
+  public static final Logger LOG = LoggerFactory.getLogger(Checksum.class);
+
   private static Function newMessageDigestFunction(
       String algorithm) {
     final MessageDigest md;
@@ -63,7 +67,7 @@ public static ByteString int2ByteString(int n) {
   private static Function newChecksumByteBufferFunction(
       Supplier constructor) {
     final ChecksumByteBuffer algorithm = constructor.get();
-    return data -> {
+    return data -> {
       algorithm.reset();
       algorithm.update(data);
       return int2ByteString((int)algorithm.getValue());
@@ -97,6 +101,23 @@ Function newChecksumFunction() {
   private final ChecksumType checksumType;
   private final int bytesPerChecksum;
 
+  /**
+   * Caches computeChecksum() result when requested.
+   * This must be manually cleared when a new block chunk has been started.
+   */
+  private final ChecksumCache checksumCache;
+
+  /**
+   * BlockOutputStream needs to call this method to clear the checksum cache
+   * whenever a block chunk has been established.
+   */
+  public boolean clearChecksumCache() {
+    if (checksumCache != null) {
+      checksumCache.clear();
+      return true;
+    }
+    return false;
+  }
 
   /**
    * Constructs a Checksum object.
@@ -106,6 +127,24 @@ Function newChecksumFunction() {
   public Checksum(ChecksumType type, int bytesPerChecksum) {
     this.checksumType = type;
     this.bytesPerChecksum = bytesPerChecksum;
+    this.checksumCache = null;
+  }
+
+  /**
+   * Constructs a Checksum object.
+   * @param type type of Checksum
+   * @param bytesPerChecksum number of bytes of data per checksum
+   * @param allowChecksumCache true to enable checksum cache
+   */
+  public Checksum(ChecksumType type, int bytesPerChecksum, boolean allowChecksumCache) {
+    this.checksumType = type;
+    this.bytesPerChecksum = bytesPerChecksum;
+    LOG.debug("allowChecksumCache = {}", allowChecksumCache);
+    if (allowChecksumCache) {
+      this.checksumCache = new ChecksumCache(bytesPerChecksum);
+    } else {
+      this.checksumCache = null;
+    }
   }
 
   /**
@@ -128,13 +167,25 @@ public ChecksumData computeChecksum(byte[] data)
     return computeChecksum(ByteBuffer.wrap(data));
   }
 
+  /**
+   * The default implementation of computeChecksum(ByteBuffer) that does not use cache, even if cache is initialized.
+   * This is a stop-gap solution before the protocol change.
+   * @param data ByteBuffer
+   * @return ChecksumData
+   * @throws OzoneChecksumException
+   */
+  public ChecksumData computeChecksum(ByteBuffer data)
+      throws OzoneChecksumException {
+    return computeChecksum(data, false);
+  }
+
   /**
    * Computes checksum for give data.
    * @param data input data.
    * @return ChecksumData computed for input data.
    * @throws OzoneChecksumException thrown when ChecksumType is not recognized
    */
-  public ChecksumData computeChecksum(ByteBuffer data)
+  public ChecksumData computeChecksum(ByteBuffer data, boolean useChecksumCache)
       throws OzoneChecksumException {
     // If type is set to NONE, we do not need to compute the checksums. We also
     // need to avoid unnecessary conversions.
@@ -144,7 +195,7 @@ public ChecksumData computeChecksum(ByteBuffer data)
     if (!data.isReadOnly()) {
       data = data.asReadOnlyBuffer();
     }
-    return computeChecksum(ChunkBuffer.wrap(data));
+    return computeChecksum(ChunkBuffer.wrap(data), useChecksumCache);
   }
 
   public ChecksumData computeChecksum(List byteStrings)
@@ -154,8 +205,20 @@ public ChecksumData computeChecksum(List byteStrings)
     return computeChecksum(ChunkBuffer.wrap(buffers));
   }
 
+  /**
+   * The default implementation of computeChecksum(ChunkBuffer) that does not use cache, even if cache is initialized.
+   * This is a stop-gap solution before the protocol change.
+   * @param data ChunkBuffer
+   * @return ChecksumData
+   * @throws OzoneChecksumException
+   */
   public ChecksumData computeChecksum(ChunkBuffer data)
       throws OzoneChecksumException {
+    return computeChecksum(data, false);
+  }
+
+  public ChecksumData computeChecksum(ChunkBuffer data, boolean useCache)
+      throws OzoneChecksumException {
     if (checksumType == ChecksumType.NONE) {
       // Since type is set to NONE, we do not need to compute the checksums
       return new ChecksumData(checksumType, bytesPerChecksum);
@@ -168,12 +231,20 @@ public ChecksumData computeChecksum(ChunkBuffer data)
       throw new OzoneChecksumException(checksumType);
     }
 
-    // Checksum is computed for each bytesPerChecksum number of bytes of data
-    // starting at offset 0. The last checksum might be computed for the
-    // remaining data with length less than bytesPerChecksum.
-    final List checksumList = new ArrayList<>();
-    for (ByteBuffer b : data.iterate(bytesPerChecksum)) {
-      checksumList.add(computeChecksum(b, function, bytesPerChecksum));
+    final List checksumList;
+    if (checksumCache == null || !useCache) {
+      // When checksumCache is not enabled:
+      // Checksum is computed for each bytesPerChecksum number of bytes of data
+      // starting at offset 0. The last checksum might be computed for the
+      // remaining data with length less than bytesPerChecksum.
+      checksumList = new ArrayList<>();
+      for (ByteBuffer b : data.iterate(bytesPerChecksum)) {
+        checksumList.add(computeChecksum(b, function, bytesPerChecksum)); // merge this?
+      }
+    } else {
+      // When checksumCache is enabled:
+      // We only need to update the last checksum in the cache, then pass it along.
+      checksumList = checksumCache.computeChecksum(data, function);
     }
     return new ChecksumData(checksumType, bytesPerChecksum, checksumList);
   }
@@ -185,7 +256,7 @@ public ChecksumData computeChecksum(ChunkBuffer data)
    * @param maxLength the max length of data
    * @return computed checksum ByteString
    */
-  private static ByteString computeChecksum(ByteBuffer data,
+  protected static ByteString computeChecksum(ByteBuffer data,
       Function function, int maxLength) {
     final int limit = data.limit();
     try {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java
new file mode 100644
index 000000000000..0f6482919a3f
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumCache.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Cache previous checksums to avoid recomputing them.
+ * This is a stop-gap solution to reduce checksum calc overhead inside the critical section
+ * without having to do a major refactoring/overhaul over protobuf and interfaces.
+ * This is only supposed to be used by BlockOutputStream, for now.
+ *
+ * Each BlockOutputStream has its own Checksum instance.
+ * Each block chunk (4 MB default) is divided into 16 KB (default) each for checksum calculation.
+ * For CRC32/CRC32C, each checksum takes 4 bytes. Thus each block chunk has 4 MB / 16 KB * 4 B = 1 KB of checksum data.
+ */
+public class ChecksumCache {
+  public static final Logger LOG = LoggerFactory.getLogger(ChecksumCache.class);
+
+  private final int bytesPerChecksum;
+  private final List checksums;
+  // Chunk length the last time the checksum was computed
+  private int prevChunkLength;
+  // This only serves as a hint for array list initial allocation. The array list will still grow as needed.
+  private static final int BLOCK_CHUNK_SIZE = 4 * 1024 * 1024; // 4 MB
+
+  public ChecksumCache(int bytesPerChecksum) {
+    LOG.info("Initializing ChecksumCache with bytesPerChecksum = {}", bytesPerChecksum);
+    this.prevChunkLength = 0;
+    this.bytesPerChecksum = bytesPerChecksum;
+    // Set initialCapacity to avoid costly resizes
+    this.checksums = new ArrayList<>(BLOCK_CHUNK_SIZE / bytesPerChecksum);
+  }
+
+  /**
+   * Clear cached checksums and reset the written index.
+   */
+  public void clear() {
+    prevChunkLength = 0;
+    checksums.clear();
+  }
+
+  public List getChecksums() {
+    return checksums;
+  }
+
+  public List computeChecksum(ChunkBuffer data, Function function) {
+    // Indicates how much data the current chunk buffer holds
+    final int currChunkLength = data.limit();
+
+    if (currChunkLength == prevChunkLength) {
+      LOG.debug("ChunkBuffer data limit same as last time ({}). No new checksums need to be computed", prevChunkLength);
+      return checksums;
+    }
+
+    // Sanity check
+    if (currChunkLength < prevChunkLength) {
+      // If currChunkLength < prevChunkLength, it indicates a bug that needs to be addressed.
+      // It means BOS has not properly clear()ed the cache when a new chunk is started in that code path.
+      throw new IllegalArgumentException("ChunkBuffer data limit (" + currChunkLength + ")"
+          + " must not be smaller than last time (" + prevChunkLength + ")");
+    }
+
+    // One or more checksums need to be computed
+
+    // Start index of the checksums that need to be (re)computed
+    final int ciStart = prevChunkLength / bytesPerChecksum;
+    final int ciEnd = currChunkLength / bytesPerChecksum + (currChunkLength % bytesPerChecksum == 0 ? 0 : 1);
+    int i = 0;
+    for (ByteBuffer b : data.iterate(bytesPerChecksum)) {
+      if (i < ciStart) {
+        i++;
+        continue;
+      }
+
+      // variable i can either point to:
+      // 1. the last element in the list -- in which case the checksum needs to be updated
+      // 2. one after the last element -- in which case a new checksum needs to be added
+      assert i == checksums.size() - 1 || i == checksums.size();
+
+      // TODO: Furthermore for CRC32/CRC32C, it can be even more efficient by updating the last checksum byte-by-byte.
+      final ByteString checksum = Checksum.computeChecksum(b, function, bytesPerChecksum);
+      if (i == checksums.size()) {
+        checksums.add(checksum);
+      } else {
+        checksums.set(i, checksum);
+      }
+
+      i++;
+    }
+
+    // Sanity check
+    if (i != ciEnd) {
+      throw new IllegalStateException("ChecksumCache: Checksum index end does not match expectation");
+    }
+
+    // Update last written index
+    prevChunkLength = currChunkLength;
+    return checksums;
+  }
+}
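Reviewer note (not part of the patch): a small worked example of the ciStart/ciEnd bookkeeping above, assuming bytesPerChecksum = 16. The demo class and its name are hypothetical; ChecksumCache, ChunkBuffer and Checksum.Algorithm are the types added or used by this patch, and the generic parameters (which the quoted diff has lost) are assumed to be Function<ByteBuffer, ByteString> and List<ByteString>. It sits in org.apache.hadoop.ozone.common only so the package-visible Algorithm.newChecksumFunction() is reachable, mirroring TestChecksumCache further below.

package org.apache.hadoop.ozone.common;

import java.nio.ByteBuffer;
import java.util.function.Function;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

/** Hypothetical demo class, not part of this patch. */
public final class ChecksumCacheArithmeticSketch {
  public static void main(String[] args) throws Exception {
    final int bytesPerChecksum = 16;
    ChecksumCache cache = new ChecksumCache(bytesPerChecksum);
    Function<ByteBuffer, ByteString> crc32 =
        Checksum.Algorithm.valueOf(ChecksumType.CRC32).newChecksumFunction();

    byte[] data = new byte[66];

    // First call: prevChunkLength = 0, currChunkLength = 40
    //   ciStart = 0 / 16 = 0,  ciEnd = 40 / 16 + 1 = 3  -> checksums 0..2 are computed
    try (ChunkBuffer buf = ChunkBuffer.wrap(ByteBuffer.wrap(data, 0, 40).asReadOnlyBuffer())) {
      System.out.println(cache.computeChecksum(buf, crc32).size()); // prints 3
    }

    // Second call: prevChunkLength = 40, currChunkLength = 66
    //   ciStart = 40 / 16 = 2, ciEnd = 66 / 16 + 1 = 5  -> only checksums 2..4 are (re)computed;
    //   entries 0 and 1 cover full 16-byte windows and are reused as-is.
    try (ChunkBuffer buf = ChunkBuffer.wrap(ByteBuffer.wrap(data, 0, 66).asReadOnlyBuffer())) {
      System.out.println(cache.computeChecksum(buf, crc32).size()); // prints 5
    }

    // A new chunk starts again at offset 0, so the cache must be cleared first;
    // otherwise the currChunkLength < prevChunkLength sanity check would fire.
    cache.clear();
  }
}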
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java
index 829f4bb150c9..7ddb605c0f8f 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java
@@ -19,7 +19,10 @@
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.ByteBuffer;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -35,23 +38,25 @@ public class TestChecksum {
   private static final ContainerProtos.ChecksumType CHECKSUM_TYPE_DEFAULT =
       ContainerProtos.ChecksumType.SHA256;
 
-  private Checksum getChecksum(ContainerProtos.ChecksumType type) {
+  private Checksum getChecksum(ContainerProtos.ChecksumType type, boolean allowChecksumCache) {
     if (type == null) {
       type = CHECKSUM_TYPE_DEFAULT;
     }
-    return new Checksum(type, BYTES_PER_CHECKSUM);
+    return new Checksum(type, BYTES_PER_CHECKSUM, allowChecksumCache);
   }
 
   /**
    * Tests {@link Checksum#verifyChecksum(byte[], ChecksumData)}.
    */
-  @Test
-  public void testVerifyChecksum() throws Exception {
-    Checksum checksum = getChecksum(null);
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testVerifyChecksum(boolean useChecksumCache) throws Exception {
+    Checksum checksum = getChecksum(null, useChecksumCache);
     int dataLen = 55;
     byte[] data = RandomStringUtils.randomAlphabetic(dataLen).getBytes(UTF_8);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(data);
 
-    ChecksumData checksumData = checksum.computeChecksum(data);
+    ChecksumData checksumData = checksum.computeChecksum(byteBuffer, useChecksumCache);
 
     // A checksum is calculate for each bytesPerChecksum number of bytes in
     // the data. Since that value is 10 here and the data length is 55, we
@@ -65,11 +70,13 @@ public void testVerifyChecksum() throws Exception {
   /**
    * Tests that if data is modified, then the checksums should not match.
    */
-  @Test
-  public void testIncorrectChecksum() throws Exception {
-    Checksum checksum = getChecksum(null);
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testIncorrectChecksum(boolean useChecksumCache) throws Exception {
+    Checksum checksum = getChecksum(null, useChecksumCache);
     byte[] data = RandomStringUtils.randomAlphabetic(55).getBytes(UTF_8);
-    ChecksumData originalChecksumData = checksum.computeChecksum(data);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+    ChecksumData originalChecksumData = checksum.computeChecksum(byteBuffer, useChecksumCache);
 
     // Change the data and check if new checksum matches the original checksum.
     // Modifying one byte of data should be enough for the checksum data to
@@ -83,13 +90,14 @@
    * Tests that checksum calculated using two different checksumTypes should
    * not match.
    */
-  @Test
-  public void testChecksumMismatchForDifferentChecksumTypes() {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testChecksumMismatchForDifferentChecksumTypes(boolean useChecksumCache) {
     // Checksum1 of type SHA-256
-    Checksum checksum1 = getChecksum(null);
+    Checksum checksum1 = getChecksum(null, useChecksumCache);
     // Checksum2 of type CRC32
-    Checksum checksum2 = getChecksum(ContainerProtos.ChecksumType.CRC32);
+    Checksum checksum2 = getChecksum(ContainerProtos.ChecksumType.CRC32, useChecksumCache);
 
     // The two checksums should not match as they have different types
     assertNotEquals(checksum1, checksum2,
         "Checksums should not match for different checksum types");
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumCache.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumCache.java
new file mode 100644
index 000000000000..49e0b75127ad
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumCache.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.ozone.common.Checksum.Algorithm;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Test class for {@link ChecksumCache}.
+ */
+class TestChecksumCache {
+  public static final Logger LOG = LoggerFactory.getLogger(TestChecksumCache.class);
+
+  @ParameterizedTest
+  @EnumSource(ChecksumType.class)
+  void testComputeChecksum(ChecksumType checksumType) throws Exception {
+    final int bytesPerChecksum = 16;
+    ChecksumCache checksumCache = new ChecksumCache(bytesPerChecksum);
+
+    final int size = 66;
+    byte[] byteArray = new byte[size];
+    // Fill byteArray with bytes from 0 to 127 for deterministic testing
+    for (int i = 0; i < size; i++) {
+      byteArray[i] = (byte) (i % 128);
+    }
+
+    final Function function = Algorithm.valueOf(checksumType).newChecksumFunction();
+
+    int iEnd = size / bytesPerChecksum + (size % bytesPerChecksum == 0 ? 0 : 1);
+    List lastRes = null;
+    for (int i = 0; i < iEnd; i++) {
+      int byteBufferLength = Integer.min(byteArray.length, bytesPerChecksum * (i + 1));
+      ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray, 0, byteBufferLength);
+
+      try (ChunkBuffer chunkBuffer = ChunkBuffer.wrap(byteBuffer.asReadOnlyBuffer())) {
+        List res = checksumCache.computeChecksum(chunkBuffer, function);
+        System.out.println(res);
+        // Verify that every entry in the res list except the last one is the same as the one in lastRes list
+        if (i > 0) {
+          for (int j = 0; j < res.size() - 1; j++) {
+            Assertions.assertEquals(lastRes.get(j), res.get(j));
+          }
+        }
+        lastRes = res;
+      }
+    }
+
+    // Sanity check
+    checksumCache.clear();
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 1bcb64200b22..f189876c7bb0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -560,6 +560,8 @@ ContainerCommandResponseProto handlePutBlock(
         endOfBlock = true;
       }
 
+      // Note: checksum held inside blockData. But no extra checksum validation here with handlePutBlock.
+
       long bcsId = dispatcherContext == null ? 0 : dispatcherContext.getLogIndex();
       blockData.setBlockCommitSequenceId(bcsId);
 
@@ -888,6 +890,7 @@ ContainerCommandResponseProto handleWriteChunk(
       if (isWrite) {
         data = ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList());
+        // TODO: Can improve checksum validation here. Make this one-shot after protocol change.
        validateChunkChecksumData(data, chunkInfo);
       }
       chunkManager
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java
index 361dcb1fd0a2..0db67441fb57 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java
@@ -73,8 +73,12 @@ private void init(boolean incrementalChunkList) throws IOException {
     ((InMemoryConfiguration) config).setBoolean(
         OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
+    ((InMemoryConfiguration) config).setBoolean(
+        "ozone.client.hbase.enhancements.allowed", true);
     ((InMemoryConfiguration) config).setBoolean(
         OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
+    ((InMemoryConfiguration) config).setInt(
+        "ozone.client.bytes.per.checksum", 8192);
 
     RpcClient rpcClient = new RpcClient(config, null) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index c39e24571a8c..7e0f93e8e350 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -84,6 +84,7 @@
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.common.ChecksumCache;
 import org.apache.hadoop.ozone.container.TestHelper;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.impl.AbstractTestChunkManager;
@@ -225,8 +226,8 @@ public static void init() throws Exception {
     GenericTestUtils.setLogLevel(BlockOutputStream.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(KeyValueHandler.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(BufferPool.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(ChecksumCache.LOG, Level.DEBUG);
 
     openKeyCleanupService = (OpenKeyCleanupService) cluster.getOzoneManager().getKeyManager().getOpenKeyCleanupService();
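Reviewer note (not part of the patch): for local verification, the client-side settings the updated tests enable can be collected as below. The key strings and constants are taken verbatim from the test changes above; using OzoneConfiguration here, rather than the tests' InMemoryConfiguration, is an assumption made for illustration only.

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys;

/** Hypothetical helper, not part of this patch. */
public final class ClientChecksumConfigSketch {
  public static OzoneConfiguration clientConf() {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Switches the tests flip on so that hsync and the incremental chunk list
    // (and therefore the cached PutBlock checksum path) are exercised.
    conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
    conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
    conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
    // Smaller bytes-per-checksum, as set in TestBlockOutputStreamIncrementalPutBlock.
    conf.setInt("ozone.client.bytes.per.checksum", 8192);
    return conf;
  }
}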