diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 0e6d0155d60096..5131c51ca01c20 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -87,6 +87,7 @@ enum OLAPDataVersion { // Different types of folder names under storage_root_path static const std::string MINI_PREFIX = "mini_download"; static const std::string CLUSTER_ID_PREFIX = "cluster_id"; +static const std::string DEPLOY_MODE_PREFIX = "deploy_mode"; static const std::string DATA_PREFIX = "data"; static const std::string DPP_PREFIX = "dpp_download"; static const std::string SNAPSHOT_PREFIX = "snapshot"; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index b1617744eac6ba..418b7404a6d9c4 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -345,6 +345,7 @@ class ExecEnv { void _destroy(); Status _init_mem_env(); + Status _check_deploy_mode(); void _register_metrics(); void _deregister_metrics(); diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index e43524b2d2a00b..437bd4e14c2f46 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -348,6 +348,8 @@ Status ExecEnv::_init(const std::vector& store_paths, options.store_paths = store_paths; options.broken_paths = broken_paths; options.backend_uid = doris::UniqueId::gen_uid(); + // Check if the startup mode has been modified + RETURN_IF_ERROR(_check_deploy_mode()); if (config::is_cloud_mode()) { std::cout << "start BE in cloud mode, cloud_unique_id: " << config::cloud_unique_id << ", meta_service_endpoint: " << config::meta_service_endpoint << std::endl; @@ -628,6 +630,49 @@ void ExecEnv::init_mem_tracker() { MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe"); } +Status ExecEnv::_check_deploy_mode() { + for (auto _path : _store_paths) { + auto deploy_mode_path = fmt::format("{}/{}", _path.path, DEPLOY_MODE_PREFIX); + std::string expected_mode = doris::config::is_cloud_mode() ? "cloud" : "local"; + bool exists = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(deploy_mode_path, &exists)); + if (exists) { + // check if is ok + io::FileReaderSPtr reader; + RETURN_IF_ERROR(io::global_local_filesystem()->open_file(deploy_mode_path, &reader)); + size_t fsize = reader->size(); + if (fsize > 0) { + std::string actual_mode; + actual_mode.resize(fsize, '\0'); + size_t bytes_read = 0; + RETURN_IF_ERROR(reader->read_at(0, {actual_mode.data(), fsize}, &bytes_read)); + DCHECK_EQ(fsize, bytes_read); + if (expected_mode != actual_mode) { + return Status::InternalError( + "You can't switch deploy mode from {} to {}, " + "maybe you need to check be.conf\n", + actual_mode.c_str(), expected_mode.c_str()); + } + LOG(INFO) << "The current deployment mode is " << expected_mode << "."; + } + } else { + io::FileWriterPtr file_writer; + RETURN_IF_ERROR( + io::global_local_filesystem()->create_file(deploy_mode_path, &file_writer)); + RETURN_IF_ERROR(file_writer->append(expected_mode)); + RETURN_IF_ERROR(file_writer->close()); + LOG(INFO) << "The file deploy_mode doesn't exist, create it."; + auto cluster_id_path = fmt::format("{}/{}", _path.path, CLUSTER_ID_PREFIX); + RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists)); + if (exists) { + LOG(WARNING) << "This may be an upgrade from old version," + << "or the deploy_mode file has been manually deleted"; + } + } + } + return Status::OK(); +} + void ExecEnv::_register_metrics() { REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, [this]() { return _send_batch_thread_pool->num_threads(); }); diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index d028f3aeae1437..d5b7cd7354eb1f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -231,7 +231,9 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star // Some exception may thrown before LOG is inited. // So need to print to stdout e.printStackTrace(); - LOG.warn("", e); + LOG.error("", e); + // to avoid nonDaemon Thread block main Thread, we need to force exit + System.exit(-1); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index e5ef8e0dc721ab..e9b05a28c95a70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1077,6 +1077,7 @@ public void initialize(String[] args) throws Exception { // 2. get cluster id and role (Observer or Follower) if (!Config.enable_check_compatibility_mode) { + checkDeployMode(); getClusterIdAndRole(); } else { isElectable = true; @@ -1364,6 +1365,31 @@ protected void getClusterIdAndRole() throws IOException { clusterId, isElectable, role.name(), nodeName); } + /** + * write cloud/local to MODE_FILE. + */ + protected void checkDeployMode() throws IOException { + File modeFile = new File(this.imageDir, Storage.DEPLOY_MODE_FILE); + Storage storage = new Storage(this.imageDir); + String expectedMode = getDeployMode(); + if (modeFile.exists()) { + String actualMode = storage.getDeployMode(); + Preconditions.checkArgument(expectedMode.equals(actualMode), + "You can't switch deploy mode from %s to %s, maybe you need to check fe.conf", + actualMode, expectedMode); + LOG.info("The current deployment mode is " + expectedMode + "."); + } else { + storage.setDeployMode(expectedMode); + storage.writeClusterMode(); + LOG.info("The file DEPLOY_MODE doesn't exist, create it."); + File versionFile = new File(this.imageDir, Storage.VERSION_FILE); + if (versionFile.exists()) { + LOG.warn("This may be an upgrade from old version, " + + "or the DEPLOY_MODE file has been manually deleted"); + } + } + } + public static String genFeNodeName(String host, int port, boolean isOldStyle) { if (isOldStyle) { return host + "_" + port; @@ -4207,6 +4233,10 @@ public int getClusterId() { return this.clusterId; } + public String getDeployMode() { + return Config.isCloudMode() ? Storage.CLOUD_MODE : Storage.LOCAL_MODE; + } + public String getToken() { return token; } @@ -6710,3 +6740,4 @@ private void replayJournalsAndExit() { System.exit(0); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java b/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java index 910a42ab7610b8..9f8cd558a57cc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java @@ -55,11 +55,16 @@ public class Storage { public static final String IMAGE_NEW = "image.ckpt"; public static final String VERSION_FILE = "VERSION"; public static final String ROLE_FILE = "ROLE"; + public static final String DEPLOY_MODE_FILE = "DEPLOY_MODE"; + public static final String DEPLOY_MODE = "deploy_mode"; + public static final String CLOUD_MODE = "cloud"; + public static final String LOCAL_MODE = "local"; private int clusterID = 0; private String token; private FrontendNodeType role = FrontendNodeType.UNKNOWN; private String nodeName; + private String deployMode; private long editsSeq; private long latestImageSeq = 0; private long latestValidatedImageSeq = 0; @@ -116,6 +121,14 @@ public void reload() throws IOException { nodeName = prop.getProperty(NODE_NAME, null); } + File modeFile = getModeFile(); + if (modeFile.isFile()) { + try (FileInputStream in = new FileInputStream(modeFile)) { + prop.load(in); + } + deployMode = prop.getProperty(DEPLOY_MODE); + } + // Find the latest two images File dir = new File(metaDir); File[] children = dir.listFiles(); @@ -165,6 +178,14 @@ public FrontendNodeType getRole() { return role; } + public String getDeployMode() { + return deployMode; + } + + public void setDeployMode(String deployMode) { + this.deployMode = deployMode; + } + public String getNodeName() { return nodeName; } @@ -224,6 +245,12 @@ public void writeFrontendRoleAndNodeName(FrontendNodeType role, String nameNode) writePropertiesToFile(properties, ROLE_FILE); } + public void writeClusterMode() throws IOException { + Properties properties = new Properties(); + properties.setProperty(DEPLOY_MODE, deployMode); + writePropertiesToFile(properties, DEPLOY_MODE_FILE); + } + private void writePropertiesToFile(Properties properties, String fileName) throws IOException { RandomAccessFile file = new RandomAccessFile(new File(metaDir, fileName), "rws"); FileOutputStream out = null; @@ -287,6 +314,10 @@ public final File getRoleFile() { return new File(metaDir, ROLE_FILE); } + public final File getModeFile() { + return new File(metaDir, DEPLOY_MODE_FILE); + } + public File getCurrentEditsFile() { return new File(metaDir, EDITS); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ba91825711c099..6b66ed6b454e25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2263,6 +2263,19 @@ public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) th result.setMsg("invalid cluster id: " + Env.getCurrentEnv().getClusterId()); } + if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) { + // If the version of FE is too old, we need to ensure compatibility. + if (request.getDeployMode() == null) { + LOG.warn("Couldn't find deployMode in heartbeat info, " + + "maybe you need upgrade FE master."); + } else if (!request.getDeployMode().equals(Env.getCurrentEnv().getDeployMode())) { + result.setStatus(TFrontendPingFrontendStatusCode.FAILED); + result.setMsg("expected deployMode: " + + request.getDeployMode() + + ", but found deployMode: " + + Env.getCurrentEnv().getDeployMode()); + } + } if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) { if (!request.getToken().equals(Env.getCurrentEnv().getToken())) { result.setStatus(TFrontendPingFrontendStatusCode.FAILED); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index fb6853e83c34a5..9fcb4ca80e6714 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -375,6 +375,7 @@ private HeartbeatResponse getHeartbeatResponse() { try { client = ClientPool.frontendHeartbeatPool.borrowObject(addr); TFrontendPingFrontendRequest request = new TFrontendPingFrontendRequest(clusterId, token); + request.setDeployMode(Env.getCurrentEnv().getDeployMode()); TFrontendPingFrontendResult result = client.ping(request); ok = true; if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index b3c75be81823e3..122f55063b4d83 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -950,6 +950,7 @@ enum TFrontendPingFrontendStatusCode { struct TFrontendPingFrontendRequest { 1: required i32 clusterId 2: required string token + 3: optional string deployMode } struct TDiskInfo {