Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.utils.BackgroundService;
Expand Down Expand Up @@ -109,6 +111,7 @@
import com.google.common.base.Strings;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
Expand Down Expand Up @@ -699,7 +702,7 @@ public OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress)
refreshPipeline(value);

if (args.getSortDatanodes()) {
sortDatanodeInPipeline(value, clientAddress);
sortDatanodes(clientAddress, value);
}
return value;
}
Expand Down Expand Up @@ -1827,7 +1830,7 @@ private OzoneFileStatus getOzoneFileStatus(String volumeName,
// Please refer this jira for more details.
refreshPipeline(fileKeyInfo);
if (sortDatanodes) {
sortDatanodeInPipeline(fileKeyInfo, clientAddress);
sortDatanodes(clientAddress, fileKeyInfo);
}
return new OzoneFileStatus(fileKeyInfo, scmBlockSize, false);
}
Expand Down Expand Up @@ -2212,9 +2215,7 @@ public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
refreshPipeline(keyInfoList);

if (args.getSortDatanodes()) {
for (OzoneFileStatus fileStatus : fileStatusList) {
sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress);
}
sortDatanodes(clientAddress, keyInfoList.toArray(new OmKeyInfo[0]));
}

return fileStatusList;
Expand Down Expand Up @@ -2309,38 +2310,67 @@ private FileEncryptionInfo getFileEncryptionInfo(OmBucketInfo bucketInfo)
return encInfo;
}

private void sortDatanodeInPipeline(OmKeyInfo keyInfo, String clientMachine) {
if (keyInfo != null && clientMachine != null && !clientMachine.isEmpty()) {
for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) {
key.getLocationList().forEach(k -> {
List<DatanodeDetails> nodes = k.getPipeline().getNodes();
if (nodes == null || nodes.isEmpty()) {
LOG.warn("Datanodes for pipeline {} is empty",
k.getPipeline().getId().toString());
return;
}
List<String> nodeList = new ArrayList<>();
nodes.stream().forEach(node ->
nodeList.add(node.getUuidString()));
try {
List<DatanodeDetails> sortedNodes = scmClient.getBlockClient()
.sortDatanodes(nodeList, clientMachine);
k.getPipeline().setNodesInOrder(sortedNodes);
if (LOG.isDebugEnabled()) {
LOG.debug("Sort datanodes {} for client {}, return {}", nodes,
clientMachine, sortedNodes);
@VisibleForTesting
void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
if (keyInfos != null && clientMachine != null && !clientMachine.isEmpty()) {
Map<Set<String>, List<DatanodeDetails>> sortedPipelines = new HashMap<>();
for (OmKeyInfo keyInfo : keyInfos) {
OmKeyLocationInfoGroup key = keyInfo.getLatestVersionLocations();
if (key == null) {
LOG.warn("No location for key {}", keyInfo);
continue;
}
for (OmKeyLocationInfo k : key.getLocationList()) {
Pipeline pipeline = k.getPipeline();
List<DatanodeDetails> nodes = pipeline.getNodes();
List<String> uuidList = toNodeUuid(nodes);
Set<String> uuidSet = new HashSet<>(uuidList);
List<DatanodeDetails> sortedNodes = sortedPipelines.get(uuidSet);
if (sortedNodes == null) {
if (nodes.isEmpty()) {
LOG.warn("No datanodes in pipeline {}", pipeline.getId());
continue;
}
} catch (IOException e) {
LOG.warn("Unable to sort datanodes based on distance to " +
"client, volume=" + keyInfo.getVolumeName() +
", bucket=" + keyInfo.getBucketName() +
", key=" + keyInfo.getKeyName() +
", client=" + clientMachine +
", datanodes=" + nodes.toString() +
", exception=" + e.getMessage());
sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo,
uuidList);
if (sortedNodes != null) {
sortedPipelines.put(uuidSet, sortedNodes);
}
} else if (LOG.isDebugEnabled()) {
LOG.debug("Found sorted datanodes for pipeline {} and client {} "
+ "in cache", pipeline.getId(), clientMachine);
}
});
pipeline.setNodesInOrder(sortedNodes);
}
}
}
}

private List<DatanodeDetails> sortDatanodes(String clientMachine,
List<DatanodeDetails> nodes, OmKeyInfo keyInfo, List<String> nodeList) {
List<DatanodeDetails> sortedNodes = null;
try {
sortedNodes = scmClient.getBlockClient()
.sortDatanodes(nodeList, clientMachine);
if (LOG.isDebugEnabled()) {
LOG.debug("Sorted datanodes {} for client {}, result: {}", nodes,
clientMachine, sortedNodes);
}
} catch (IOException e) {
LOG.warn("Unable to sort datanodes based on distance to client, "
+ " volume={}, bucket={}, key={}, client={}, datanodes={}, "
+ " exception={}",
keyInfo.getVolumeName(), keyInfo.getBucketName(),
keyInfo.getKeyName(), clientMachine, nodeList, e.getMessage());
}
return sortedNodes;
}

private static List<String> toNodeUuid(Collection<DatanodeDetails> nodes) {
List<String> nodeSet = new ArrayList<>(nodes.size());
for (DatanodeDetails node : nodes) {
nodeSet.add(node.getUuidString());
}
return nodeSet;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@
import org.junit.Test;
import org.mockito.Mockito;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Unit test key manager.
Expand All @@ -91,6 +95,7 @@ public class TestKeyManagerUnit {

private Instant startDate;
private File testDir;
private ScmBlockLocationProtocol blockClient;

@Before
public void setup() throws IOException {
Expand All @@ -100,14 +105,10 @@ public void setup() throws IOException {
testDir.toString());
metadataManager = new OmMetadataManagerImpl(configuration);
containerClient = Mockito.mock(StorageContainerLocationProtocol.class);
blockClient = Mockito.mock(ScmBlockLocationProtocol.class);
keyManager = new KeyManagerImpl(
Mockito.mock(ScmBlockLocationProtocol.class),
containerClient,
metadataManager,
configuration,
"omtest",
Mockito.mock(OzoneBlockTokenSecretManager.class)
);
blockClient, containerClient, metadataManager, configuration,
"omtest", Mockito.mock(OzoneBlockTokenSecretManager.class));

startDate = Instant.now();
}
Expand Down Expand Up @@ -372,10 +373,10 @@ public void testLookupFileWithDnFailure() throws IOException {

List<ContainerWithPipeline> cps = new ArrayList<>();
ContainerInfo ci = Mockito.mock(ContainerInfo.class);
Mockito.when(ci.getContainerID()).thenReturn(1L);
when(ci.getContainerID()).thenReturn(1L);
cps.add(new ContainerWithPipeline(ci, pipelineTwo));

Mockito.when(containerClient.getContainerWithPipelineBatch(containerIDs))
when(containerClient.getContainerWithPipelineBatch(containerIDs))
.thenReturn(cps);

final OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
Expand Down Expand Up @@ -443,24 +444,20 @@ public void listStatus() throws Exception {
String volume = "vol";
String bucket = "bucket";
String keyPrefix = "key";
String client = "client.host";

TestOMRequestUtils.addVolumeToDB(volume, OzoneConsts.OZONE,
metadataManager);

TestOMRequestUtils.addBucketToDB(volume, bucket, metadataManager);

final Pipeline pipeline = MockPipeline.createPipeline(3);

OmKeyInfo.Builder keyInfoBuilder = new OmKeyInfo.Builder()
.setVolumeName(volume)
.setBucketName(bucket)
.setCreationTime(Time.now())
.setOmKeyLocationInfos(singletonList(
new OmKeyLocationInfoGroup(0, new ArrayList<>())))
.setReplicationFactor(ReplicationFactor.THREE)
.setReplicationType(ReplicationType.RATIS);
final List<String> nodes = pipeline.getNodes().stream()
.map(DatanodeDetails::getUuidString)
.collect(toList());

List<Long> containerIDs = new ArrayList<>();
List<ContainerWithPipeline> containersWithPipeline = new ArrayList<>();
for (long i = 1; i <= 10; i++) {
final OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(i, 1L))
Expand All @@ -469,9 +466,21 @@ public void listStatus() throws Exception {
.setLength(256000)
.build();

ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerID(i)
.build();
containersWithPipeline.add(
new ContainerWithPipeline(containerInfo, pipeline));
containerIDs.add(i);

OmKeyInfo keyInfo = keyInfoBuilder
OmKeyInfo keyInfo = new OmKeyInfo.Builder()
.setVolumeName(volume)
.setBucketName(bucket)
.setCreationTime(Time.now())
.setOmKeyLocationInfos(singletonList(
new OmKeyLocationInfoGroup(0, new ArrayList<>())))
.setReplicationFactor(ReplicationFactor.THREE)
.setReplicationType(ReplicationType.RATIS)
.setKeyName(keyPrefix + i)
.setObjectID(i)
.setUpdateID(i)
Expand All @@ -480,15 +489,83 @@ public void listStatus() throws Exception {
TestOMRequestUtils.addKeyToOM(metadataManager, keyInfo);
}

when(containerClient.getContainerWithPipelineBatch(containerIDs))
.thenReturn(containersWithPipeline);

OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
.setVolumeName(volume)
.setBucketName(bucket)
.setKeyName("");
.setKeyName("")
.setSortDatanodesInPipeline(true);
List<OzoneFileStatus> fileStatusList =
keyManager.listStatus(builder.build(), false, null, Long.MAX_VALUE);
keyManager.listStatus(builder.build(), false,
null, Long.MAX_VALUE, client);

Assert.assertEquals(10, fileStatusList.size());
verify(containerClient).getContainerWithPipelineBatch(containerIDs);
verify(blockClient).sortDatanodes(nodes, client);
}

@Test
public void sortDatanodes() throws Exception {
// GIVEN
String client = "anyhost";
int pipelineCount = 3;
int keysPerPipeline = 5;
OmKeyInfo[] keyInfos = new OmKeyInfo[pipelineCount * keysPerPipeline];
List<List<String>> expectedSortDatanodesInvocations = new ArrayList<>();
Map<Pipeline, List<DatanodeDetails>> expectedSortedNodes = new HashMap<>();
int ki = 0;
for (int p = 0; p < pipelineCount; p++) {
final Pipeline pipeline = MockPipeline.createPipeline(3);
final List<String> nodes = pipeline.getNodes().stream()
.map(DatanodeDetails::getUuidString)
.collect(toList());
expectedSortDatanodesInvocations.add(nodes);
final List<DatanodeDetails> sortedNodes = pipeline.getNodes().stream()
.sorted(comparing(DatanodeDetails::getUuidString))
.collect(toList());
expectedSortedNodes.put(pipeline, sortedNodes);

when(blockClient.sortDatanodes(nodes, client))
.thenReturn(sortedNodes);

for (int i = 1; i <= keysPerPipeline; i++) {
OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(i, 1L))
.setPipeline(pipeline)
.setOffset(0)
.setLength(256000)
.build();

OmKeyInfo keyInfo = new OmKeyInfo.Builder()
.setOmKeyLocationInfos(Arrays.asList(
new OmKeyLocationInfoGroup(0, emptyList()),
new OmKeyLocationInfoGroup(1, singletonList(keyLocationInfo))))
.build();
keyInfos[ki++] = keyInfo;
}
}

// WHEN
keyManager.sortDatanodes(client, keyInfos);

// THEN
// verify all key info locations got updated
for (OmKeyInfo keyInfo : keyInfos) {
OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations();
Assert.assertNotNull(locations);
for (OmKeyLocationInfo locationInfo : locations.getLocationList()) {
Pipeline pipeline = locationInfo.getPipeline();
List<DatanodeDetails> expectedOrder = expectedSortedNodes.get(pipeline);
Assert.assertEquals(expectedOrder, pipeline.getNodesInOrder());
}
}

// expect one invocation per pipeline
for (List<String> nodes : expectedSortDatanodesInvocations) {
verify(blockClient).sortDatanodes(nodes, client);
}
}

}