From 7a855e70268904f7e5c8cb61f206359c23f54165 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Wed, 18 Nov 2020 19:25:25 +0100 Subject: [PATCH 1/4] HDDS-4473. Reduce number of sortDatanodes RPC calls --- .../hadoop/ozone/om/KeyManagerImpl.java | 92 +++++++++----- .../hadoop/ozone/om/TestKeyManagerUnit.java | 119 ++++++++++++++---- 2 files changed, 156 insertions(+), 55 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 72ebfd2ea70e..d571dae25683 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -54,6 +54,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.utils.BackgroundService; @@ -109,6 +111,7 @@ import com.google.common.base.Strings; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; + import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT; @@ -699,7 +702,7 @@ public OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress) refreshPipeline(value); if (args.getSortDatanodes()) { - sortDatanodeInPipeline(value, clientAddress); + sortDatanodes(clientAddress, value); } return value; } @@ -1827,7 +1830,7 @@ private OzoneFileStatus getOzoneFileStatus(String volumeName, // Please refer this jira for more details. refreshPipeline(fileKeyInfo); if (sortDatanodes) { - sortDatanodeInPipeline(fileKeyInfo, clientAddress); + sortDatanodes(clientAddress, fileKeyInfo); } return new OzoneFileStatus(fileKeyInfo, scmBlockSize, false); } @@ -2212,9 +2215,7 @@ public List listStatus(OmKeyArgs args, boolean recursive, refreshPipeline(keyInfoList); if (args.getSortDatanodes()) { - for (OzoneFileStatus fileStatus : fileStatusList) { - sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress); - } + sortDatanodes(clientAddress, keyInfoList.toArray(new OmKeyInfo[0])); } return fileStatusList; @@ -2309,38 +2310,61 @@ private FileEncryptionInfo getFileEncryptionInfo(OmBucketInfo bucketInfo) return encInfo; } - private void sortDatanodeInPipeline(OmKeyInfo keyInfo, String clientMachine) { - if (keyInfo != null && clientMachine != null && !clientMachine.isEmpty()) { - for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) { - key.getLocationList().forEach(k -> { - List nodes = k.getPipeline().getNodes(); - if (nodes == null || nodes.isEmpty()) { - LOG.warn("Datanodes for pipeline {} is empty", - k.getPipeline().getId().toString()); - return; - } - List nodeList = new ArrayList<>(); - nodes.stream().forEach(node -> - nodeList.add(node.getUuidString())); - try { - List sortedNodes = scmClient.getBlockClient() - .sortDatanodes(nodeList, clientMachine); - k.getPipeline().setNodesInOrder(sortedNodes); - if (LOG.isDebugEnabled()) { - LOG.debug("Sort datanodes {} for client {}, return {}", nodes, - clientMachine, sortedNodes); + @VisibleForTesting + void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) { + if (keyInfos != null && clientMachine != null && !clientMachine.isEmpty()) { + Map> sortedPipelines = new HashMap<>(); + for (OmKeyInfo keyInfo : keyInfos) { + OmKeyLocationInfoGroup key = keyInfo.getLatestVersionLocations(); + if (key == null) { + LOG.warn("No location for key {}", keyInfo); + continue; + } + for (OmKeyLocationInfo k : key.getLocationList()) { + Pipeline pipeline = k.getPipeline(); + PipelineID pipelineId = pipeline.getId(); + List sortedNodes = sortedPipelines.get(pipelineId); + if (sortedNodes == null) { + List nodes = pipeline.getNodes(); + if (nodes.isEmpty()) { + LOG.warn("No datanodes in pipeline {}", pipelineId); + continue; + } + sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo); + if (sortedNodes != null) { + sortedPipelines.put(pipelineId, sortedNodes); } - } catch (IOException e) { - LOG.warn("Unable to sort datanodes based on distance to " + - "client, volume=" + keyInfo.getVolumeName() + - ", bucket=" + keyInfo.getBucketName() + - ", key=" + keyInfo.getKeyName() + - ", client=" + clientMachine + - ", datanodes=" + nodes.toString() + - ", exception=" + e.getMessage()); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Found sorted datanodes for pipeline {} and client {} " + + "in cache", pipelineId, clientMachine); } - }); + pipeline.setNodesInOrder(sortedNodes); + } } } } + + private List sortDatanodes(String clientMachine, + List nodes, OmKeyInfo keyInfo) { + List nodeList = new ArrayList<>(nodes.size()); + for (DatanodeDetails node : nodes) { + nodeList.add(node.getUuidString()); + } + List sortedNodes = null; + try { + sortedNodes = scmClient.getBlockClient() + .sortDatanodes(nodeList, clientMachine); + if (LOG.isDebugEnabled()) { + LOG.debug("Sorted datanodes {} for client {}, result: {}", nodes, + clientMachine, sortedNodes); + } + } catch (IOException e) { + LOG.warn("Unable to sort datanodes based on distance to client, " + + " volume={}, bucket={}, key={}, client={}, datanodes={}, " + + " exception={}", + keyInfo.getVolumeName(), keyInfo.getBucketName(), + keyInfo.getKeyName(), clientMachine, nodes, e.getMessage()); + } + return sortedNodes; + } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java index 6046ac9a5caf..a947b3550ced 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java @@ -76,8 +76,12 @@ import org.junit.Test; import org.mockito.Mockito; +import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static java.util.Comparator.comparing; +import static java.util.stream.Collectors.toList; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Unit test key manager. @@ -91,6 +95,7 @@ public class TestKeyManagerUnit { private Instant startDate; private File testDir; + private ScmBlockLocationProtocol blockClient; @Before public void setup() throws IOException { @@ -100,14 +105,10 @@ public void setup() throws IOException { testDir.toString()); metadataManager = new OmMetadataManagerImpl(configuration); containerClient = Mockito.mock(StorageContainerLocationProtocol.class); + blockClient = Mockito.mock(ScmBlockLocationProtocol.class); keyManager = new KeyManagerImpl( - Mockito.mock(ScmBlockLocationProtocol.class), - containerClient, - metadataManager, - configuration, - "omtest", - Mockito.mock(OzoneBlockTokenSecretManager.class) - ); + blockClient, containerClient, metadataManager, configuration, + "omtest", Mockito.mock(OzoneBlockTokenSecretManager.class)); startDate = Instant.now(); } @@ -372,10 +373,10 @@ public void testLookupFileWithDnFailure() throws IOException { List cps = new ArrayList<>(); ContainerInfo ci = Mockito.mock(ContainerInfo.class); - Mockito.when(ci.getContainerID()).thenReturn(1L); + when(ci.getContainerID()).thenReturn(1L); cps.add(new ContainerWithPipeline(ci, pipelineTwo)); - Mockito.when(containerClient.getContainerWithPipelineBatch(containerIDs)) + when(containerClient.getContainerWithPipelineBatch(containerIDs)) .thenReturn(cps); final OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder() @@ -443,6 +444,7 @@ public void listStatus() throws Exception { String volume = "vol"; String bucket = "bucket"; String keyPrefix = "key"; + String client = "client.host"; TestOMRequestUtils.addVolumeToDB(volume, OzoneConsts.OZONE, metadataManager); @@ -450,17 +452,12 @@ public void listStatus() throws Exception { TestOMRequestUtils.addBucketToDB(volume, bucket, metadataManager); final Pipeline pipeline = MockPipeline.createPipeline(3); - - OmKeyInfo.Builder keyInfoBuilder = new OmKeyInfo.Builder() - .setVolumeName(volume) - .setBucketName(bucket) - .setCreationTime(Time.now()) - .setOmKeyLocationInfos(singletonList( - new OmKeyLocationInfoGroup(0, new ArrayList<>()))) - .setReplicationFactor(ReplicationFactor.THREE) - .setReplicationType(ReplicationType.RATIS); + final List nodes = pipeline.getNodes().stream() + .map(DatanodeDetails::getUuidString) + .collect(toList()); List containerIDs = new ArrayList<>(); + List containersWithPipeline = new ArrayList<>(); for (long i = 1; i <= 10; i++) { final OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder() .setBlockID(new BlockID(i, 1L)) @@ -469,9 +466,21 @@ public void listStatus() throws Exception { .setLength(256000) .build(); + ContainerInfo containerInfo = new ContainerInfo.Builder() + .setContainerID(i) + .build(); + containersWithPipeline.add( + new ContainerWithPipeline(containerInfo, pipeline)); containerIDs.add(i); - OmKeyInfo keyInfo = keyInfoBuilder + OmKeyInfo keyInfo = new OmKeyInfo.Builder() + .setVolumeName(volume) + .setBucketName(bucket) + .setCreationTime(Time.now()) + .setOmKeyLocationInfos(singletonList( + new OmKeyLocationInfoGroup(0, new ArrayList<>()))) + .setReplicationFactor(ReplicationFactor.THREE) + .setReplicationType(ReplicationType.RATIS) .setKeyName(keyPrefix + i) .setObjectID(i) .setUpdateID(i) @@ -480,15 +489,83 @@ public void listStatus() throws Exception { TestOMRequestUtils.addKeyToOM(metadataManager, keyInfo); } + when(containerClient.getContainerWithPipelineBatch(containerIDs)) + .thenReturn(containersWithPipeline); + OmKeyArgs.Builder builder = new OmKeyArgs.Builder() .setVolumeName(volume) .setBucketName(bucket) - .setKeyName(""); + .setKeyName("") + .setSortDatanodesInPipeline(true); List fileStatusList = - keyManager.listStatus(builder.build(), false, null, Long.MAX_VALUE); + keyManager.listStatus(builder.build(), false, + null, Long.MAX_VALUE, client); Assert.assertEquals(10, fileStatusList.size()); verify(containerClient).getContainerWithPipelineBatch(containerIDs); + verify(blockClient).sortDatanodes(nodes, client); + } + + @Test + public void sortDatanodes() throws Exception { + // GIVEN + String client = "anyhost"; + int pipelineCount = 3; + int keysPerPipeline = 5; + OmKeyInfo[] keyInfos = new OmKeyInfo[pipelineCount * keysPerPipeline]; + List> expectedSortDatanodesInvocations = new ArrayList<>(); + Map> expectedSortedNodes = new HashMap<>(); + int ki = 0; + for (int p = 0; p < pipelineCount; p++) { + final Pipeline pipeline = MockPipeline.createPipeline(3); + final List nodes = pipeline.getNodes().stream() + .map(DatanodeDetails::getUuidString) + .collect(toList()); + expectedSortDatanodesInvocations.add(nodes); + final List sortedNodes = pipeline.getNodes().stream() + .sorted(comparing(DatanodeDetails::getUuidString)) + .collect(toList()); + expectedSortedNodes.put(pipeline, sortedNodes); + + when(blockClient.sortDatanodes(nodes, client)) + .thenReturn(sortedNodes); + + for (int i = 1; i <= keysPerPipeline; i++) { + OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder() + .setBlockID(new BlockID(i, 1L)) + .setPipeline(pipeline) + .setOffset(0) + .setLength(256000) + .build(); + + OmKeyInfo keyInfo = new OmKeyInfo.Builder() + .setOmKeyLocationInfos(Arrays.asList( + new OmKeyLocationInfoGroup(0, emptyList()), + new OmKeyLocationInfoGroup(1, singletonList(keyLocationInfo)))) + .build(); + keyInfos[ki++] = keyInfo; + } + } + + // WHEN + keyManager.sortDatanodes(client, keyInfos); + + // THEN + // verify all key info locations got updated + for (OmKeyInfo keyInfo : keyInfos) { + OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations(); + Assert.assertNotNull(locations); + for (OmKeyLocationInfo locationInfo : locations.getLocationList()) { + Pipeline pipeline = locationInfo.getPipeline(); + List expectedOrder = expectedSortedNodes.get(pipeline); + Assert.assertEquals(expectedOrder, pipeline.getNodesInOrder()); + } + } + + // expect one invocation per pipeline + for (List nodes : expectedSortDatanodesInvocations) { + verify(blockClient).sortDatanodes(nodes, client); + } } } From cfd96934d13d53d987fa4097b549254b3a6244e3 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 30 Nov 2020 20:18:04 +0100 Subject: [PATCH 2/4] Use node ID set as key for sorted pipeline cache --- .../hadoop/ozone/om/KeyManagerImpl.java | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index d571dae25683..6fdb944c72ba 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -24,6 +24,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -55,7 +56,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.utils.BackgroundService; @@ -2313,7 +2313,7 @@ private FileEncryptionInfo getFileEncryptionInfo(OmBucketInfo bucketInfo) @VisibleForTesting void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) { if (keyInfos != null && clientMachine != null && !clientMachine.isEmpty()) { - Map> sortedPipelines = new HashMap<>(); + Map, List> sortedPipelines = new HashMap<>(); for (OmKeyInfo keyInfo : keyInfos) { OmKeyLocationInfoGroup key = keyInfo.getLatestVersionLocations(); if (key == null) { @@ -2322,21 +2322,22 @@ void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) { } for (OmKeyLocationInfo k : key.getLocationList()) { Pipeline pipeline = k.getPipeline(); - PipelineID pipelineId = pipeline.getId(); - List sortedNodes = sortedPipelines.get(pipelineId); + List nodes = pipeline.getNodes(); + List uuidList = toNodeUuid(nodes); + Set uuidSet = new HashSet<>(uuidList); + List sortedNodes = sortedPipelines.get(uuidSet); if (sortedNodes == null) { - List nodes = pipeline.getNodes(); if (nodes.isEmpty()) { - LOG.warn("No datanodes in pipeline {}", pipelineId); + LOG.warn("No datanodes in pipeline {}", pipeline.getId()); continue; } - sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo); + sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo, uuidList); if (sortedNodes != null) { - sortedPipelines.put(pipelineId, sortedNodes); + sortedPipelines.put(uuidSet, sortedNodes); } } else if (LOG.isDebugEnabled()) { LOG.debug("Found sorted datanodes for pipeline {} and client {} " - + "in cache", pipelineId, clientMachine); + + "in cache", pipeline.getId(), clientMachine); } pipeline.setNodesInOrder(sortedNodes); } @@ -2345,11 +2346,7 @@ void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) { } private List sortDatanodes(String clientMachine, - List nodes, OmKeyInfo keyInfo) { - List nodeList = new ArrayList<>(nodes.size()); - for (DatanodeDetails node : nodes) { - nodeList.add(node.getUuidString()); - } + List nodes, OmKeyInfo keyInfo, List nodeList) { List sortedNodes = null; try { sortedNodes = scmClient.getBlockClient() @@ -2363,8 +2360,16 @@ private List sortDatanodes(String clientMachine, + " volume={}, bucket={}, key={}, client={}, datanodes={}, " + " exception={}", keyInfo.getVolumeName(), keyInfo.getBucketName(), - keyInfo.getKeyName(), clientMachine, nodes, e.getMessage()); + keyInfo.getKeyName(), clientMachine, nodeList, e.getMessage()); } return sortedNodes; } + + private static List toNodeUuid(Collection nodes) { + List nodeSet = new ArrayList<>(nodes.size()); + for (DatanodeDetails node : nodes) { + nodeSet.add(node.getUuidString()); + } + return nodeSet; + } } From 457acb6974d48c780346ae4e857f2f7c2220029c Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 30 Nov 2020 22:52:48 +0100 Subject: [PATCH 3/4] fix checkstyle --- .../main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 6fdb944c72ba..f13f9d278381 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -2331,7 +2331,8 @@ void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) { LOG.warn("No datanodes in pipeline {}", pipeline.getId()); continue; } - sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo, uuidList); + sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo, + uuidList); if (sortedNodes != null) { sortedPipelines.put(uuidSet, sortedNodes); } From 102a57a9b7cd1b70d3e32ca93367398a33ec0592 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Tue, 1 Dec 2020 08:51:01 +0100 Subject: [PATCH 4/4] trigger new CI check