Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,28 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
}

BlockData checksumBlockData = null;
BlockID blockID = null;
//Reverse Traversal as all parity will have checksumBytes
for (int i = blockData.length - 1; i >= 0; i--) {
BlockData bd = blockData[i];
if (bd == null) {
continue;
}
if (blockID == null) {
// store the BlockID for logging
blockID = bd.getBlockID();
}
List<ChunkInfo> chunks = bd.getChunks();
if (chunks != null && chunks.size() > 0 && chunks.get(0)
.hasStripeChecksum()) {
checksumBlockData = bd;
break;
if (chunks != null && chunks.size() > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original logic had:

if (chunks != null && chunks.size() > 0 && chunks.get(0)
          .hasStripeChecksum()) {

so that checksumBlockData is only set if hasStripeChecksum() returns true. With the change you have made, checksumBlockData will be set whenever hasChecksumData() returns true, but that chunk might not have a stripeChecksum in it.

Then at line 155 it will enter the IF block, and at line 174 I am not sure what will happen if the stripeChecksum is missing.

All the IF statement at 155 does is copy in the stripeChecksum if it exists. So if it does not exist, there is no point in going into that IF at all, as we will just be copying the original chunkChecksum out and back in again.

Following on from this: if we only set checksumBlockData when there is a stripeChecksum, then you can also remove the foundStripeChecksum boolean, as checksumBlockData != null means the same thing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, so what you're saying is that there's no point in going inside the if block at line 155 if stripeChecksum isn't found because that code is only setting stripeChecksum. So overall, the only change we need here is to log instead of throw at line 184. Right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, that's it.

if (chunks.get(0).hasStripeChecksum()) {
checksumBlockData = bd;
break;
} else {
ChunkInfo chunk = chunks.get(0);
LOG.debug("The first chunk in block with index {} does not have stripeChecksum. BlockID: {}, Block " +
"size: {}. Chunk length: {}, Chunk offset: {}, hasChecksumData: {}, chunks size: {}.", i,
bd.getBlockID(), bd.getSize(), chunk.getLen(), chunk.getOffset(), chunk.hasChecksumData(), chunks.size());
}
}
}

Expand Down Expand Up @@ -158,9 +169,8 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
getContainerBlockData().clearChunks();
getContainerBlockData().addAllChunks(newChunkList);
} else {
throw new IOException("None of the block data have checksum " +
"which means " + parity + "(parity)+1 blocks are " +
"not present");
LOG.warn("Could not find checksum data in any index for blockData with BlockID {}, length {} and " +
"blockGroupLength {}.", blockID, blockData.length, blockGroupLength);
}

return executePutBlock(close, force, blockGroupLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
Expand All @@ -42,8 +46,13 @@
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand Down Expand Up @@ -83,6 +92,66 @@ void test(final int writeSize) throws IOException {
}
}

/**
 * Verifies that {@code executePutBlock} tolerates an EC stripe in which no ChunkInfo carries a stripeChecksum.
 *
 * <p>This models EC offline reconstruction of data written by a version whose ChunkInfo protobuf message did not
 * yet have the stripeChecksum field. The call must complete without throwing, which is effectively a compatibility
 * check between a version without stripeChecksum and a version with it.
 */
@Test
public void testMissingStripeChecksumDoesNotMakeExecutePutBlockFailDuringECReconstruction() throws IOException {
  // Fixed inputs needed to construct the ECBlockOutputStream under test.
  final OzoneClientConfig clientConfig = new OzoneClientConfig();
  final ECReplicationConfig ecConfig = new ECReplicationConfig(3, 2);
  final BlockID id = new BlockID(1, 1);
  final DatanodeDetails node = MockDatanodeDetails.randomDatanodeDetails();

  // executePutBlock is exercised for parity index 5, because stripeChecksum is written to either the first or
  // the parity indexes.
  final Pipeline closedPipeline = Pipeline.newBuilder()
      .setId(PipelineID.valueOf(node.getUuid()))
      .setReplicationConfig(ecConfig)
      .setNodes(ImmutableList.of(node))
      .setState(Pipeline.PipelineState.CLOSED)
      .setReplicaIndexes(ImmutableMap.of(node, 5))
      .build();

  final BlockLocationInfo blockLocation = new BlockLocationInfo.Builder()
      .setBlockID(id)
      .setOffset(1)
      .setLength(10)
      .setPipeline(closedPipeline)
      .build();

  // executePutBlock reads this per-block metadata. None of the chunks is given a stripeChecksum on purpose;
  // the call is still expected to succeed.
  final BlockData[] stripeMetadata = createBlockDataWithoutStripeChecksum(id, ecConfig);

  try (ECBlockOutputStream stream = createECBlockOutputStream(clientConfig, ecConfig, id, closedPipeline)) {
    Assertions.assertDoesNotThrow(
        () -> stream.executePutBlock(true, true, blockLocation.getLength(), stripeMetadata));
  }
}

/**
 * Builds a BlockData array holding one entry per node required by the replication config
 * ({@link ECReplicationConfig#getRequiredNodes()}).
 *
 * <p>Every entry shares the given block ID and contains exactly one chunk, created without a stripeChecksum.
 *
 * @param blockID block ID assigned to every BlockData element
 * @param replicationConfig EC configuration that determines the array length
 * @return the populated array; no element is null
 */
private BlockData[] createBlockDataWithoutStripeChecksum(BlockID blockID, ECReplicationConfig replicationConfig) {
  final BlockData[] stripe = new BlockData[replicationConfig.getRequiredNodes()];

  for (int index = 0; index < stripe.length; index++) {
    // A single chunk per block, deliberately lacking a stripeChecksum.
    final ChunkInfo checksumlessChunk = new ChunkInfo("abc", 0, 10);
    final BlockData block = new BlockData(blockID);
    block.addChunk(checksumlessChunk.getProtoBufMessage());
    stripe[index] = block;
  }

  return stripe;
}

private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
throws IOException {

Expand Down Expand Up @@ -114,6 +183,20 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
() -> newFixedThreadPool(10));
}

/**
 * Creates an ECBlockOutputStream whose client manager is a Mockito mock that hands out a
 * {@code MockXceiverClientSpi} bound to the given pipeline.
 *
 * @param clientConfig client configuration passed to the stream and used to derive the stream buffer args
 * @param repConfig EC replication configuration used to derive the stream buffer args
 * @param blockID ID of the block the stream writes to
 * @param pipeline pipeline given to both the mocked client and the stream
 * @return a new stream backed by an empty buffer pool and a two-thread fixed pool supplier
 * @throws IOException declared because stubbing {@code acquireClient} and constructing the stream may throw it
 */
private ECBlockOutputStream createECBlockOutputStream(OzoneClientConfig clientConfig,
    ECReplicationConfig repConfig, BlockID blockID, Pipeline pipeline) throws IOException {
  // Every client acquisition on the mocked manager resolves to the simulated client.
  final XceiverClientManager clientManager = mock(XceiverClientManager.class);
  when(clientManager.acquireClient(any())).thenReturn(new MockXceiverClientSpi(pipeline));

  final ContainerClientMetrics metrics = ContainerClientMetrics.acquire();
  final StreamBufferArgs bufferArgs = StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, clientConfig);

  return new ECBlockOutputStream(
      blockID,
      clientManager,
      pipeline,
      BufferPool.empty(),
      clientConfig,
      null,
      metrics,
      bufferArgs,
      () -> newFixedThreadPool(2));
}

/**
* XCeiverClient which simulates responses.
*/
Expand Down