From 603e3e8e50cd15b8489c3077ecfbad2ebd4244ef Mon Sep 17 00:00:00 2001 From: sadanand48 Date: Wed, 18 Nov 2020 20:13:54 +0530 Subject: [PATCH 1/5] HDDS-4475.Extend DatanodeChunkGenerator to write all on all pipelines of a set of dns --- .../ozone/freon/DatanodeChunkGenerator.java | 114 +++++++++++------- 1 file changed, 69 insertions(+), 45 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java index a6e28324d38a..cc09d290415b 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java @@ -17,11 +17,13 @@ package org.apache.hadoop.ozone.freon; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; @@ -75,7 +77,7 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements description = "Pipeline to use. By default the first RATIS/THREE " + "pipeline will be used.", defaultValue = "") - private String pipelineId; + private String pipelineIds; private XceiverClientSpi xceiverClientSpi; @@ -84,10 +86,11 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements private ByteString dataToWrite; private ChecksumData checksumProtobuf; + boolean isThreeNodePipeline = false; + @Override public Void call() throws Exception { - init(); OzoneConfiguration ozoneConf = createOzoneConfiguration(); if (OzoneSecurityUtil.isSecurityEnabled(ozoneConf)) { @@ -95,47 +98,56 @@ public Void call() throws Exception { "Datanode chunk generator is not supported in secure environment"); } - try (StorageContainerLocationProtocol scmLocationClient = - createStorageContainerLocationClient(ozoneConf)) { - List pipelines = scmLocationClient.listPipelines(); - Pipeline pipeline; - if (pipelineId != null && pipelineId.length() > 0) { - pipeline = pipelines.stream() - .filter(p -> p.getId().toString().equals(pipelineId)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException( - "Pipeline ID is defined, but there is no such pipeline: " - + pipelineId)); - - } else { - pipeline = pipelines.stream() - .filter(p -> p.getFactor() == ReplicationFactor.THREE) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException( - "Pipeline ID is NOT defined, and no pipeline " + - "has been found with factor=THREE")); - LOG.info("Using pipeline {}", pipeline.getId()); - } - - try (XceiverClientManager xceiverClientManager = - new XceiverClientManager(ozoneConf)) { - xceiverClientSpi = xceiverClientManager.acquireClient(pipeline); - - timer = getMetrics().timer("chunk-write"); - - byte[] data = RandomStringUtils.randomAscii(chunkSize) - .getBytes(StandardCharsets.UTF_8); - - dataToWrite = ByteString.copyFrom(data); - - Checksum checksum = new Checksum(ChecksumType.CRC32, chunkSize); - checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage(); - - runTests(this::writeChunk); - } - } finally { - if (xceiverClientSpi != null) { - xceiverClientSpi.close(); + List pips = Arrays.asList(pipelineIds.split(",")); + + for (String pipelineId: pips) { + init(); + LOG.info("Writing block for pipeline:" + pipelineId); + try (StorageContainerLocationProtocol scmLocationClient = + createStorageContainerLocationClient(ozoneConf)) { + List pipelines = scmLocationClient.listPipelines(); + Pipeline pipeline; + if (pipelineId != null && pipelineId.length() > 0) { + pipeline = pipelines.stream() + .filter(p -> p.getId().toString().equals("PipelineID=" + pipelineId)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException( + "Pipeline ID is defined, but there is no such pipeline: " + + pipelineId)); + if (pipeline.getFactor()== ReplicationFactor.THREE){ + isThreeNodePipeline = true; + } + + } else { + pipeline = pipelines.stream() + .filter(p -> p.getFactor() == ReplicationFactor.THREE) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException( + "Pipeline ID is NOT defined, and no pipeline " + + "has been found with factor=THREE")); + LOG.info("Using pipeline {}", pipeline.getId()); + } + + try (XceiverClientManager xceiverClientManager = + new XceiverClientManager(ozoneConf)) { + xceiverClientSpi = xceiverClientManager.acquireClient(pipeline); + + timer = getMetrics().timer("chunk-write"); + + byte[] data = RandomStringUtils.randomAscii(chunkSize) + .getBytes(StandardCharsets.UTF_8); + + dataToWrite = ByteString.copyFrom(data); + + Checksum checksum = new Checksum(ChecksumType.CRC32, chunkSize); + checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage(); + + runTests(this::writeChunk); + } + } finally { + if (xceiverClientSpi != null) { + xceiverClientSpi.close(); + } } } return null; @@ -165,7 +177,20 @@ private void writeChunk(long stepNo) .setChunkData(chunkInfo) .setData(dataToWrite); - String id = xceiverClientSpi.getPipeline().getFirstNode().getUuidString(); + if (isThreeNodePipeline){ + for (DatanodeDetails dn: xceiverClientSpi.getPipeline().getNodes()){ + sendWriteChunkRequest(blockId, writeChunkRequest, dn); + } + } + else { + sendWriteChunkRequest(blockId, writeChunkRequest, xceiverClientSpi.getPipeline().getFirstNode()); + } + } + + private void sendWriteChunkRequest(DatanodeBlockID blockId, + WriteChunkRequestProto.Builder writeChunkRequest, + DatanodeDetails datanodeDetails) throws Exception { + String id = datanodeDetails.getUuidString(); ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto @@ -188,7 +213,6 @@ private void writeChunk(long stepNo) } return null; }); - } } From e8224b05dea7e8f53ece0bbd03275c241d155ebe Mon Sep 17 00:00:00 2001 From: sadanand48 Date: Mon, 30 Nov 2020 19:32:28 +0530 Subject: [PATCH 2/5] Addressed review comments. added datanodes argument --- .../ozone/freon/DatanodeChunkGenerator.java | 148 ++++++++++++------ 1 file changed, 99 insertions(+), 49 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java index cc09d290415b..ce7208eca9e6 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java @@ -16,10 +16,13 @@ */ package org.apache.hadoop.ozone.freon; +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -79,6 +82,11 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements defaultValue = "") private String pipelineIds; + @Option(names = {"-d", "--datanodes"}, + description = "Datanodes to use. ", + defaultValue = "") + private String datanodes; + private XceiverClientSpi xceiverClientSpi; private Timer timer; @@ -86,7 +94,6 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements private ByteString dataToWrite; private ChecksumData checksumProtobuf; - boolean isThreeNodePipeline = false; @Override public Void call() throws Exception { @@ -98,59 +105,107 @@ public Void call() throws Exception { "Datanode chunk generator is not supported in secure environment"); } - List pips = Arrays.asList(pipelineIds.split(",")); + List pipelines = Arrays.asList(pipelineIds.split(",")); - for (String pipelineId: pips) { - init(); - LOG.info("Writing block for pipeline:" + pipelineId); - try (StorageContainerLocationProtocol scmLocationClient = - createStorageContainerLocationClient(ozoneConf)) { - List pipelines = scmLocationClient.listPipelines(); - Pipeline pipeline; - if (pipelineId != null && pipelineId.length() > 0) { - pipeline = pipelines.stream() - .filter(p -> p.getId().toString().equals("PipelineID=" + pipelineId)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException( - "Pipeline ID is defined, but there is no such pipeline: " - + pipelineId)); - if (pipeline.getFactor()== ReplicationFactor.THREE){ - isThreeNodePipeline = true; - } + List datanodeHosts = Arrays.asList(this.datanodes.split(",")); - } else { - pipeline = pipelines.stream() + pipelines = getPipelinesFromDatanodes(ozoneConf, pipelines, datanodeHosts); + + try (StorageContainerLocationProtocol scmLocationClient = + createStorageContainerLocationClient(ozoneConf)) { + List pipelinesFromSCM = scmLocationClient.listPipelines(); + Pipeline temp; + if (!arePipelinesOrDatanodesProvided()) { + //default behaviour if no arguments provided + init(); + temp = pipelinesFromSCM.stream() .filter(p -> p.getFactor() == ReplicationFactor.THREE) .findFirst() .orElseThrow(() -> new IllegalArgumentException( "Pipeline ID is NOT defined, and no pipeline " + "has been found with factor=THREE")); - LOG.info("Using pipeline {}", pipeline.getId()); + LOG.info("Using pipeline {}", temp.getId()); + runTest(ozoneConf, temp); + } else { + List futures = new ArrayList<>(); + for(String pipelineId:pipelines){ + futures.add(CompletableFuture.runAsync(()->writeOnPipeline( + ozoneConf, pipelinesFromSCM, pipelineId))); + } + CompletableFuture.allOf(futures.toArray( + new CompletableFuture[pipelines.size()])) + .join(); + } + } finally { + if (xceiverClientSpi != null) { + xceiverClientSpi.close(); + } + } + return null; + } + + private void writeOnPipeline(OzoneConfiguration ozoneConf, + List pipelinesFromSCM, String pipelineId) { + Pipeline temp; + init(); + LOG.info("Writing block for pipeline:" + pipelineId); + temp = pipelinesFromSCM.stream() + .filter(p -> p.getId().toString().equals("PipelineID=" + pipelineId)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException( + "Pipeline ID is defined, but there is no such pipeline: " + + pipelineId)); + try { + runTest(ozoneConf, temp); + } catch (IOException e) { + LOG.error("Could not write chunks"); + } + } + + private boolean arePipelinesOrDatanodesProvided() { + return !(pipelineIds.equals("") && datanodes.equals("")); + } + + private List getPipelinesFromDatanodes(OzoneConfiguration ozoneConf, + List pipelines, List dns) throws IOException { + if(!datanodes.equals("")) { + try (StorageContainerLocationProtocol scmLocationClient = + createStorageContainerLocationClient(ozoneConf)) { + List pipelineList = scmLocationClient.listPipelines(); + pipelines = new ArrayList(); + for (Pipeline p : pipelineList) { + for (DatanodeDetails dn : p.getNodes()) { + if (dns.contains(dn.getHostName())) { + if(!pipelines.contains(p.getId().toString().substring(11))) { + pipelines.add(p.getId().toString().substring(11)); + } + } + } + LOG.debug("Pipeline list size:" + pipelines.size()); } + } + } + return pipelines; + } - try (XceiverClientManager xceiverClientManager = - new XceiverClientManager(ozoneConf)) { - xceiverClientSpi = xceiverClientManager.acquireClient(pipeline); + private void runTest(OzoneConfiguration ozoneConf, Pipeline pipeline) + throws IOException { + try (XceiverClientManager xceiverClientManager = + new XceiverClientManager(ozoneConf)) { + xceiverClientSpi = xceiverClientManager.acquireClient(pipeline); - timer = getMetrics().timer("chunk-write"); + timer = getMetrics().timer("chunk-write"); - byte[] data = RandomStringUtils.randomAscii(chunkSize) - .getBytes(StandardCharsets.UTF_8); + byte[] data = RandomStringUtils.randomAscii(chunkSize) + .getBytes(StandardCharsets.UTF_8); - dataToWrite = ByteString.copyFrom(data); + dataToWrite = ByteString.copyFrom(data); - Checksum checksum = new Checksum(ChecksumType.CRC32, chunkSize); - checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage(); + Checksum checksum = new Checksum(ChecksumType.CRC32, chunkSize); + checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage(); - runTests(this::writeChunk); - } - } finally { - if (xceiverClientSpi != null) { - xceiverClientSpi.close(); - } - } + runTests(this::writeChunk); } - return null; } private void writeChunk(long stepNo) @@ -177,19 +232,14 @@ private void writeChunk(long stepNo) .setChunkData(chunkInfo) .setData(dataToWrite); - if (isThreeNodePipeline){ - for (DatanodeDetails dn: xceiverClientSpi.getPipeline().getNodes()){ - sendWriteChunkRequest(blockId, writeChunkRequest, dn); - } - } - else { - sendWriteChunkRequest(blockId, writeChunkRequest, xceiverClientSpi.getPipeline().getFirstNode()); - } + sendWriteChunkRequest(blockId, writeChunkRequest, + xceiverClientSpi.getPipeline().getFirstNode()); + } private void sendWriteChunkRequest(DatanodeBlockID blockId, - WriteChunkRequestProto.Builder writeChunkRequest, - DatanodeDetails datanodeDetails) throws Exception { + WriteChunkRequestProto.Builder writeChunkRequest, + DatanodeDetails datanodeDetails) throws Exception { String id = datanodeDetails.getUuidString(); ContainerCommandRequestProto.Builder builder = From 8b0870365d8a1fa02ba4adb8abaf05a1d50d80b2 Mon Sep 17 00:00:00 2001 From: sadanand48 Date: Thu, 24 Dec 2020 02:34:07 +0530 Subject: [PATCH 3/5] addressed comments --- .../ozone/freon/DatanodeChunkGenerator.java | 132 +++++++++--------- 1 file changed, 68 insertions(+), 64 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java index ce7208eca9e6..c7faf2e72583 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java @@ -22,7 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -87,7 +87,8 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements defaultValue = "") private String datanodes; - private XceiverClientSpi xceiverClientSpi; + private XceiverClientManager xceiverClientManager; + private List xceiverClients; private Timer timer; @@ -100,16 +101,18 @@ public Void call() throws Exception { OzoneConfiguration ozoneConf = createOzoneConfiguration(); + xceiverClientManager = + new XceiverClientManager(ozoneConf); if (OzoneSecurityUtil.isSecurityEnabled(ozoneConf)) { throw new IllegalArgumentException( "Datanode chunk generator is not supported in secure environment"); } - List pipelines = Arrays.asList(pipelineIds.split(",")); + List pipelinesFromCmd = Arrays.asList(pipelineIds.split(",")); List datanodeHosts = Arrays.asList(this.datanodes.split(",")); - pipelines = getPipelinesFromDatanodes(ozoneConf, pipelines, datanodeHosts); + List pipelines; try (StorageContainerLocationProtocol scmLocationClient = createStorageContainerLocationClient(ozoneConf)) { @@ -124,41 +127,65 @@ public Void call() throws Exception { .orElseThrow(() -> new IllegalArgumentException( "Pipeline ID is NOT defined, and no pipeline " + "has been found with factor=THREE")); + XceiverClientSpi xceiverClientSpi = xceiverClientManager + .acquireClient(temp); + xceiverClients = new ArrayList<>(); + xceiverClients.add(xceiverClientSpi); LOG.info("Using pipeline {}", temp.getId()); - runTest(ozoneConf, temp); + runTest(xceiverClientSpi); } else { - List futures = new ArrayList<>(); - for(String pipelineId:pipelines){ - futures.add(CompletableFuture.runAsync(()->writeOnPipeline( - ozoneConf, pipelinesFromSCM, pipelineId))); + pipelines = new ArrayList<>(); + for(String pipelineId:pipelinesFromCmd){ + List tempPipelines = pipelinesFromSCM.stream() + .filter((p -> p.getId().toString() + .equals("PipelineID=" + pipelineId) + || pipelineContainsDatanode(p, datanodeHosts))) + .collect(Collectors.toList()); + for (Pipeline p:tempPipelines){ + // avoid duplicates + if (!pipelines.contains(p)){ + pipelines.add(p); + } + } + } + if (pipelines.isEmpty()){ + throw new IllegalArgumentException( + "Coudln't find the any/the selected pipeline"); + } else { + writeOnPipeline( + ozoneConf, pipelines); } - CompletableFuture.allOf(futures.toArray( - new CompletableFuture[pipelines.size()])) - .join(); } } finally { - if (xceiverClientSpi != null) { - xceiverClientSpi.close(); + for (XceiverClientSpi xceiverClientSpi : xceiverClients) { + if (xceiverClientSpi != null) { + xceiverClientSpi.close(); + } } } return null; } + private boolean pipelineContainsDatanode(Pipeline p, + List datanodeHosts) { + for (DatanodeDetails dn:p.getNodes()){ + if (datanodeHosts.contains(dn.getHostName())){ + return true; + } + } + return false; + } + private void writeOnPipeline(OzoneConfiguration ozoneConf, - List pipelinesFromSCM, String pipelineId) { - Pipeline temp; - init(); - LOG.info("Writing block for pipeline:" + pipelineId); - temp = pipelinesFromSCM.stream() - .filter(p -> p.getId().toString().equals("PipelineID=" + pipelineId)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException( - "Pipeline ID is defined, but there is no such pipeline: " - + pipelineId)); - try { - runTest(ozoneConf, temp); - } catch (IOException e) { - LOG.error("Could not write chunks"); + List pipelines) throws IOException { + LOG.info("Inside write Pipeline and pipeline size" + pipelines.size()); + xceiverClients = new ArrayList<>(); + for (Pipeline p: pipelines){ + init(); + LOG.info("run test on pipeline" + p.getId().toString()); + XceiverClientSpi clientSpi = xceiverClientManager.acquireClient(p); + xceiverClients.add(clientSpi); + runTest(clientSpi); } } @@ -166,49 +193,24 @@ private boolean arePipelinesOrDatanodesProvided() { return !(pipelineIds.equals("") && datanodes.equals("")); } - private List getPipelinesFromDatanodes(OzoneConfiguration ozoneConf, - List pipelines, List dns) throws IOException { - if(!datanodes.equals("")) { - try (StorageContainerLocationProtocol scmLocationClient = - createStorageContainerLocationClient(ozoneConf)) { - List pipelineList = scmLocationClient.listPipelines(); - pipelines = new ArrayList(); - for (Pipeline p : pipelineList) { - for (DatanodeDetails dn : p.getNodes()) { - if (dns.contains(dn.getHostName())) { - if(!pipelines.contains(p.getId().toString().substring(11))) { - pipelines.add(p.getId().toString().substring(11)); - } - } - } - LOG.debug("Pipeline list size:" + pipelines.size()); - } - } - } - return pipelines; - } - private void runTest(OzoneConfiguration ozoneConf, Pipeline pipeline) + private void runTest(XceiverClientSpi clientSpi) throws IOException { - try (XceiverClientManager xceiverClientManager = - new XceiverClientManager(ozoneConf)) { - xceiverClientSpi = xceiverClientManager.acquireClient(pipeline); - timer = getMetrics().timer("chunk-write"); + timer = getMetrics().timer("chunk-write"); - byte[] data = RandomStringUtils.randomAscii(chunkSize) - .getBytes(StandardCharsets.UTF_8); + byte[] data = RandomStringUtils.randomAscii(chunkSize) + .getBytes(StandardCharsets.UTF_8); - dataToWrite = ByteString.copyFrom(data); + dataToWrite = ByteString.copyFrom(data); - Checksum checksum = new Checksum(ChecksumType.CRC32, chunkSize); - checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage(); + Checksum checksum = new Checksum(ChecksumType.CRC32, chunkSize); + checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage(); - runTests(this::writeChunk); - } + runTests(stepNo -> writeChunk(stepNo, clientSpi)); } - private void writeChunk(long stepNo) + private void writeChunk(long stepNo, XceiverClientSpi clientSpi) throws Exception { //Always use this fake blockid. @@ -233,13 +235,15 @@ private void writeChunk(long stepNo) .setData(dataToWrite); sendWriteChunkRequest(blockId, writeChunkRequest, - xceiverClientSpi.getPipeline().getFirstNode()); + clientSpi); } private void sendWriteChunkRequest(DatanodeBlockID blockId, WriteChunkRequestProto.Builder writeChunkRequest, - DatanodeDetails datanodeDetails) throws Exception { + XceiverClientSpi xceiverClientSpi) throws Exception { + DatanodeDetails datanodeDetails = xceiverClientSpi. + getPipeline().getFirstNode(); String id = datanodeDetails.getUuidString(); ContainerCommandRequestProto.Builder builder = From 9e084ee6c4e1bf32a0ff03a4fb18ffcbd2b3eabf Mon Sep 17 00:00:00 2001 From: sadanand48 Date: Thu, 7 Jan 2021 18:25:40 +0530 Subject: [PATCH 4/5] addressed comments --- .../ozone/freon/DatanodeChunkGenerator.java | 58 ++++++++----------- 1 file changed, 24 insertions(+), 34 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java index c7faf2e72583..97a2d71ab4a5 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java @@ -18,9 +18,11 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Set; +import java.util.List; import java.util.ArrayList; +import java.util.HashSet; import java.util.Arrays; -import java.util.List; import java.util.concurrent.Callable; import java.util.stream.Collectors; @@ -112,48 +114,47 @@ public Void call() throws Exception { List datanodeHosts = Arrays.asList(this.datanodes.split(",")); - List pipelines; + Set pipelines; try (StorageContainerLocationProtocol scmLocationClient = createStorageContainerLocationClient(ozoneConf)) { List pipelinesFromSCM = scmLocationClient.listPipelines(); - Pipeline temp; + Pipeline firstPipeline; + init(); if (!arePipelinesOrDatanodesProvided()) { //default behaviour if no arguments provided - init(); - temp = pipelinesFromSCM.stream() + firstPipeline = pipelinesFromSCM.stream() .filter(p -> p.getFactor() == ReplicationFactor.THREE) .findFirst() .orElseThrow(() -> new IllegalArgumentException( "Pipeline ID is NOT defined, and no pipeline " + "has been found with factor=THREE")); XceiverClientSpi xceiverClientSpi = xceiverClientManager - .acquireClient(temp); + .acquireClient(firstPipeline); xceiverClients = new ArrayList<>(); xceiverClients.add(xceiverClientSpi); - LOG.info("Using pipeline {}", temp.getId()); - runTest(xceiverClientSpi); + LOG.info("Using pipeline {}", firstPipeline.getId()); + runTest(); } else { - pipelines = new ArrayList<>(); + xceiverClients = new ArrayList<>(); + pipelines = new HashSet<>(); for(String pipelineId:pipelinesFromCmd){ - List tempPipelines = pipelinesFromSCM.stream() + List selectedPipelines = pipelinesFromSCM.stream() .filter((p -> p.getId().toString() .equals("PipelineID=" + pipelineId) || pipelineContainsDatanode(p, datanodeHosts))) .collect(Collectors.toList()); - for (Pipeline p:tempPipelines){ - // avoid duplicates - if (!pipelines.contains(p)){ - pipelines.add(p); - } - } + pipelines.addAll(selectedPipelines); + } + for (Pipeline p:pipelines){ + LOG.info("Writing to pipeline: " + p.getId()); + xceiverClients.add(xceiverClientManager.acquireClient(p)); } if (pipelines.isEmpty()){ throw new IllegalArgumentException( "Coudln't find the any/the selected pipeline"); } else { - writeOnPipeline( - ozoneConf, pipelines); + runTest(); } } } finally { @@ -176,25 +177,12 @@ private boolean pipelineContainsDatanode(Pipeline p, return false; } - private void writeOnPipeline(OzoneConfiguration ozoneConf, - List pipelines) throws IOException { - LOG.info("Inside write Pipeline and pipeline size" + pipelines.size()); - xceiverClients = new ArrayList<>(); - for (Pipeline p: pipelines){ - init(); - LOG.info("run test on pipeline" + p.getId().toString()); - XceiverClientSpi clientSpi = xceiverClientManager.acquireClient(p); - xceiverClients.add(clientSpi); - runTest(clientSpi); - } - } - private boolean arePipelinesOrDatanodesProvided() { return !(pipelineIds.equals("") && datanodes.equals("")); } - private void runTest(XceiverClientSpi clientSpi) + private void runTest() throws IOException { timer = getMetrics().timer("chunk-write"); @@ -207,10 +195,10 @@ private void runTest(XceiverClientSpi clientSpi) Checksum checksum = new Checksum(ChecksumType.CRC32, chunkSize); checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage(); - runTests(stepNo -> writeChunk(stepNo, clientSpi)); + runTests(this::writeChunk); } - private void writeChunk(long stepNo, XceiverClientSpi clientSpi) + private void writeChunk(long stepNo) throws Exception { //Always use this fake blockid. @@ -234,6 +222,8 @@ private void writeChunk(long stepNo, XceiverClientSpi clientSpi) .setChunkData(chunkInfo) .setData(dataToWrite); + XceiverClientSpi clientSpi = xceiverClients.get( + (int) (stepNo%(xceiverClients.size()))); sendWriteChunkRequest(blockId, writeChunkRequest, clientSpi); From 8a0d7970269d8300f86f11ee9dd318411b1482e5 Mon Sep 17 00:00:00 2001 From: sadanand48 Date: Tue, 12 Jan 2021 10:28:42 +0530 Subject: [PATCH 5/5] fixed typos & addressed comment --- .../hadoop/ozone/freon/DatanodeChunkGenerator.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java index 97a2d71ab4a5..c0c58d03595f 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java @@ -85,7 +85,9 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements private String pipelineIds; @Option(names = {"-d", "--datanodes"}, - description = "Datanodes to use. ", + description = "Datanodes to use." + + " Test will write to all the existing pipelines " + + "which this datanode is member of.", defaultValue = "") private String datanodes; @@ -133,8 +135,6 @@ public Void call() throws Exception { .acquireClient(firstPipeline); xceiverClients = new ArrayList<>(); xceiverClients.add(xceiverClientSpi); - LOG.info("Using pipeline {}", firstPipeline.getId()); - runTest(); } else { xceiverClients = new ArrayList<>(); pipelines = new HashSet<>(); @@ -152,11 +152,10 @@ public Void call() throws Exception { } if (pipelines.isEmpty()){ throw new IllegalArgumentException( - "Coudln't find the any/the selected pipeline"); - } else { - runTest(); + "Couldn't find the any/the selected pipeline"); } } + runTest(); } finally { for (XceiverClientSpi xceiverClientSpi : xceiverClients) { if (xceiverClientSpi != null) {