Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ enum OLAPDataVersion {
// Different types of folder names under storage_root_path
static const std::string MINI_PREFIX = "mini_download";
static const std::string CLUSTER_ID_PREFIX = "cluster_id";
static const std::string DEPLOY_MODE_PREFIX = "deploy_mode";
static const std::string DATA_PREFIX = "data";
static const std::string DPP_PREFIX = "dpp_download";
static const std::string SNAPSHOT_PREFIX = "snapshot";
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ class ExecEnv {
void _destroy();

Status _init_mem_env();
Status _check_deploy_mode();

void _register_metrics();
void _deregister_metrics();
Expand Down
45 changes: 45 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
options.store_paths = store_paths;
options.broken_paths = broken_paths;
options.backend_uid = doris::UniqueId::gen_uid();
// Check if the startup mode has been modified
RETURN_IF_ERROR(_check_deploy_mode());
if (config::is_cloud_mode()) {
std::cout << "start BE in cloud mode, cloud_unique_id: " << config::cloud_unique_id
<< ", meta_service_endpoint: " << config::meta_service_endpoint << std::endl;
Expand Down Expand Up @@ -628,6 +630,49 @@ void ExecEnv::init_mem_tracker() {
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe");
}

Status ExecEnv::_check_deploy_mode() {
for (auto _path : _store_paths) {
auto deploy_mode_path = fmt::format("{}/{}", _path.path, DEPLOY_MODE_PREFIX);
std::string expected_mode = doris::config::is_cloud_mode() ? "cloud" : "local";
bool exists = false;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(deploy_mode_path, &exists));
if (exists) {
// check if is ok
io::FileReaderSPtr reader;
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(deploy_mode_path, &reader));
size_t fsize = reader->size();
if (fsize > 0) {
std::string actual_mode;
actual_mode.resize(fsize, '\0');
size_t bytes_read = 0;
RETURN_IF_ERROR(reader->read_at(0, {actual_mode.data(), fsize}, &bytes_read));
DCHECK_EQ(fsize, bytes_read);
if (expected_mode != actual_mode) {
return Status::InternalError(
"You can't switch deploy mode from {} to {}, "
"maybe you need to check be.conf\n",
actual_mode.c_str(), expected_mode.c_str());
}
LOG(INFO) << "The current deployment mode is " << expected_mode << ".";
}
} else {
io::FileWriterPtr file_writer;
RETURN_IF_ERROR(
io::global_local_filesystem()->create_file(deploy_mode_path, &file_writer));
RETURN_IF_ERROR(file_writer->append(expected_mode));
RETURN_IF_ERROR(file_writer->close());
LOG(INFO) << "The file deploy_mode doesn't exist, create it.";
auto cluster_id_path = fmt::format("{}/{}", _path.path, CLUSTER_ID_PREFIX);
RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
if (exists) {
LOG(WARNING) << "This may be an upgrade from old version,"
<< "or the deploy_mode file has been manually deleted";
}
}
}
return Status::OK();
}

void ExecEnv::_register_metrics() {
REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num,
[this]() { return _send_batch_thread_pool->num_threads(); });
Expand Down
4 changes: 3 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
// Some exception may thrown before LOG is inited.
// So need to print to stdout
e.printStackTrace();
LOG.warn("", e);
LOG.error("", e);
// to avoid nonDaemon Thread block main Thread, we need to force exit
System.exit(-1);
}
}

Expand Down
31 changes: 31 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,7 @@ public void initialize(String[] args) throws Exception {

// 2. get cluster id and role (Observer or Follower)
if (!Config.enable_check_compatibility_mode) {
checkDeployMode();
getClusterIdAndRole();
} else {
isElectable = true;
Expand Down Expand Up @@ -1364,6 +1365,31 @@ protected void getClusterIdAndRole() throws IOException {
clusterId, isElectable, role.name(), nodeName);
}

/**
* write cloud/local to MODE_FILE.
*/
protected void checkDeployMode() throws IOException {
File modeFile = new File(this.imageDir, Storage.DEPLOY_MODE_FILE);
Storage storage = new Storage(this.imageDir);
String expectedMode = getDeployMode();
if (modeFile.exists()) {
String actualMode = storage.getDeployMode();
Preconditions.checkArgument(expectedMode.equals(actualMode),
"You can't switch deploy mode from %s to %s, maybe you need to check fe.conf",
actualMode, expectedMode);
LOG.info("The current deployment mode is " + expectedMode + ".");
} else {
storage.setDeployMode(expectedMode);
storage.writeClusterMode();
LOG.info("The file DEPLOY_MODE doesn't exist, create it.");
File versionFile = new File(this.imageDir, Storage.VERSION_FILE);
if (versionFile.exists()) {
LOG.warn("This may be an upgrade from old version, "
+ "or the DEPLOY_MODE file has been manually deleted");
}
}
}

public static String genFeNodeName(String host, int port, boolean isOldStyle) {
if (isOldStyle) {
return host + "_" + port;
Expand Down Expand Up @@ -4207,6 +4233,10 @@ public int getClusterId() {
return this.clusterId;
}

public String getDeployMode() {
return Config.isCloudMode() ? Storage.CLOUD_MODE : Storage.LOCAL_MODE;
}

public String getToken() {
return token;
}
Expand Down Expand Up @@ -6710,3 +6740,4 @@ private void replayJournalsAndExit() {
System.exit(0);
}
}

31 changes: 31 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,16 @@ public class Storage {
public static final String IMAGE_NEW = "image.ckpt";
public static final String VERSION_FILE = "VERSION";
public static final String ROLE_FILE = "ROLE";
public static final String DEPLOY_MODE_FILE = "DEPLOY_MODE";
public static final String DEPLOY_MODE = "deploy_mode";
public static final String CLOUD_MODE = "cloud";
public static final String LOCAL_MODE = "local";

private int clusterID = 0;
private String token;
private FrontendNodeType role = FrontendNodeType.UNKNOWN;
private String nodeName;
private String deployMode;
private long editsSeq;
private long latestImageSeq = 0;
private long latestValidatedImageSeq = 0;
Expand Down Expand Up @@ -116,6 +121,14 @@ public void reload() throws IOException {
nodeName = prop.getProperty(NODE_NAME, null);
}

File modeFile = getModeFile();
if (modeFile.isFile()) {
try (FileInputStream in = new FileInputStream(modeFile)) {
prop.load(in);
}
deployMode = prop.getProperty(DEPLOY_MODE);
}

// Find the latest two images
File dir = new File(metaDir);
File[] children = dir.listFiles();
Expand Down Expand Up @@ -165,6 +178,14 @@ public FrontendNodeType getRole() {
return role;
}

public String getDeployMode() {
return deployMode;
}

public void setDeployMode(String deployMode) {
this.deployMode = deployMode;
}

public String getNodeName() {
return nodeName;
}
Expand Down Expand Up @@ -224,6 +245,12 @@ public void writeFrontendRoleAndNodeName(FrontendNodeType role, String nameNode)
writePropertiesToFile(properties, ROLE_FILE);
}

public void writeClusterMode() throws IOException {
Properties properties = new Properties();
properties.setProperty(DEPLOY_MODE, deployMode);
writePropertiesToFile(properties, DEPLOY_MODE_FILE);
}

private void writePropertiesToFile(Properties properties, String fileName) throws IOException {
RandomAccessFile file = new RandomAccessFile(new File(metaDir, fileName), "rws");
FileOutputStream out = null;
Expand Down Expand Up @@ -287,6 +314,10 @@ public final File getRoleFile() {
return new File(metaDir, ROLE_FILE);
}

public final File getModeFile() {
return new File(metaDir, DEPLOY_MODE_FILE);
}

public File getCurrentEditsFile() {
return new File(metaDir, EDITS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2263,6 +2263,19 @@ public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) th
result.setMsg("invalid cluster id: " + Env.getCurrentEnv().getClusterId());
}

if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) {
// If the version of FE is too old, we need to ensure compatibility.
if (request.getDeployMode() == null) {
LOG.warn("Couldn't find deployMode in heartbeat info, "
+ "maybe you need upgrade FE master.");
} else if (!request.getDeployMode().equals(Env.getCurrentEnv().getDeployMode())) {
result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
result.setMsg("expected deployMode: "
+ request.getDeployMode()
+ ", but found deployMode: "
+ Env.getCurrentEnv().getDeployMode());
}
}
if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) {
if (!request.getToken().equals(Env.getCurrentEnv().getToken())) {
result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ private HeartbeatResponse getHeartbeatResponse() {
try {
client = ClientPool.frontendHeartbeatPool.borrowObject(addr);
TFrontendPingFrontendRequest request = new TFrontendPingFrontendRequest(clusterId, token);
request.setDeployMode(Env.getCurrentEnv().getDeployMode());
TFrontendPingFrontendResult result = client.ping(request);
ok = true;
if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) {
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,7 @@ enum TFrontendPingFrontendStatusCode {
struct TFrontendPingFrontendRequest {
1: required i32 clusterId
2: required string token
3: optional string deployMode
}

struct TDiskInfo {
Expand Down