Commit 4136d47

HDDS-4552. Read data from chunk into ByteBuffer[] instead of single ByteBuffer. (#1685)
1 parent 79877fa commit 4136d47

File tree: 45 files changed, +1294 -402 lines


hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java

Lines changed: 1 addition & 1 deletion
@@ -113,7 +113,7 @@ public class OzoneClientConfig {
       type = ConfigType.SIZE,
       description = "Checksum will be computed for every bytes per checksum "
           + "number of bytes and stored sequentially. The minimum value for "
-          + "this config is 256KB.",
+          + "this config is 16KB.",
       tags = ConfigTag.CLIENT)
   private int bytesPerChecksum = 1024 * 1024;

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java

Lines changed: 5 additions & 0 deletions
@@ -502,4 +502,9 @@ private void handleReadError(IOException cause) throws IOException {

     refreshPipeline(cause);
   }
+
+  @VisibleForTesting
+  public synchronized List<ChunkInputStream> getChunkStreams() {
+    return chunkStreams;
+  }
 }

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java

Lines changed: 150 additions & 55 deletions
Large diffs are not rendered by default.
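
The core of the change sits in this collapsed diff: readChunk moves from returning a single ByteString to a List<ByteBuffer> whose entries are split at bytes-per-checksum boundaries (the DummyChunkInputStream override below shows the splitting), and read calls then drain those buffers in order. A minimal sketch of that drain step, assuming read-only, checksum-aligned buffers; it illustrates the idea and is not the collapsed implementation:

import java.nio.ByteBuffer;
import java.util.List;

final class BufferListReader {

  private BufferListReader() {
  }

  /**
   * Copy up to len bytes from the buffer list into dst starting at off,
   * advancing each buffer's position as it is consumed.
   * Returns the number of bytes copied.
   */
  static int readFromBuffers(List<ByteBuffer> buffers, byte[] dst,
      int off, int len) {
    int copied = 0;
    for (ByteBuffer buf : buffers) {
      if (copied == len) {
        break;
      }
      int n = Math.min(len - copied, buf.remaining());
      buf.get(dst, off + copied, n);
      copied += n;
    }
    return copied;
  }
}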

hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java

Lines changed: 25 additions & 6 deletions
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.storage;

+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;

@@ -25,6 +26,7 @@
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;

 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

 /**
@@ -48,12 +50,29 @@ public DummyChunkInputStream(ChunkInfo chunkInfo,
   }

   @Override
-  protected ByteString readChunk(ChunkInfo readChunkInfo) {
-    ByteString byteString = ByteString.copyFrom(chunkData,
-        (int) readChunkInfo.getOffset(),
-        (int) readChunkInfo.getLen());
-    getReadByteBuffers().add(byteString);
-    return byteString;
+  protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
+    int offset = (int) readChunkInfo.getOffset();
+    int remainingToRead = (int) readChunkInfo.getLen();
+
+    int bufferCapacity = readChunkInfo.getChecksumData().getBytesPerChecksum();
+    int bufferLen;
+    readByteBuffers.clear();
+    while (remainingToRead > 0) {
+      if (remainingToRead < bufferCapacity) {
+        bufferLen = remainingToRead;
+      } else {
+        bufferLen = bufferCapacity;
+      }
+      ByteString byteString = ByteString.copyFrom(chunkData,
+          offset, bufferLen);
+
+      readByteBuffers.add(byteString);
+
+      offset += bufferLen;
+      remainingToRead -= bufferLen;
+    }
+
+    return BufferUtils.getReadOnlyByteBuffers(readByteBuffers);
   }

   @Override
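
The override above hands its ByteString list to BufferUtils.getReadOnlyByteBuffers, a helper referenced here but not rendered on this page. A plausible shape for it, assuming it wraps each ByteString as a zero-copy read-only view:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

final class ReadOnlyBuffers {

  private ReadOnlyBuffers() {
  }

  /** Wrap each ByteString as a read-only ByteBuffer, preserving order. */
  static List<ByteBuffer> getReadOnlyByteBuffers(List<ByteString> strings) {
    List<ByteBuffer> buffers = new ArrayList<>(strings.size());
    for (ByteString s : strings) {
      // asReadOnlyByteBuffer() shares the backing bytes; no copy is made.
      buffers.add(s.asReadOnlyByteBuffer());
    }
    return buffers;
  }
}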

hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java

Lines changed: 21 additions & 8 deletions
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hdds.scm.storage;

 import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;

@@ -94,6 +96,19 @@ private void matchWithInputData(byte[] readData, int inputDataStartIndex,
     }
   }

+  private void matchWithInputData(List<ByteString> byteStrings,
+      int inputDataStartIndex, int length) {
+    int offset = inputDataStartIndex;
+    int totalBufferLen = 0;
+    for (ByteString byteString : byteStrings) {
+      int bufferLen = byteString.size();
+      matchWithInputData(byteString.toByteArray(), offset, bufferLen);
+      offset += bufferLen;
+      totalBufferLen += bufferLen;
+    }
+    Assert.assertEquals(length, totalBufferLen);
+  }
+
   /**
    * Seek to a position and verify through getPos().
    */
@@ -123,10 +138,9 @@ public void testPartialChunkRead() throws Exception {
     // To read chunk data from index 0 to 49 (len = 50), we need to read
     // chunk from offset 0 to 60 as the checksum boundary is at every 20
     // bytes. Verify that 60 bytes of chunk data are read and stored in the
-    // buffers.
-    matchWithInputData(chunkStream.getReadByteBuffers().get(0).toByteArray(),
-        0, 60);
-
+    // buffers. Since checksum boundary is at every 20 bytes, there should be
+    // 60/20 number of buffers.
+    matchWithInputData(chunkStream.getReadByteBuffers(), 0, 60);
   }

   @Test
@@ -152,8 +166,7 @@ public void testSeek() throws Exception {
     byte[] b = new byte[30];
     chunkStream.read(b, 0, 30);
     matchWithInputData(b, 25, 30);
-    matchWithInputData(chunkStream.getReadByteBuffers().get(0).toByteArray(),
-        20, 40);
+    matchWithInputData(chunkStream.getReadByteBuffers(), 20, 40);

     // After read, the position of the chunkStream is evaluated from the
     // buffers and the chunkPosition should be reset to -1.
@@ -216,8 +229,8 @@ public void connectsToNewPipeline() throws Exception {
     ChunkInputStream subject = new ChunkInputStream(chunkInfo, null,
         clientFactory, pipelineRef::get, false, null) {
       @Override
-      protected ByteString readChunk(ChunkInfo readChunkInfo) {
-        return ByteString.copyFrom(chunkData);
+      protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
+        return ByteString.copyFrom(chunkData).asReadOnlyByteBufferList();
      }
     };
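
The updated test comments rest on one invariant: a read spanning checksum boundaries materializes as ceil(readLen / bytesPerChecksum) buffers, so the 60-byte read at a 20-byte boundary above yields 3 buffers. The arithmetic, as a hypothetical helper rather than anything from the test:

final class ChecksumMath {

  private ChecksumMath() {
  }

  /** Buffers needed to cover readLen at bytesPerChecksum granularity. */
  static int expectedBufferCount(int readLen, int bytesPerChecksum) {
    // Ceil division: expectedBufferCount(60, 20) == 3, and a 50-byte read
    // still needs 3 buffers because it is served from the same
    // 20-byte-aligned ranges.
    return (readLen + bytesPerChecksum - 1) / bytesPerChecksum;
  }
}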

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

Lines changed: 5 additions & 0 deletions
@@ -133,6 +133,11 @@ public final class ScmConfigKeys {
   // 4 MB by default
   public static final String OZONE_SCM_CHUNK_SIZE_DEFAULT = "4MB";

+  public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY =
+      "ozone.chunk.read.buffer.default.size";
+  public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT =
+      "64KB";
+
   public static final String OZONE_SCM_CHUNK_LAYOUT_KEY =
       "ozone.scm.chunk.layout";
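
A sketch of resolving the new key on the reading side, using Hadoop's storage-size configuration API as Ozone does for similar keys (the surrounding setup is assumed; this is not code from the commit):

import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;

public final class ReadBufferSizeExample {

  private ReadBufferSizeExample() {
  }

  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Resolves to 64KB = 65536 bytes unless overridden in ozone-site.xml.
    int bufferSize = (int) conf.getStorageSize(
        ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY,
        ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT,
        StorageUnit.BYTES);
    System.out.println("chunk read buffer size: " + bufferSize + " bytes");
  }
}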

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java

Lines changed: 63 additions & 19 deletions
@@ -18,12 +18,17 @@
 package org.apache.hadoop.hdds.scm.protocolPB;

 import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto.Builder;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DataBuffers;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetCommittedBlockLengthResponseProto;
@@ -34,8 +39,11 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

+import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
+
 /**
  * A set of helper functions to create responses to container commands.
  */
@@ -204,26 +212,46 @@ public static ContainerCommandResponseProto getPutFileResponseSuccess(

   /**
    * Gets a response to the read small file call.
-   * @param msg - Msg
-   * @param data - Data
+   * @param request - Msg
+   * @param dataBuffers - Data
    * @param info - Info
    * @return Response.
    */
   public static ContainerCommandResponseProto getGetSmallFileResponseSuccess(
-      ContainerCommandRequestProto msg, ByteString data, ChunkInfo info) {
-
-    Preconditions.checkNotNull(msg);
-
-    ReadChunkResponseProto.Builder readChunk =
-        ReadChunkResponseProto.newBuilder()
-            .setChunkData(info)
-            .setData((data))
-            .setBlockID(msg.getGetSmallFile().getBlock().getBlockID());
+      ContainerCommandRequestProto request, List<ByteString> dataBuffers,
+      ChunkInfo info) {
+
+    Preconditions.checkNotNull(request);
+
+    boolean isReadChunkV0 = getReadChunkVersion(request.getGetSmallFile())
+        .equals(ContainerProtos.ReadChunkVersion.V0);
+
+    ReadChunkResponseProto.Builder readChunk;
+
+    if (isReadChunkV0) {
+      // V0 has all response data in a single ByteBuffer
+      ByteString combinedData = ByteString.EMPTY;
+      for (ByteString buffer : dataBuffers) {
+        combinedData = combinedData.concat(buffer);
+      }
+      readChunk = ReadChunkResponseProto.newBuilder()
+          .setChunkData(info)
+          .setData(combinedData)
+          .setBlockID(request.getGetSmallFile().getBlock().getBlockID());
+    } else {
+      // V1 splits response data into a list of ByteBuffers
+      readChunk = ReadChunkResponseProto.newBuilder()
+          .setChunkData(info)
+          .setDataBuffers(DataBuffers.newBuilder()
+              .addAllBuffers(dataBuffers)
+              .build())
+          .setBlockID(request.getGetSmallFile().getBlock().getBlockID());
+    }

     GetSmallFileResponseProto.Builder getSmallFile =
         GetSmallFileResponseProto.newBuilder().setData(readChunk);

-    return getSuccessResponseBuilder(msg)
+    return getSuccessResponseBuilder(request)
         .setCmdType(Type.GetSmallFile)
         .setGetSmallFile(getSmallFile)
         .build();
@@ -250,13 +278,29 @@ public static ContainerCommandResponseProto getReadContainerResponse(
   }

   public static ContainerCommandResponseProto getReadChunkResponse(
-      ContainerCommandRequestProto request, ByteString data) {
-
-    ReadChunkResponseProto.Builder response =
-        ReadChunkResponseProto.newBuilder()
-            .setChunkData(request.getReadChunk().getChunkData())
-            .setData(data)
-            .setBlockID(request.getReadChunk().getBlockID());
+      ContainerCommandRequestProto request, ChunkBuffer data,
+      Function<ByteBuffer, ByteString> byteBufferToByteString) {
+
+    boolean isReadChunkV0 = getReadChunkVersion(request.getReadChunk())
+        .equals(ContainerProtos.ReadChunkVersion.V0);
+
+    ReadChunkResponseProto.Builder response;
+
+    if (isReadChunkV0) {
+      // V0 has all response data in a single ByteBuffer
+      response = ReadChunkResponseProto.newBuilder()
+          .setChunkData(request.getReadChunk().getChunkData())
+          .setData(data.toByteString(byteBufferToByteString))
+          .setBlockID(request.getReadChunk().getBlockID());
+    } else {
+      // V1 splits response data into a list of ByteBuffers
+      response = ReadChunkResponseProto.newBuilder()
+          .setChunkData(request.getReadChunk().getChunkData())
+          .setDataBuffers(DataBuffers.newBuilder()
+              .addAllBuffers(data.toByteStringList(byteBufferToByteString))
+              .build())
+          .setBlockID(request.getReadChunk().getBlockID());
+    }

     return getSuccessResponseBuilder(request)
         .setReadChunk(response)
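
On the receiving side, a client has to accept both shapes. A minimal sketch of normalizing a ReadChunkResponseProto into a buffer list; the accessors mirror the setters above, and the proto2-style hasData() presence check is an assumption:

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.ozone.common.utils.BufferUtils;

final class ReadChunkResponses {

  private ReadChunkResponses() {
  }

  /** Return the chunk payload as buffers, whether the response is V0 or V1. */
  static List<ByteBuffer> buffersOf(ReadChunkResponseProto response) {
    if (response.hasData()) {
      // V0: one contiguous ByteString.
      return response.getData().asReadOnlyByteBufferList();
    }
    // V1: already split at checksum boundaries by the datanode.
    return BufferUtils.getReadOnlyByteBuffers(
        response.getDataBuffers().getBuffersList());
  }
}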

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java

Lines changed: 3 additions & 1 deletion
@@ -226,7 +226,8 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
     ReadChunkRequestProto.Builder readChunkRequest =
         ReadChunkRequestProto.newBuilder()
             .setBlockID(blockID.getDatanodeBlockIDProtobuf())
-            .setChunkData(chunk);
+            .setChunkData(chunk)
+            .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
     String id = xceiverClient.getPipeline().getClosestNode().getUuidString();
     ContainerCommandRequestProto.Builder builder =
         ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
@@ -489,6 +490,7 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
     ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
         GetSmallFileRequestProto
             .newBuilder().setBlock(getBlock)
            .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1)
             .build();
     String id = client.getPipeline().getClosestNode().getUuidString();
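
Both client calls now stamp requests with ReadChunkVersion.V1. Because the field is optional in the protobuf, an older datanode should simply ignore it and answer in the V0 single-buffer shape, while an upgraded datanode serving an older client falls back to V0 via the getReadChunkVersion defaulting in ClientCommandsUtils below; the client therefore has to be prepared for either response shape, as sketched after the ContainerCommandResponseBuilders diff above.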

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/ClientCommandsUtils.java

Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.utils;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+
+public final class ClientCommandsUtils {
+
+  /** Utility classes should not be constructed. **/
+  private ClientCommandsUtils() {
+
+  }
+
+  public static ContainerProtos.ReadChunkVersion getReadChunkVersion(
+      ContainerProtos.ReadChunkRequestProto readChunkRequest) {
+    if (readChunkRequest.hasReadChunkVersion()) {
+      return readChunkRequest.getReadChunkVersion();
+    } else {
+      return ContainerProtos.ReadChunkVersion.V0;
+    }
+  }
+
+  public static ContainerProtos.ReadChunkVersion getReadChunkVersion(
+      ContainerProtos.GetSmallFileRequestProto getSmallFileRequest) {
+    if (getSmallFileRequest.hasReadChunkVersion()) {
+      return getSmallFileRequest.getReadChunkVersion();
+    } else {
+      return ContainerProtos.ReadChunkVersion.V0;
+    }
+  }
+}
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/package-info.java

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.utils;
+
+/**
+ * This package contains utility classes for the SCM and client protocols.
+ */