From 6f1828b8f3b211956dc0e642033beda075e78d9d Mon Sep 17 00:00:00 2001 From: captainzmc Date: Tue, 1 Mar 2022 14:41:32 +0800 Subject: [PATCH 1/3] Streaming write support both pipeline model and star model --- .../apache/hadoop/hdds/scm/OzoneClientConfig.java | 15 +++++++++++++++ .../hdds/scm/storage/BlockDataStreamOutput.java | 13 ++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index d793d93e8e4c..df3f2c2af9e9 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -89,6 +89,13 @@ public enum ChecksumCombineMode { tags = ConfigTag.CLIENT) private long streamWindowSize = 64 * 1024 * 1024; + @Config(key = "datastream.pipeline.model", + defaultValue = "true", + description = "Streaming write support both pipeline model(datanode1->" + + "datanode2->datanode3) and star model(datanode1->datanode2, " + + "datanode1->datanode3). By default we use pipeline model.", + tags = ConfigTag.CLIENT) + private boolean datastreamPipelineModel = true; @Config(key = "stream.buffer.increment", defaultValue = "0B", @@ -313,4 +320,12 @@ public ChecksumCombineMode getChecksumCombineMode() { ChecksumCombineMode.COMPOSITE_CRC.name()); } } + + public boolean isDatastreamPipelineModel() { + return datastreamPipelineModel; + } + + public void setDatastreamPipelineModel(boolean datastreamPipelineModel) { + this.datastreamPipelineModel = datastreamPipelineModel; + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 8b3e32cf41a4..9b9c3a8f223f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -137,6 +137,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { private XceiverClientMetrics metrics; // buffers for which putBlock is yet to be executed private List buffersForPutBlock; + private Boolean isDatastreamPipelineModel; /** * Creates a new BlockDataStreamOutput. * @@ -184,6 +185,7 @@ public BlockDataStreamOutput( checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum()); metrics = XceiverClientManager.getXceiverClientMetrics(); + isDatastreamPipelineModel = config.isDatastreamPipelineModel(); } private DataStreamOutput setupStream(Pipeline pipeline) throws IOException { @@ -203,9 +205,14 @@ private DataStreamOutput setupStream(Pipeline pipeline) throws IOException { ContainerCommandRequestMessage message = ContainerCommandRequestMessage.toMessage(builder.build(), null); - return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) - .stream(message.getContent().asReadOnlyByteBuffer(), - getRoutingTable(pipeline)); + if (isDatastreamPipelineModel) { + return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) + .stream(message.getContent().asReadOnlyByteBuffer(), + getRoutingTable(pipeline)); + } else { + return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) + .stream(message.getContent().asReadOnlyByteBuffer()); + } } public RoutingTable getRoutingTable(Pipeline pipeline) { From 71f0eff9129442c1d288ec608ff82033dda7f418 Mon Sep 17 00:00:00 2001 From: captainzmc Date: Tue, 1 Mar 2022 21:55:38 +0800 Subject: [PATCH 2/3] fix npe --- .../hadoop/hdds/scm/OzoneClientConfig.java | 18 +++++++++--------- .../scm/storage/BlockDataStreamOutput.java | 6 +++--- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index df3f2c2af9e9..091d2a83dd33 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -89,13 +89,13 @@ public enum ChecksumCombineMode { tags = ConfigTag.CLIENT) private long streamWindowSize = 64 * 1024 * 1024; - @Config(key = "datastream.pipeline.model", + @Config(key = "datastream.pipeline.mode", defaultValue = "true", - description = "Streaming write support both pipeline model(datanode1->" + - "datanode2->datanode3) and star model(datanode1->datanode2, " + - "datanode1->datanode3). By default we use pipeline model.", + description = "Streaming write support both pipeline mode(datanode1->" + + "datanode2->datanode3) and star mode(datanode1->datanode2, " + + "datanode1->datanode3). By default we use pipeline mode.", tags = ConfigTag.CLIENT) - private boolean datastreamPipelineModel = true; + private boolean datastreamPipelineMode = true; @Config(key = "stream.buffer.increment", defaultValue = "0B", @@ -321,11 +321,11 @@ public ChecksumCombineMode getChecksumCombineMode() { } } - public boolean isDatastreamPipelineModel() { - return datastreamPipelineModel; + public boolean isDatastreamPipelineMode() { + return datastreamPipelineMode; } - public void setDatastreamPipelineModel(boolean datastreamPipelineModel) { - this.datastreamPipelineModel = datastreamPipelineModel; + public void setDatastreamPipelineMode(boolean datastreamPipelineMode) { + this.datastreamPipelineMode = datastreamPipelineMode; } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 9b9c3a8f223f..3df5eb0e1203 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -137,7 +137,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { private XceiverClientMetrics metrics; // buffers for which putBlock is yet to be executed private List buffersForPutBlock; - private Boolean isDatastreamPipelineModel; + private boolean isDatastreamPipelineMode; /** * Creates a new BlockDataStreamOutput. * @@ -155,6 +155,7 @@ public BlockDataStreamOutput( ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; + this.isDatastreamPipelineMode = config.isDatastreamPipelineMode(); this.blockID = new AtomicReference<>(blockID); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); @@ -185,7 +186,6 @@ public BlockDataStreamOutput( checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum()); metrics = XceiverClientManager.getXceiverClientMetrics(); - isDatastreamPipelineModel = config.isDatastreamPipelineModel(); } private DataStreamOutput setupStream(Pipeline pipeline) throws IOException { @@ -205,7 +205,7 @@ private DataStreamOutput setupStream(Pipeline pipeline) throws IOException { ContainerCommandRequestMessage message = ContainerCommandRequestMessage.toMessage(builder.build(), null); - if (isDatastreamPipelineModel) { + if (isDatastreamPipelineMode) { return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) .stream(message.getContent().asReadOnlyByteBuffer(), getRoutingTable(pipeline)); From a91fba040d3eb15ba59fe9ad047d72e02e9ecf1c Mon Sep 17 00:00:00 2001 From: captainzmc Date: Wed, 2 Mar 2022 19:07:28 +0800 Subject: [PATCH 3/3] trigger new CI