From e9b5050d5ae8d49c0da2c17f3babeed2a813ffb1 Mon Sep 17 00:00:00 2001 From: yagagagaga Date: Wed, 13 Nov 2024 19:27:40 +0800 Subject: [PATCH] [enhancement](cloud) Prohibit changing deployment mode (#40764) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Proposed changes At present, the storage-compute separated (cloud) mode and the storage-compute integrated (local) mode cannot be converted into each other. However, if a user mixes the two anyway, nothing at the code level prevents it. The following scenarios may occur: Case | The node has been in a Cloud cluster before | The node has been in a Local cluster before | The node has never been in any cluster -- | -- | -- | -- add BE to local cluster | Added successfully, but the error `invalid cluster id. ignore. ` will occur. No negative impact on the original two clusters. | Added successfully, but the error `invalid cluster id. ignore. ` will occur. No negative impact on the original two clusters. | If cloud configuration is not added, it can work normally
If cloud configuration has been added, it will fail to start normally add FE to local cluster | Added successfully, but the error `Socket is closed by peer. ` will occur. No negative impact on the original two clusters. | Added successfully, but the error `Socket is closed by peer. ` will occur. No negative impact on the original two clusters. | If cloud configuration is not added, it can work normally
If cloud configuration has been added, it will fail to start normally add BE to cloud cluster | Added successfully, but the error `invalid cluster id. ignore. ` will occur. No negative impact on the original two clusters. | Added successfully, but the error `invalid cluster id. ignore. ` will occur. No negative impact on the original two clusters. | If cloud configuration is not added, BE can run successfully, but errors will occur when executing inserts.
If cloud configuration has been added, it can work normally add FE to cloud cluster | Added successfully, but the error `Socket is closed by peer. ` will occur. No negative impact on the original two clusters. | Added successfully, but the error `Socket is closed by peer. ` will occur. No negative impact on the original two clusters. | If cloud configuration is not added, FE will hang with the error `Unknown meta module: cloudWarmUpJob.`
If cloud configuration has been added, it can work normally ---- | Case | Situation | | --------------------------------------------- | ------------------------------------------------------------ | | Cloud config items are added to a BE in a Local cluster | Startup hangs | | Cloud config items are added to an FE in a Local cluster | Startup hangs | | Cloud config items are removed from a BE in a Cloud cluster | Starts successfully, but errors occur on queries or inserts | | Cloud config items are removed from an FE in a Cloud cluster | Service goes down | In this PR, Doris' deployment mode is checked. If the deployment mode is changed afterwards, the service will fail fast and a clear error message will be given. ---- ## 拟议变更 目前存算分离和存算一体模式不能互相转换,大部分情况下,这两种模式的部署应该不会搞混,但也不排除有些用户稀里糊涂,添加错了。另一个就是用户可能误删cloud相关的配置(比如从其他地方拷贝配置覆盖当前配置),导致以local模式启动。 针对不同集群的不同节点的情况: | 情况 | 此节点之前已在其他Cloud集群 | 此节点之前已在其他Local集群 | 此节点之前从未添加到任何集群 | | :-------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | | 把BE添加到Local的集群 | 可以添加,但心跳的时候会报invalid cluster id. ignore. 不影响原来两个集群的正常使用 | 可以添加,但心跳的时候会报invalid cluster id. ignore. 不影响原来两个集群的正常使用 | 如果未加cloud相关配置信息,能正常工作如果已加cloud相关配置信息,会以cloud的逻辑启动,导致不能正常启动 | | 把FE添加到Local的集群 | 可以添加,但心跳的时候会报 Socket is closed by peer. 不影响原来两个FE的正常使用 | 可以添加,但心跳的时候会报 Socket is closed by peer. 不影响原来两个FE的正常使用 | 如果未加cloud相关配置信息,能正常工作如果已加cloud相关配置信息,会以cloud的逻辑启动,导致不能正常启动 | | 把BE添加到Cloud的集群 | 可以添加,但心跳的时候会报invalid cluster id. ignore. 不影响原来两个集群的正常使用 | 可以添加,但心跳的时候会报invalid cluster id. ignore. 不影响原来两个集群的正常使用 | 如果未加cloud相关配置信息,能添加成功,但比如insert会报错,甚至会导致原有正常的be core如果已加cloud相关配置信息,能正常工作 | | 把FE添加到Cloud的集群 | 可以添加,但心跳的时候会报 Socket is closed by peer. 不影响原来两个FE的正常使用 | 可以添加,但心跳的时候会报 Socket is closed by peer. 
不影响原来两个FE的正常使用 | 如果未加cloud相关配置信息如果没加入cloud集群,会报failed to get local fe's type, sleep 5 s, try again.如果已加入cloud集群,读取元数据会报错Unknown meta module: cloudWarmUpJob.,卡住如果已加cloud相关配置信息,能正常工作 | ---- | 情况 | 现象 | | :--------------------------- | :--------------------------------------------------- | | Local集群的BE添加cloud的配置 | 会以cloud的逻辑启动,导致启动卡住 | | Local集群的FE添加cloud的配置 | 会以cloud的逻辑启动,导致启动卡住 | | Cloud集群的BE删除cloud的配置 | 能正常启动,但查询导入会报错 | | Cloud集群的FE删除cloud的配置 | 不断刷get version from meta service failed,然后挂掉 | 针对这些情况,节点切换cloud/local模式的,应该快速失败,然后告知用户 --------- Co-authored-by: yagagagaga --- be/src/olap/olap_define.h | 1 + be/src/runtime/exec_env.h | 1 + be/src/runtime/exec_env_init.cpp | 45 +++++++++++++++++++ .../main/java/org/apache/doris/DorisFE.java | 4 +- .../java/org/apache/doris/catalog/Env.java | 31 +++++++++++++ .../org/apache/doris/persist/Storage.java | 31 +++++++++++++ .../doris/service/FrontendServiceImpl.java | 13 ++++++ .../org/apache/doris/system/HeartbeatMgr.java | 1 + gensrc/thrift/FrontendService.thrift | 1 + 9 files changed, 127 insertions(+), 1 deletion(-) diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 0e6d0155d60096..5131c51ca01c20 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -87,6 +87,7 @@ enum OLAPDataVersion { // Different types of folder names under storage_root_path static const std::string MINI_PREFIX = "mini_download"; static const std::string CLUSTER_ID_PREFIX = "cluster_id"; +static const std::string DEPLOY_MODE_PREFIX = "deploy_mode"; static const std::string DATA_PREFIX = "data"; static const std::string DPP_PREFIX = "dpp_download"; static const std::string SNAPSHOT_PREFIX = "snapshot"; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4a0000fa19fdca..031595a9c41b1d 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -344,6 +344,7 @@ class ExecEnv { void _destroy(); Status _init_mem_env(); + Status _check_deploy_mode(); void _register_metrics(); void 
_deregister_metrics(); diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 9d76178661169a..51714c7deb2053 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -346,6 +346,8 @@ Status ExecEnv::_init(const std::vector& store_paths, options.store_paths = store_paths; options.broken_paths = broken_paths; options.backend_uid = doris::UniqueId::gen_uid(); + // Check if the startup mode has been modified + RETURN_IF_ERROR(_check_deploy_mode()); if (config::is_cloud_mode()) { std::cout << "start BE in cloud mode, cloud_unique_id: " << config::cloud_unique_id << ", meta_service_endpoint: " << config::meta_service_endpoint << std::endl; @@ -622,6 +624,49 @@ void ExecEnv::init_mem_tracker() { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe"); } +Status ExecEnv::_check_deploy_mode() { + for (auto _path : _store_paths) { + auto deploy_mode_path = fmt::format("{}/{}", _path.path, DEPLOY_MODE_PREFIX); + std::string expected_mode = doris::config::is_cloud_mode() ? 
"cloud" : "local"; + bool exists = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(deploy_mode_path, &exists)); + if (exists) { + // check if is ok + io::FileReaderSPtr reader; + RETURN_IF_ERROR(io::global_local_filesystem()->open_file(deploy_mode_path, &reader)); + size_t fsize = reader->size(); + if (fsize > 0) { + std::string actual_mode; + actual_mode.resize(fsize, '\0'); + size_t bytes_read = 0; + RETURN_IF_ERROR(reader->read_at(0, {actual_mode.data(), fsize}, &bytes_read)); + DCHECK_EQ(fsize, bytes_read); + if (expected_mode != actual_mode) { + return Status::InternalError( + "You can't switch deploy mode from {} to {}, " + "maybe you need to check be.conf\n", + actual_mode.c_str(), expected_mode.c_str()); + } + LOG(INFO) << "The current deployment mode is " << expected_mode << "."; + } + } else { + io::FileWriterPtr file_writer; + RETURN_IF_ERROR( + io::global_local_filesystem()->create_file(deploy_mode_path, &file_writer)); + RETURN_IF_ERROR(file_writer->append(expected_mode)); + RETURN_IF_ERROR(file_writer->close()); + LOG(INFO) << "The file deploy_mode doesn't exist, create it."; + auto cluster_id_path = fmt::format("{}/{}", _path.path, CLUSTER_ID_PREFIX); + RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists)); + if (exists) { + LOG(WARNING) << "This may be an upgrade from old version," + << "or the deploy_mode file has been manually deleted"; + } + } + } + return Status::OK(); +} + void ExecEnv::_register_metrics() { REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, [this]() { return _send_batch_thread_pool->num_threads(); }); diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index d028f3aeae1437..d5b7cd7354eb1f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -231,7 +231,9 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, 
Star // Some exception may thrown before LOG is inited. // So need to print to stdout e.printStackTrace(); - LOG.warn("", e); + LOG.error("", e); + // to avoid nonDaemon Thread block main Thread, we need to force exit + System.exit(-1); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index e48b514cdd28cf..6d0a68cd0bb59f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1080,6 +1080,7 @@ public void initialize(String[] args) throws Exception { // 2. get cluster id and role (Observer or Follower) if (!Config.enable_check_compatibility_mode) { + checkDeployMode(); getClusterIdAndRole(); } else { isElectable = true; @@ -1367,6 +1368,31 @@ protected void getClusterIdAndRole() throws IOException { clusterId, isElectable, role.name(), nodeName); } + /** + * write cloud/local to MODE_FILE. + */ + protected void checkDeployMode() throws IOException { + File modeFile = new File(this.imageDir, Storage.DEPLOY_MODE_FILE); + Storage storage = new Storage(this.imageDir); + String expectedMode = getDeployMode(); + if (modeFile.exists()) { + String actualMode = storage.getDeployMode(); + Preconditions.checkArgument(expectedMode.equals(actualMode), + "You can't switch deploy mode from %s to %s, maybe you need to check fe.conf", + actualMode, expectedMode); + LOG.info("The current deployment mode is " + expectedMode + "."); + } else { + storage.setDeployMode(expectedMode); + storage.writeClusterMode(); + LOG.info("The file DEPLOY_MODE doesn't exist, create it."); + File versionFile = new File(this.imageDir, Storage.VERSION_FILE); + if (versionFile.exists()) { + LOG.warn("This may be an upgrade from old version, " + + "or the DEPLOY_MODE file has been manually deleted"); + } + } + } + public static String genFeNodeName(String host, int port, boolean isOldStyle) { if (isOldStyle) { return host + "_" + port; 
@@ -4198,6 +4224,10 @@ public int getClusterId() { return this.clusterId; } + public String getDeployMode() { + return Config.isCloudMode() ? Storage.CLOUD_MODE : Storage.LOCAL_MODE; + } + public String getToken() { return token; } @@ -6705,3 +6735,4 @@ private void replayJournalsAndExit() { System.exit(0); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java b/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java index 910a42ab7610b8..9f8cd558a57cc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java @@ -55,11 +55,16 @@ public class Storage { public static final String IMAGE_NEW = "image.ckpt"; public static final String VERSION_FILE = "VERSION"; public static final String ROLE_FILE = "ROLE"; + public static final String DEPLOY_MODE_FILE = "DEPLOY_MODE"; + public static final String DEPLOY_MODE = "deploy_mode"; + public static final String CLOUD_MODE = "cloud"; + public static final String LOCAL_MODE = "local"; private int clusterID = 0; private String token; private FrontendNodeType role = FrontendNodeType.UNKNOWN; private String nodeName; + private String deployMode; private long editsSeq; private long latestImageSeq = 0; private long latestValidatedImageSeq = 0; @@ -116,6 +121,14 @@ public void reload() throws IOException { nodeName = prop.getProperty(NODE_NAME, null); } + File modeFile = getModeFile(); + if (modeFile.isFile()) { + try (FileInputStream in = new FileInputStream(modeFile)) { + prop.load(in); + } + deployMode = prop.getProperty(DEPLOY_MODE); + } + // Find the latest two images File dir = new File(metaDir); File[] children = dir.listFiles(); @@ -165,6 +178,14 @@ public FrontendNodeType getRole() { return role; } + public String getDeployMode() { + return deployMode; + } + + public void setDeployMode(String deployMode) { + this.deployMode = deployMode; + } + public String getNodeName() { return nodeName; } @@ -224,6 
+245,12 @@ public void writeFrontendRoleAndNodeName(FrontendNodeType role, String nameNode) writePropertiesToFile(properties, ROLE_FILE); } + public void writeClusterMode() throws IOException { + Properties properties = new Properties(); + properties.setProperty(DEPLOY_MODE, deployMode); + writePropertiesToFile(properties, DEPLOY_MODE_FILE); + } + private void writePropertiesToFile(Properties properties, String fileName) throws IOException { RandomAccessFile file = new RandomAccessFile(new File(metaDir, fileName), "rws"); FileOutputStream out = null; @@ -287,6 +314,10 @@ public final File getRoleFile() { return new File(metaDir, ROLE_FILE); } + public final File getModeFile() { + return new File(metaDir, DEPLOY_MODE_FILE); + } + public File getCurrentEditsFile() { return new File(metaDir, EDITS); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 8b57732dadc1b4..2e7980d80d4337 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2263,6 +2263,19 @@ public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) th result.setMsg("invalid cluster id: " + Env.getCurrentEnv().getClusterId()); } + if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) { + // If the version of FE is too old, we need to ensure compatibility. 
+ if (request.getDeployMode() == null) { + LOG.warn("Couldn't find deployMode in heartbeat info, " + + "maybe you need upgrade FE master."); + } else if (!request.getDeployMode().equals(Env.getCurrentEnv().getDeployMode())) { + result.setStatus(TFrontendPingFrontendStatusCode.FAILED); + result.setMsg("expected deployMode: " + + request.getDeployMode() + + ", but found deployMode: " + + Env.getCurrentEnv().getDeployMode()); + } + } if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) { if (!request.getToken().equals(Env.getCurrentEnv().getToken())) { result.setStatus(TFrontendPingFrontendStatusCode.FAILED); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index d7eff484c6a967..f8e75633a0d6ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -376,6 +376,7 @@ private HeartbeatResponse getHeartbeatResponse() { try { client = ClientPool.frontendHeartbeatPool.borrowObject(addr); TFrontendPingFrontendRequest request = new TFrontendPingFrontendRequest(clusterId, token); + request.setDeployMode(Env.getCurrentEnv().getDeployMode()); TFrontendPingFrontendResult result = client.ping(request); ok = true; if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index f45e64c310377c..181b632e43f819 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -949,6 +949,7 @@ enum TFrontendPingFrontendStatusCode { struct TFrontendPingFrontendRequest { 1: required i32 clusterId 2: required string token + 3: optional string deployMode } struct TDiskInfo {