diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 72ebfd2ea70e..f13f9d278381 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -24,6 +24,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -54,6 +55,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.utils.BackgroundService; @@ -109,6 +111,7 @@ import com.google.common.base.Strings; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; + import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT; @@ -699,7 +702,7 @@ public OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress) refreshPipeline(value); if (args.getSortDatanodes()) { - sortDatanodeInPipeline(value, clientAddress); + sortDatanodes(clientAddress, value); } return value; } @@ -1827,7 +1830,7 @@ private OzoneFileStatus getOzoneFileStatus(String volumeName, // Please refer this jira for more details. refreshPipeline(fileKeyInfo); if (sortDatanodes) { - sortDatanodeInPipeline(fileKeyInfo, clientAddress); + sortDatanodes(clientAddress, fileKeyInfo); } return new OzoneFileStatus(fileKeyInfo, scmBlockSize, false); } @@ -2212,9 +2215,7 @@ public List listStatus(OmKeyArgs args, boolean recursive, refreshPipeline(keyInfoList); if (args.getSortDatanodes()) { - for (OzoneFileStatus fileStatus : fileStatusList) { - sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress); - } + sortDatanodes(clientAddress, keyInfoList.toArray(new OmKeyInfo[0])); } return fileStatusList; @@ -2309,38 +2310,67 @@ private FileEncryptionInfo getFileEncryptionInfo(OmBucketInfo bucketInfo) return encInfo; } - private void sortDatanodeInPipeline(OmKeyInfo keyInfo, String clientMachine) { - if (keyInfo != null && clientMachine != null && !clientMachine.isEmpty()) { - for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) { - key.getLocationList().forEach(k -> { - List nodes = k.getPipeline().getNodes(); - if (nodes == null || nodes.isEmpty()) { - LOG.warn("Datanodes for pipeline {} is empty", - k.getPipeline().getId().toString()); - return; - } - List nodeList = new ArrayList<>(); - nodes.stream().forEach(node -> - nodeList.add(node.getUuidString())); - try { - List sortedNodes = scmClient.getBlockClient() - .sortDatanodes(nodeList, clientMachine); - k.getPipeline().setNodesInOrder(sortedNodes); - if (LOG.isDebugEnabled()) { - LOG.debug("Sort datanodes {} for client {}, return {}", nodes, - clientMachine, sortedNodes); + @VisibleForTesting + void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) { + if (keyInfos != null && clientMachine != null && !clientMachine.isEmpty()) { + Map, List> sortedPipelines = new HashMap<>(); + for (OmKeyInfo keyInfo : keyInfos) { + OmKeyLocationInfoGroup key = keyInfo.getLatestVersionLocations(); + if (key == null) { + LOG.warn("No location for key {}", keyInfo); + continue; + } + for (OmKeyLocationInfo k : key.getLocationList()) { + Pipeline pipeline = k.getPipeline(); + List nodes = pipeline.getNodes(); + List uuidList = toNodeUuid(nodes); + Set uuidSet = new HashSet<>(uuidList); + List sortedNodes = sortedPipelines.get(uuidSet); + if (sortedNodes == null) { + if (nodes.isEmpty()) { + LOG.warn("No datanodes in pipeline {}", pipeline.getId()); + continue; } - } catch (IOException e) { - LOG.warn("Unable to sort datanodes based on distance to " + - "client, volume=" + keyInfo.getVolumeName() + - ", bucket=" + keyInfo.getBucketName() + - ", key=" + keyInfo.getKeyName() + - ", client=" + clientMachine + - ", datanodes=" + nodes.toString() + - ", exception=" + e.getMessage()); + sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo, + uuidList); + if (sortedNodes != null) { + sortedPipelines.put(uuidSet, sortedNodes); + } + } else if (LOG.isDebugEnabled()) { + LOG.debug("Found sorted datanodes for pipeline {} and client {} " + + "in cache", pipeline.getId(), clientMachine); } - }); + pipeline.setNodesInOrder(sortedNodes); + } + } + } + } + + private List sortDatanodes(String clientMachine, + List nodes, OmKeyInfo keyInfo, List nodeList) { + List sortedNodes = null; + try { + sortedNodes = scmClient.getBlockClient() + .sortDatanodes(nodeList, clientMachine); + if (LOG.isDebugEnabled()) { + LOG.debug("Sorted datanodes {} for client {}, result: {}", nodes, + clientMachine, sortedNodes); } + } catch (IOException e) { + LOG.warn("Unable to sort datanodes based on distance to client, " + + " volume={}, bucket={}, key={}, client={}, datanodes={}, " + + " exception={}", + keyInfo.getVolumeName(), keyInfo.getBucketName(), + keyInfo.getKeyName(), clientMachine, nodeList, e.getMessage()); + } + return sortedNodes; + } + + private static List toNodeUuid(Collection nodes) { + List nodeSet = new ArrayList<>(nodes.size()); + for (DatanodeDetails node : nodes) { + nodeSet.add(node.getUuidString()); } + return nodeSet; } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java index 6046ac9a5caf..a947b3550ced 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java @@ -76,8 +76,12 @@ import org.junit.Test; import org.mockito.Mockito; +import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static java.util.Comparator.comparing; +import static java.util.stream.Collectors.toList; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Unit test key manager. @@ -91,6 +95,7 @@ public class TestKeyManagerUnit { private Instant startDate; private File testDir; + private ScmBlockLocationProtocol blockClient; @Before public void setup() throws IOException { @@ -100,14 +105,10 @@ public void setup() throws IOException { testDir.toString()); metadataManager = new OmMetadataManagerImpl(configuration); containerClient = Mockito.mock(StorageContainerLocationProtocol.class); + blockClient = Mockito.mock(ScmBlockLocationProtocol.class); keyManager = new KeyManagerImpl( - Mockito.mock(ScmBlockLocationProtocol.class), - containerClient, - metadataManager, - configuration, - "omtest", - Mockito.mock(OzoneBlockTokenSecretManager.class) - ); + blockClient, containerClient, metadataManager, configuration, + "omtest", Mockito.mock(OzoneBlockTokenSecretManager.class)); startDate = Instant.now(); } @@ -372,10 +373,10 @@ public void testLookupFileWithDnFailure() throws IOException { List cps = new ArrayList<>(); ContainerInfo ci = Mockito.mock(ContainerInfo.class); - Mockito.when(ci.getContainerID()).thenReturn(1L); + when(ci.getContainerID()).thenReturn(1L); cps.add(new ContainerWithPipeline(ci, pipelineTwo)); - Mockito.when(containerClient.getContainerWithPipelineBatch(containerIDs)) + when(containerClient.getContainerWithPipelineBatch(containerIDs)) .thenReturn(cps); final OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder() @@ -443,6 +444,7 @@ public void listStatus() throws Exception { String volume = "vol"; String bucket = "bucket"; String keyPrefix = "key"; + String client = "client.host"; TestOMRequestUtils.addVolumeToDB(volume, OzoneConsts.OZONE, metadataManager); @@ -450,17 +452,12 @@ public void listStatus() throws Exception { TestOMRequestUtils.addBucketToDB(volume, bucket, metadataManager); final Pipeline pipeline = MockPipeline.createPipeline(3); - - OmKeyInfo.Builder keyInfoBuilder = new OmKeyInfo.Builder() - .setVolumeName(volume) - .setBucketName(bucket) - .setCreationTime(Time.now()) - .setOmKeyLocationInfos(singletonList( - new OmKeyLocationInfoGroup(0, new ArrayList<>()))) - .setReplicationFactor(ReplicationFactor.THREE) - .setReplicationType(ReplicationType.RATIS); + final List nodes = pipeline.getNodes().stream() + .map(DatanodeDetails::getUuidString) + .collect(toList()); List containerIDs = new ArrayList<>(); + List containersWithPipeline = new ArrayList<>(); for (long i = 1; i <= 10; i++) { final OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder() .setBlockID(new BlockID(i, 1L)) @@ -469,9 +466,21 @@ public void listStatus() throws Exception { .setLength(256000) .build(); + ContainerInfo containerInfo = new ContainerInfo.Builder() + .setContainerID(i) + .build(); + containersWithPipeline.add( + new ContainerWithPipeline(containerInfo, pipeline)); containerIDs.add(i); - OmKeyInfo keyInfo = keyInfoBuilder + OmKeyInfo keyInfo = new OmKeyInfo.Builder() + .setVolumeName(volume) + .setBucketName(bucket) + .setCreationTime(Time.now()) + .setOmKeyLocationInfos(singletonList( + new OmKeyLocationInfoGroup(0, new ArrayList<>()))) + .setReplicationFactor(ReplicationFactor.THREE) + .setReplicationType(ReplicationType.RATIS) .setKeyName(keyPrefix + i) .setObjectID(i) .setUpdateID(i) @@ -480,15 +489,83 @@ public void listStatus() throws Exception { TestOMRequestUtils.addKeyToOM(metadataManager, keyInfo); } + when(containerClient.getContainerWithPipelineBatch(containerIDs)) + .thenReturn(containersWithPipeline); + OmKeyArgs.Builder builder = new OmKeyArgs.Builder() .setVolumeName(volume) .setBucketName(bucket) - .setKeyName(""); + .setKeyName("") + .setSortDatanodesInPipeline(true); List fileStatusList = - keyManager.listStatus(builder.build(), false, null, Long.MAX_VALUE); + keyManager.listStatus(builder.build(), false, + null, Long.MAX_VALUE, client); Assert.assertEquals(10, fileStatusList.size()); verify(containerClient).getContainerWithPipelineBatch(containerIDs); + verify(blockClient).sortDatanodes(nodes, client); + } + + @Test + public void sortDatanodes() throws Exception { + // GIVEN + String client = "anyhost"; + int pipelineCount = 3; + int keysPerPipeline = 5; + OmKeyInfo[] keyInfos = new OmKeyInfo[pipelineCount * keysPerPipeline]; + List> expectedSortDatanodesInvocations = new ArrayList<>(); + Map> expectedSortedNodes = new HashMap<>(); + int ki = 0; + for (int p = 0; p < pipelineCount; p++) { + final Pipeline pipeline = MockPipeline.createPipeline(3); + final List nodes = pipeline.getNodes().stream() + .map(DatanodeDetails::getUuidString) + .collect(toList()); + expectedSortDatanodesInvocations.add(nodes); + final List sortedNodes = pipeline.getNodes().stream() + .sorted(comparing(DatanodeDetails::getUuidString)) + .collect(toList()); + expectedSortedNodes.put(pipeline, sortedNodes); + + when(blockClient.sortDatanodes(nodes, client)) + .thenReturn(sortedNodes); + + for (int i = 1; i <= keysPerPipeline; i++) { + OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder() + .setBlockID(new BlockID(i, 1L)) + .setPipeline(pipeline) + .setOffset(0) + .setLength(256000) + .build(); + + OmKeyInfo keyInfo = new OmKeyInfo.Builder() + .setOmKeyLocationInfos(Arrays.asList( + new OmKeyLocationInfoGroup(0, emptyList()), + new OmKeyLocationInfoGroup(1, singletonList(keyLocationInfo)))) + .build(); + keyInfos[ki++] = keyInfo; + } + } + + // WHEN + keyManager.sortDatanodes(client, keyInfos); + + // THEN + // verify all key info locations got updated + for (OmKeyInfo keyInfo : keyInfos) { + OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations(); + Assert.assertNotNull(locations); + for (OmKeyLocationInfo locationInfo : locations.getLocationList()) { + Pipeline pipeline = locationInfo.getPipeline(); + List expectedOrder = expectedSortedNodes.get(pipeline); + Assert.assertEquals(expectedOrder, pipeline.getNodesInOrder()); + } + } + + // expect one invocation per pipeline + for (List nodes : expectedSortDatanodesInvocations) { + verify(blockClient).sortDatanodes(nodes, client); + } } }