ozone.scm.ha.ratis.server.snapshot.creation.gap
1024
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmBlockLocationClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmBlockLocationClient.java
new file mode 100644
index 000000000000..5196841db4d6
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmBlockLocationClient.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.client;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION_DEFAULT;
+
+/**
+ * A client that runs a background thread to periodically check for and fetch
+ * the latest network topology schema file from SCM.
+ */
+public class ScmBlockLocationClient {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ScmBlockLocationClient.class);
+
+ private final ScmBlockLocationProtocol scmBlockLocationProtocol;
+ private final AtomicReference<String> cache = new AtomicReference<>();
+ private ScheduledExecutorService executorService;
+
+ public ScmBlockLocationClient(
+ ScmBlockLocationProtocol scmBlockLocationProtocol) {
+ this.scmBlockLocationProtocol = scmBlockLocationProtocol;
+ }
+
+ public String getTopologyInformation() {
+ return requireNonNull(cache.get(),
+ "ScmBlockLocationClient must have been initialized already.");
+ }
+
+ public void refetchTopologyInformation() {
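+ // Duration.ZERO makes the refresh deadline immediately due, so this call always refetches from SCM.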
+ checkAndRefresh(Duration.ZERO, Instant.now());
+ }
+
+ public void start(ConfigurationSource conf) throws IOException {
+ final String initialTopology =
+ scmBlockLocationProtocol.getTopologyInformation();
+ LOG.info("Initial topology information fetched from SCM: {}.",
+ initialTopology);
+ cache.set(initialTopology);
+ scheduleTopologyPoller(conf, Instant.now());
+ }
+
+ public void stop() {
+ if (executorService != null) {
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while shutting down executor service.", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private void scheduleTopologyPoller(ConfigurationSource conf,
+ Instant initialInvocation) {
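+ // The poller wakes up every check interval, and refetches the schema from
+ // SCM once the (longer) refresh duration measured from startup has passed.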
+ Duration refreshDuration = parseRefreshDuration(conf);
+ Instant nextRefresh = initialInvocation.plus(refreshDuration);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("NetworkTopologyPoller")
+ .setDaemon(true)
+ .build();
+ executorService = Executors.newScheduledThreadPool(1, threadFactory);
+ Duration interval = parseRefreshCheckDuration(conf);
+ Duration initialDelay = Duration.between(Instant.now(), nextRefresh);
+
+ LOG.info("Scheduling NetworkTopologyPoller with initial delay of {} " +
+ "and interval of {}", initialDelay, interval);
+ executorService.scheduleAtFixedRate(
+ () -> checkAndRefresh(refreshDuration, initialInvocation),
+ initialDelay.toMillis(), interval.toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public static Duration parseRefreshDuration(ConfigurationSource conf) {
+ long refreshDurationInMs = conf.getTimeDuration(
+ OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION,
+ OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ return Duration.ofMillis(refreshDurationInMs);
+ }
+
+ public static Duration parseRefreshCheckDuration(ConfigurationSource conf) {
+ long refreshCheckInMs = conf.getTimeDuration(
+ OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION,
+ OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ return Duration.ofMillis(refreshCheckInMs);
+ }
+
+ private synchronized void checkAndRefresh(Duration refreshDuration,
+ Instant initialInvocation) {
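+ // Only contact SCM once the refresh deadline has passed, and replace the
+ // cached value only when the schema file actually changed.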
+ String current = cache.get();
+ Instant nextRefresh = initialInvocation.plus(refreshDuration);
+ if (nextRefresh.isBefore(Instant.now())) {
+ try {
+ String newTopology = scmBlockLocationProtocol.getTopologyInformation();
+ if (!newTopology.equals(current)) {
+ cache.set(newTopology);
+ LOG.info("Updated network topology schema file fetched from " +
+ "SCM: {}.", newTopology);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ "Error fetching updated network topology schema file from SCM", e);
+ }
+ }
+ }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
new file mode 100644
index 000000000000..8dc9cb3cca2f
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains SCM client related classes.
+ */
+package org.apache.hadoop.hdds.scm.client;
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
index f9515987bf00..d01201302e53 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
@@ -114,4 +114,6 @@ List<AllocatedBlock> allocateBlock(long size, int numBlocks,
*/
List<DatanodeDetails> sortDatanodes(List<String> nodes,
String clientMachine) throws IOException;
+
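+ /**
+ * Returns the network topology schema file configured on SCM.
+ */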
+ String getTopologyInformation() throws IOException;
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index eb19e46b7ac1..5cd521fca646 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -38,6 +38,8 @@
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetTopologyInformationRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetTopologyInformationResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SortDatanodesRequestProto;
@@ -320,6 +322,23 @@ public List<DatanodeDetails> sortDatanodes(List<String> nodes,
return results;
}
+ @Override
+ public String getTopologyInformation() throws IOException {
+ GetTopologyInformationRequestProto request =
+ GetTopologyInformationRequestProto.newBuilder().build();
+ SCMBlockLocationRequest wrapper =
+ createSCMBlockRequest(Type.GetTopologyInformation)
+ .setGetTopologyInformationRequest(request)
+ .build();
+
+ final SCMBlockLocationResponse wrappedResponse =
+ handleError(submitRequest(wrapper));
+ GetTopologyInformationResponseProto resp =
+ wrappedResponse.getGetTopologyInformationResponse();
+
+ return resp.getSchemaFile();
+ }
+
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 37f1d4adbf43..32c58b133040 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -39,6 +39,7 @@ enum Type {
GetScmInfo = 13;
SortDatanodes = 14;
AddScm = 15;
+ GetTopologyInformation = 16;
}
message SCMBlockLocationRequest {
@@ -56,6 +57,7 @@ message SCMBlockLocationRequest {
optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest = 13;
optional SortDatanodesRequestProto sortDatanodesRequest = 14;
optional hadoop.hdds.AddScmRequestProto addScmRequestProto = 15;
+ optional GetTopologyInformationRequestProto getTopologyInformationRequest = 16;
}
message SCMBlockLocationResponse {
@@ -80,6 +82,7 @@ message SCMBlockLocationResponse {
optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse = 13;
optional SortDatanodesResponseProto sortDatanodesResponse = 14;
optional hadoop.hdds.AddScmResponseProto addScmResponse = 15;
+ optional GetTopologyInformationResponseProto getTopologyInformationResponse = 16;
}
/**
@@ -226,6 +229,13 @@ message SortDatanodesResponseProto{
repeated DatanodeDetailsProto node = 1;
}
+message GetTopologyInformationRequestProto {
+}
+
+message GetTopologyInformationResponseProto {
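+ // The network topology schema file currently configured on SCM.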
+ required string schemaFile = 1;
+}
+
/**
* Protocol used from OzoneManager to StorageContainerManager.
* See request and response messages for details of the RPC calls.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index 4ed5233840b1..b3db675180eb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetTopologyInformationResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesRequestProto;
@@ -159,6 +160,9 @@ private SCMBlockLocationResponse processMessage(
request.getSortDatanodesRequest(), request.getVersion()
));
break;
+ case GetTopologyInformation:
+ response.setGetTopologyInformationResponse(getTopologyInformation());
+ break;
default:
// Should never happen
throw new IOException("Unknown Operation " + request.getCmdType() +
@@ -275,4 +279,13 @@ public SortDatanodesResponseProto sortDatanodes(
throw new ServiceException(ex);
}
}
+
+ public GetTopologyInformationResponseProto getTopologyInformation()
+ throws IOException {
+ GetTopologyInformationResponseProto.Builder resp =
+ GetTopologyInformationResponseProto.newBuilder();
+ String schemaFile = impl.getTopologyInformation();
+ resp.setSchemaFile(schemaFile);
+ return resp.build();
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index d10c3824bb8d..602b1b531862 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
@@ -335,6 +336,14 @@ public boolean addSCM(AddSCMRequest request) throws IOException {
}
}
+ @Override
+ public String getTopologyInformation() {
+ String schemaFile = conf.get(
+ ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
+ ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT);
+ return schemaFile;
+ }
+
@Override
public List<DatanodeDetails> sortDatanodes(List<String> nodes,
String clientMachine) {
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 8c39228bb35d..3bcd4a0a1a88 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -259,6 +259,7 @@ public static boolean isReadOnly(
case TenantListUser:
case ListSnapshot:
case RefetchSecretKey:
+ case RefetchTopologyInformation:
case RangerBGSync:
// RangerBGSync is a read operation in the sense that it doesn't directly
// write to OM DB. And therefore it doesn't need a OMClientRequest.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index eddba4f96f48..ec650ca52558 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -1072,6 +1072,14 @@ void setTimes(OmKeyArgs keyArgs, long mtime, long atime)
throws IOException;
UUID refetchSecretKey() throws IOException;
+
+ /**
+ * Refetches the updated network topology schema file from SCM.
+ *
+ * @return the refetched network topology schema file from SCM.
+ * @throws IOException if the topology information cannot be fetched from SCM.
+ */
+ String refetchTopologyInformation() throws IOException;
/**
* Enter, leave, or get safe mode.
*
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 7133ae03e475..28e939200103 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -166,6 +166,8 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverTrashResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RefetchSecretKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RefetchSecretKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RefetchTopologyInformationRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RefetchTopologyInformationResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeysArgs;
@@ -1528,6 +1530,19 @@ public UUID refetchSecretKey() throws IOException {
return ProtobufUtils.fromProtobuf(resp.getId());
}
+ @Override
+ public String refetchTopologyInformation() throws IOException {
+ final RefetchTopologyInformationRequest.Builder requestBuilder =
+ RefetchTopologyInformationRequest.newBuilder();
+ final OMRequest omRequest = createOMRequest(Type.RefetchTopologyInformation)
+ .setRefetchTopologyInformationRequest(requestBuilder)
+ .build();
+ final OMResponse omResponse = submitRequest(omRequest);
+ final RefetchTopologyInformationResponse resp =
+ handleError(omResponse).getRefetchTopologyInformationResponse();
+ return resp.getUpdatedSchema();
+ }
+
/**
* Return the proxy object underlying this protocol translator.
*
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
index 6fa6b2ffd28f..5c161dd35fd3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.client.ScmBlockLocationClient;
import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.SecurityConfig;
@@ -41,6 +42,7 @@
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
@@ -291,6 +293,8 @@ public void testDelegationToken() throws Exception {
try {
// Start OM
om.setCertClient(new CertificateClientTestImpl(conf));
+ om.setScmBlockLocationClient(new ScmBlockLocationClient(
+ new ScmBlockLocationTestingClient(null, null, 0, conf)));
om.start();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String username = ugi.getUserName();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetTopologyInformation.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetTopologyInformation.java
new file mode 100644
index 000000000000..62a4336780a2
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetTopologyInformation.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.hdds.conf.DefaultConfigManager;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+import java.util.UUID;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION;
+
+/**
+ * Test fetching network topology information from SCM.
+ */
+public class TestGetTopologyInformation {
+ private static MiniOzoneCluster cluster = null;
+ private static OzoneConfiguration conf;
+ private static OzoneClient client;
+ private static ObjectStore store;
+ private static OzoneManagerProtocol writeClient;
+ private ClassLoader classLoader =
+ Thread.currentThread().getContextClassLoader();
+
+ @BeforeClass
+ public static void init() throws Exception {
+ conf = new OzoneConfiguration();
+ String clusterId = UUID.randomUUID().toString();
+ String scmId = UUID.randomUUID().toString();
+ String omId = UUID.randomUUID().toString();
+ conf.set(OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION, "15s");
+ conf.set(OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION, "2s");
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setClusterId(clusterId)
+ .setScmId(scmId)
+ .setOmId(omId)
+ .setNumOfOzoneManagers(3)
+ .setNumOfStorageContainerManagers(3)
+ .build();
+ cluster.waitForClusterToBeReady();
+ client = cluster.newClient();
+ store = client.getObjectStore();
+ writeClient = store.getClientProxy().getOzoneManagerClient();
+ }
+
+ @AfterClass
+ public static void stop() {
+ if (cluster != null) {
+ cluster.stop();
+ }
+ DefaultConfigManager.clearDefaultConfigs();
+ }
+
+ @Test
+ public void testGetTopologyInformation()
+ throws InterruptedException, IOException {
+ String expected =
+ conf.get(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE);
+ String actual = writeClient.refetchTopologyInformation();
+ Assertions.assertEquals(expected, actual);
+
+ testWithUpdatedTopologyInformation();
+ }
+
+ /**
+ * Modify the path to SCM's network topology schema file to test whether OM
+ * refetches the updated file within the specified refresh duration.
+ * @throws InterruptedException
+ */
+ public void testWithUpdatedTopologyInformation()
+ throws InterruptedException, IOException {
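+ // Sleep past the 15s refresh duration configured in init() so that OM's
+ // NetworkTopologyPoller picks up the new schema file from SCM.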
+ String filePath =
+ classLoader.getResource("./networkTopologyTestFiles/test-topology.xml")
+ .getPath();
+ conf.set(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, filePath);
+ Thread.sleep(30000);
+ String expected =
+ conf.get(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE);
+ String actual = writeClient.refetchTopologyInformation();
+ Assertions.assertEquals(expected, actual);
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index e437a0d9e686..996fe73ea315 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -50,10 +50,12 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.client.ScmBlockLocationClient;
import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
@@ -86,6 +88,7 @@
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -203,6 +206,7 @@ final class TestSecureOzoneCluster {
private File testUserKeytab;
private String testUserPrincipal;
private StorageContainerManager scm;
+ private ScmBlockLocationProtocol scmBlockClient;
private OzoneManager om;
private HddsProtos.OzoneManagerDetailsProto omInfo;
private String host;
@@ -259,6 +263,7 @@ public void init() {
clusterId = UUID.randomUUID().toString();
scmId = UUID.randomUUID().toString();
omId = UUID.randomUUID().toString();
+ scmBlockClient = new ScmBlockLocationTestingClient(null, null, 0, conf);
startMiniKdc();
setSecureConfig();
@@ -554,6 +559,7 @@ public void testAccessControlExceptionOnClient() throws Exception {
setupOm(conf);
try {
om.setCertClient(new CertificateClientTestImpl(conf));
+ om.setScmBlockLocationClient(new ScmBlockLocationClient(scmBlockClient));
om.start();
} catch (Exception ex) {
// Expects timeout failure from scmClient in om but om user login via
@@ -622,6 +628,7 @@ public void testDelegationTokenRenewal() throws Exception {
try {
om.setCertClient(new CertificateClientTestImpl(conf));
+ om.setScmBlockLocationClient(new ScmBlockLocationClient(scmBlockClient));
om.start();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -702,6 +709,7 @@ public void testGetSetRevokeS3Secret() throws Exception {
try {
// Start OM
om.setCertClient(new CertificateClientTestImpl(conf));
+ om.setScmBlockLocationClient(new ScmBlockLocationClient(scmBlockClient));
om.start();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String username = ugi.getUserName();
@@ -1128,6 +1136,7 @@ public void testCertificateRotationUnRecoverableFailure() throws Exception {
// create Ozone Manager instance, it will start the monitor task
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
om = OzoneManager.createOm(conf);
+ om.setScmBlockLocationClient(new ScmBlockLocationClient(scmBlockClient));
om.setCertClient(mockClient);
// check error message during renew
@@ -1162,6 +1171,7 @@ public void testDelegationTokenRenewCrossCertificateRenew() throws Exception {
String omCertId1 = omCert.getSerialNumber().toString();
// Start OM
om.setCertClient(certClient);
+ om.setScmBlockLocationClient(new ScmBlockLocationClient(scmBlockClient));
om.start();
GenericTestUtils.waitFor(() -> om.isLeaderReady(), 100, 10000);
@@ -1266,6 +1276,7 @@ public void testOMGrpcServerCertificateRenew() throws Exception {
// conflict with SCM procedure.
OzoneManager.setUgi(ugi);
om = OzoneManager.createOm(conf);
+ om.setScmBlockLocationClient(new ScmBlockLocationClient(scmBlockClient));
om.start();
CertificateClient omCertClient = om.getCertificateClient();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
index 5b91c1b1c055..a3c3acc20110 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java
@@ -101,6 +101,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
@@ -161,6 +162,8 @@ public static void setUp() throws Exception {
mockScmBlockLocationProtocol = mock(ScmBlockLocationProtocol.class);
mockScmContainerClient =
Mockito.mock(StorageContainerLocationProtocol.class);
+ when(mockScmBlockLocationProtocol.getTopologyInformation()).thenReturn(
+ conf.get(OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE));
OmTestManagers omTestManagers = new OmTestManagers(conf,
mockScmBlockLocationProtocol, mockScmContainerClient);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java
index 12f16409d82d..a0ce68710b9e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumesSecure.java
@@ -45,6 +45,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.client.ScmBlockLocationClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.ozone.OzoneAcl;
@@ -209,6 +210,8 @@ private void setupEnvironment(boolean aclEnabled,
OzoneManager.setTestSecureOmFlag(true);
om = OzoneManager.createOm(conf);
+ om.setScmBlockLocationClient(new ScmBlockLocationClient(
+ new ScmBlockLocationTestingClient(null, null, 0, conf)));
om.setCertClient(new CertificateClientTestImpl(conf));
om.start();
diff --git a/hadoop-ozone/integration-test/src/test/resources/networkTopologyTestFiles/test-topology.xml b/hadoop-ozone/integration-test/src/test/resources/networkTopologyTestFiles/test-topology.xml
new file mode 100644
index 000000000000..25be9c2c5d70
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/resources/networkTopologyTestFiles/test-topology.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <cost>1</cost>
+      <type>Root</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>/default-rack</default>
+    </layer>
+    <layer id="nodegroup">
+      <prefix>nodegroup</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>/default-nodegroup</default>
+    </layer>
+    <layer id="node">
+      <cost>0</cost>
+      <type>Leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/rack/nodegroup/node</path>
+    <enforceprefix>true</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index fb3c6925fc02..3b1efdad739a 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -144,6 +144,7 @@ enum Type {
ListKeysLight = 126;
AbortExpiredMultiPartUploads = 127;
SetSnapshotProperty = 128;
+ RefetchTopologyInformation = 129;
}
enum SafeMode {
@@ -278,6 +279,7 @@ message OMRequest {
optional MultipartUploadsExpiredAbortRequest multipartUploadsExpiredAbortRequest = 126;
optional SetSnapshotPropertyRequest SetSnapshotPropertyRequest = 127;
+ optional RefetchTopologyInformationRequest RefetchTopologyInformationRequest = 128;
}
message OMResponse {
@@ -397,6 +399,7 @@ message OMResponse {
optional ListKeysLightResponse listKeysLightResponse = 126;
optional MultipartUploadsExpiredAbortResponse multipartUploadsExpiredAbortResponse = 127;
optional SetSnapshotPropertyResponse SetSnapshotPropertyResponse = 128;
+ optional RefetchTopologyInformationResponse RefetchTopologyInformationResponse = 129;
}
enum Status {
@@ -636,6 +639,14 @@ message RefetchSecretKeyResponse {
optional hdds.UUID id = 1;
}
+message RefetchTopologyInformationRequest {
+
+}
+
+message RefetchTopologyInformationResponse {
+ optional string updatedSchema = 1;
+}
+
/**
* Checks if the user has specified permissions for the volume
*/
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index a23088211231..6c86f609fc42 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.utils.BackgroundService;
@@ -1802,8 +1803,7 @@ void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
LOG.warn("No datanodes in pipeline {}", pipeline.getId());
continue;
}
- sortedNodes = sortDatanodes(clientMachine, nodes, keyInfo,
- uuidList);
+ sortedNodes = sortDatanodes(clientMachine, nodes);
if (sortedNodes != null) {
sortedPipelines.put(uuidSet, sortedNodes);
}
@@ -1817,24 +1817,30 @@ void sortDatanodes(String clientMachine, OmKeyInfo... keyInfos) {
}
}
- private List<DatanodeDetails> sortDatanodes(String clientMachine,
- List<DatanodeDetails> nodes, OmKeyInfo keyInfo, List<String> nodeList) {
- List<DatanodeDetails> sortedNodes = null;
- try {
- sortedNodes = scmClient.getBlockClient()
- .sortDatanodes(nodeList, clientMachine);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sorted datanodes {} for client {}, result: {}", nodes,
- clientMachine, sortedNodes);
+ public List<DatanodeDetails> sortDatanodes(String clientMachine,
+ List<DatanodeDetails> nodes) {
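+ // Sort against OM's local cluster map instead of issuing an RPC to SCM;
+ // if the client machine is one of the datanodes, it is used as the
+ // reference point for the distance cost.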
+ DatanodeDetails client = null;
+ List<DatanodeDetails> possibleClients =
+ getClientNodesByAddress(clientMachine, nodes);
+ if (possibleClients.size() > 0) {
+ client = possibleClients.get(0);
+ }
+ List<? extends Node> sortedNodeList = ozoneManager.getClusterMap()
+ .sortByDistanceCost(client, nodes, nodes.size());
+ List<DatanodeDetails> ret = new ArrayList<>();
+ sortedNodeList.stream().forEach(node -> ret.add((DatanodeDetails) node));
+ return ret;
+ }
+
+ private List<DatanodeDetails> getClientNodesByAddress(String clientMachine,
+ List<DatanodeDetails> nodes) {
+ List<DatanodeDetails> matchingNodes = new ArrayList<>();
+ for (DatanodeDetails node : nodes) {
+ if (node.getIpAddress().equals(clientMachine)) {
+ matchingNodes.add(node);
}
- } catch (IOException e) {
- LOG.warn("Unable to sort datanodes based on distance to client, "
- + " volume={}, bucket={}, key={}, client={}, datanodes={}, "
- + " exception={}",
- keyInfo.getVolumeName(), keyInfo.getBucketName(),
- keyInfo.getKeyName(), clientMachine, nodeList, e.getMessage());
}
- return sortedNodes;
+ return matchingNodes;
}
private static List<String> toNodeUuid(Collection<DatanodeDetails> nodes) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index a9a13a49a2c5..20f59d02ba8d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -81,7 +81,10 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.client.ScmBlockLocationClient;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.server.OzoneAdmins;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
@@ -353,6 +356,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private OzoneBlockTokenSecretManager blockTokenMgr;
private CertificateClient certClient;
private SecretKeySignerClient secretKeyClient;
+
+ private ScmBlockLocationClient scmBlockLocationClient;
private final Text omRpcAddressTxt;
private OzoneConfiguration configuration;
private RPC.Server omRpcServer;
@@ -385,6 +390,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final OMStorage omStorage;
private final ScmBlockLocationProtocol scmBlockClient;
private final StorageContainerLocationProtocol scmContainerClient;
+ private NetworkTopology clusterMap;
private ObjectName omInfoBeanName;
private Timer metricsTimer;
private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask;
@@ -600,6 +606,7 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
scmContainerClient = getScmContainerClient(configuration);
// verifies that the SCM info in the OM Version file is correct.
scmBlockClient = getScmBlockClient(configuration);
+ scmBlockLocationClient = new ScmBlockLocationClient(scmBlockClient);
this.scmClient = new ScmClient(scmBlockClient, scmContainerClient,
configuration);
this.ozoneLockProvider = new OzoneLockProvider(getKeyPathLockEnabled(),
@@ -1085,6 +1092,11 @@ public UUID refetchSecretKey() {
return secretKeyClient.getCurrentSecretKey().getId();
}
+ public String refetchTopologyInformation() {
+ scmBlockLocationClient.refetchTopologyInformation();
+ return scmBlockLocationClient.getTopologyInformation();
+ }
+
@VisibleForTesting
public void startSecretManager() {
try {
@@ -1128,6 +1140,20 @@ public void setCertClient(CertificateClient newClient) throws IOException {
serviceInfo = new ServiceInfoProvider(secConfig, this, certClient);
}
+ /**
+ * For testing purposes only. This allows setting up the ScmBlockLocationClient
+ * without having to fully set up a working cluster.
+ */
+ @VisibleForTesting
+ public void setScmBlockLocationClient(
+ ScmBlockLocationClient scmBlockLocationClient) {
+ this.scmBlockLocationClient = scmBlockLocationClient;
+ }
+
+ public NetworkTopology getClusterMap() {
+ return clusterMap;
+ }
+
/**
* For testing purpose only. This allows testing token in integration test
* without fully setting up a working secure cluster.
@@ -1681,6 +1707,16 @@ public void start() throws IOException {
keyManager.start(configuration);
+ try {
+ scmBlockLocationClient.start(configuration);
+ } catch (IOException ex) {
+ LOG.error("Unable to initialize network topology schema file. ", ex);
+ throw new UncheckedIOException(ex);
+ }
+
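+ // Build OM's own cluster map from the topology schema fetched from SCM,
+ // so that datanode sorting can be done locally in KeyManagerImpl.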
+ clusterMap = new NetworkTopologyImpl(
+ scmBlockLocationClient.getTopologyInformation());
+
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
httpServer.start();
@@ -2234,6 +2270,11 @@ public void stop() {
}
keyManager.stop();
stopSecretManager();
+
+ if (scmBlockLocationClient != null) {
+ scmBlockLocationClient.stop();
+ }
+
if (httpServer != null) {
httpServer.stop();
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 080f48cfcc15..e447ef8fd344 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -91,6 +91,7 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrintCompactionLogDagRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrintCompactionLogDagResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RefetchSecretKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RefetchTopologyInformationResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoVolumeRequest;
@@ -355,6 +356,10 @@ public OMResponse handleReadRequest(OMRequest request) {
responseBuilder
.setPrintCompactionLogDagResponse(printCompactionLogDagResponse);
break;
+ case RefetchTopologyInformation:
+ responseBuilder.setRefetchTopologyInformationResponse(
+ refetchTopologyInformation());
+ break;
default:
responseBuilder.setSuccess(false);
responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
@@ -1016,6 +1021,14 @@ private RefetchSecretKeyResponse refetchSecretKey() {
return response;
}
+ private RefetchTopologyInformationResponse refetchTopologyInformation() {
+ RefetchTopologyInformationResponse response =
+ RefetchTopologyInformationResponse.newBuilder()
+ .setUpdatedSchema(impl.refetchTopologyInformation())
+ .build();
+ return response;
+ }
+
@RequestFeatureValidator(
conditions = ValidationCondition.OLDER_CLIENT_REQUESTS,
processingPhase = RequestProcessingPhase.POST_PROCESS,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
index 17f575f43b5b..c5bb3f1b71aa 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/OmTestManagers.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.client.ScmBlockLocationClient;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -87,7 +88,7 @@ public OmTestManagers(OzoneConfiguration conf,
Mockito.mock(StorageContainerLocationProtocol.class);
}
scmBlockClient = blockClient != null ? blockClient :
- new ScmBlockLocationTestingClient(null, null, 0);
+ new ScmBlockLocationTestingClient(null, null, 0, conf);
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
DefaultMetricsSystem.setMiniClusterMode(true);
@@ -102,12 +103,16 @@ public OmTestManagers(OzoneConfiguration conf,
keyManager = (KeyManagerImpl) HddsWhiteboxTestUtils
.getInternalState(om, "keyManager");
ScmClient scmClient = new ScmClient(scmBlockClient, containerClient, conf);
+ ScmBlockLocationClient scmBlockLocationClient =
+ new ScmBlockLocationClient(scmBlockClient);
HddsWhiteboxTestUtils.setInternalState(om,
"scmClient", scmClient);
HddsWhiteboxTestUtils.setInternalState(keyManager,
"scmClient", scmClient);
HddsWhiteboxTestUtils.setInternalState(keyManager,
"secretManager", Mockito.mock(OzoneBlockTokenSecretManager.class));
+ HddsWhiteboxTestUtils.setInternalState(om,
+ "scmBlockLocationClient", scmBlockLocationClient);
om.start();
writeClient = OzoneClientFactory.getRpcClient(conf)
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
index 74d19d547778..54602ff90963 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
@@ -24,9 +24,11 @@
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
@@ -78,6 +80,7 @@ public class ScmBlockLocationTestingClient implements ScmBlockLocationProtocol {
// The number of blocks deleted by this client
private int numBlocksDeleted = 0;
+ private OzoneConfiguration conf;
/**
* If ClusterID or SCMID is blank a per instance ID is generated.
@@ -88,12 +91,13 @@ public class ScmBlockLocationTestingClient implements ScmBlockLocationProtocol {
* a positive number for that frequency of failure.
*/
public ScmBlockLocationTestingClient(String clusterID, String scmId,
- int failCallsFrequency) {
+ int failCallsFrequency, OzoneConfiguration conf) {
this.clusterID = StringUtils.isNotBlank(clusterID) ? clusterID :
UUID.randomUUID().toString();
this.scmId = StringUtils.isNotBlank(scmId) ? scmId :
UUID.randomUUID().toString();
this.failCallsFrequency = Math.abs(failCallsFrequency);
+ this.conf = conf;
switch (this.failCallsFrequency) {
case 0:
LOG.debug("Set to no failure mode, all delete block calls will " +
@@ -199,6 +203,14 @@ public List<DatanodeDetails> sortDatanodes(List<String> nodes,
return null;
}
+ @Override
+ public String getTopologyInformation() {
+ String schemaFile = conf.get(
+ ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
+ ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT);
+ return schemaFile;
+ }
+
/**
* Return the number of blocks pseudo deleted by this testing client.
*/
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
index 0d199eb7f5bd..f86b8c05cf65 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
@@ -118,6 +119,9 @@ public void init() throws Exception {
testDir.toString());
containerClient = Mockito.mock(StorageContainerLocationProtocol.class);
blockClient = Mockito.mock(ScmBlockLocationProtocol.class);
+ when(blockClient.getTopologyInformation()).thenReturn(
+ configuration.get(
+ ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE));
OmTestManagers omTestManagers
= new OmTestManagers(configuration, blockClient, containerClient);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index 340ce3b6bdf2..6744cdd4001c 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -172,7 +172,7 @@ public void checkIfDeleteServiceWithFailingSCM()
OzoneConfiguration conf = createConfAndInitValues();
ScmBlockLocationProtocol blockClient =
//failCallsFrequency = 1 , means all calls fail.
- new ScmBlockLocationTestingClient(null, null, 1);
+ new ScmBlockLocationTestingClient(null, null, 1, conf);
OmTestManagers omTestManagers
= new OmTestManagers(conf, blockClient, null);
KeyManager keyManager = omTestManagers.getKeyManager();
@@ -217,7 +217,7 @@ public void checkDeletionForEmptyKey()
OzoneConfiguration conf = createConfAndInitValues();
ScmBlockLocationProtocol blockClient =
//failCallsFrequency = 1 , means all calls fail.
- new ScmBlockLocationTestingClient(null, null, 1);
+ new ScmBlockLocationTestingClient(null, null, 1, conf);
OmTestManagers omTestManagers
= new OmTestManagers(conf, blockClient, null);
KeyManager keyManager = omTestManagers.getKeyManager();
@@ -264,7 +264,7 @@ public void checkDeletionForPartiallyCommitKey()
OzoneConfiguration conf = createConfAndInitValues();
ScmBlockLocationProtocol blockClient =
//failCallsFrequency = 1 , means all calls fail.
- new ScmBlockLocationTestingClient(null, null, 1);
+ new ScmBlockLocationTestingClient(null, null, 1, conf);
OmTestManagers omTestManagers
= new OmTestManagers(conf, blockClient, null);
KeyManager keyManager = omTestManagers.getKeyManager();