Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,6 @@ CONF_Int32(max_tablet_index_num_per_batch, "1000");

// Max aborted txn num for the same label name
CONF_mInt64(max_num_aborted_txn, "100");

// Whether the instance_id parsed out of a degraded-format cloud_unique_id
// ("${version}:${instance_id}:${unique_id}") must also pass
// ResourceManager::is_instance_id_registered() before it is accepted.
CONF_Bool(enable_check_instance_id, "true");
} // namespace doris::cloud::config
56 changes: 32 additions & 24 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,46 +88,54 @@ std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr,
std::vector<NodeInfo> nodes;
std::string err = rc_mgr->get_node(cloud_unique_id, &nodes);
{ TEST_SYNC_POINT_CALLBACK("get_instance_id_err", &err); }
std::string instance_id;
if (!err.empty()) {
// cache can't find cloud_unique_id, so degraded by parse cloud_unique_id
// cloud_unique_id encode: ${version}:${instance_id}:${unique_id}
// check it split by ':' c
auto vec = split(cloud_unique_id, ':');
std::stringstream ss;
for (int i = 0; i < vec.size(); ++i) {
ss << "idx " << i << "= [" << vec[i] << "] ";
}
LOG(INFO) << "degraded to get instance_id, cloud_unique_id: " << cloud_unique_id
<< "after split: " << ss.str();
if (vec.size() != 3) {
LOG(WARNING) << "cloud unique id is not degraded format, failed to check instance "
"info, cloud_unique_id="
<< cloud_unique_id << " , err=" << err;
auto [valid, id] = ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
if (!valid) {
LOG(WARNING) << "use degraded format cloud_unique_id, but cloud_unique_id not degrade "
"format, cloud_unique_id="
<< cloud_unique_id;
return "";
}
// version: vec[0], instance_id: vec[1], unique_id: vec[2]
switch (std::atoi(vec[0].c_str())) {
case 1:
// just return instance id;
return vec[1];
default:
LOG(WARNING) << "cloud unique id degraded state, but version not eq configure, "

// check instance_id valid by get fdb
if (config::enable_check_instance_id && !rc_mgr->is_instance_id_registered(id)) {
LOG(WARNING) << "use degraded format cloud_unique_id, but check instance failed, "
"cloud_unique_id="
<< cloud_unique_id << ", err=" << err;
<< cloud_unique_id;
return "";
}
return id;
}

std::string instance_id;
for (auto& i : nodes) {
if (!instance_id.empty() && instance_id != i.instance_id) {
for (auto& node : nodes) {
if (!instance_id.empty() && instance_id != node.instance_id) {
LOG(WARNING) << "cloud_unique_id is one-to-many instance_id, "
<< " cloud_unique_id=" << cloud_unique_id
<< " current_instance_id=" << instance_id
<< " later_instance_id=" << i.instance_id;
<< " later_instance_id=" << node.instance_id;
}
instance_id = node.instance_id; // The last wins
// check cache unique_id
std::string cloud_unique_id_in_cache = node.node_info.cloud_unique_id();
auto [valid, id] =
ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id_in_cache);
if (!valid) {
continue;
}

if (id != node.instance_id || id != instance_id) {
LOG(WARNING) << "in cache, node=" << node.node_info.DebugString()
<< ", cloud_unique_id=" << cloud_unique_id
<< " current_instance_id=" << instance_id
<< ", later_instance_id=" << node.instance_id;
continue;
}
instance_id = i.instance_id; // The last wins
}

return instance_id;
}

Expand Down
18 changes: 14 additions & 4 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,16 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = request->has_instance_id() ? request->instance_id() : "";
if (!cloud_unique_id.empty() && instance_id.empty()) {
auto [is_degraded_format, id] =
ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
if (config::enable_check_instance_id && is_degraded_format &&
!resource_mgr_->is_instance_id_registered(id)) {
msg = "use degrade cloud_unique_id, but instance_id invalid, cloud_unique_id=" +
cloud_unique_id;
LOG(WARNING) << msg;
code = MetaServiceCode::INVALID_ARGUMENT;
return;
}
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
Expand Down Expand Up @@ -1972,7 +1982,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::ADD_NODE: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
LOG(INFO) << msg;
LOG(WARNING) << msg;
break;
}
std::vector<NodeInfo> to_add;
Expand All @@ -1996,7 +2006,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::DROP_NODE: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
LOG(INFO) << msg;
LOG(WARNING) << msg;
break;
}
std::vector<NodeInfo> to_add;
Expand All @@ -2019,7 +2029,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::DECOMMISSION_NODE: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
LOG(INFO) << msg;
LOG(WARNING) << msg;
break;
}

Expand Down Expand Up @@ -2081,7 +2091,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::NOTIFY_DECOMMISSIONED: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
LOG(INFO) << msg;
LOG(WARNING) << msg;
break;
}

Expand Down
34 changes: 33 additions & 1 deletion cloud/src/resource-manager/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <sstream>

#include "common/logging.h"
#include "common/string_util.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
Expand Down Expand Up @@ -159,6 +160,16 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
int master_num = 0;
int follower_num = 0;
for (auto& n : cluster.nodes()) {
// check here cloud_unique_id
std::string cloud_unique_id = n.cloud_unique_id();
auto [is_degrade_format, instance_id] = get_instance_id_by_cloud_unique_id(cloud_unique_id);
if (config::enable_check_instance_id && is_degrade_format &&
!is_instance_id_registered(instance_id)) {
ss << "node=" << n.DebugString()
<< " cloud_unique_id use degrade format, but check instance failed";
*err = ss.str();
return false;
}
if (ClusterPB::SQL == cluster.type() && n.has_edit_log_port() && n.edit_log_port() &&
n.has_node_type() &&
(n.node_type() == NodeInfoPB_NodeType_FE_MASTER ||
Expand Down Expand Up @@ -199,6 +210,27 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
return no_err;
}

/**
 * Extracts the instance_id from a degraded-format cloud_unique_id.
 *
 * Degraded format: "${version}:${instance_id}:${unique_id}". Only version 1 is recognised.
 *
 * @param cloud_unique_id the id to inspect
 * @return {true, instance_id} when the id splits into exactly three ':'-separated fields and
 *         the version field is the integer 1; {false, ""} otherwise
 */
std::pair<bool, std::string> ResourceManager::get_instance_id_by_cloud_unique_id(
        const std::string& cloud_unique_id) {
    const auto fields = split(cloud_unique_id, ':');
    if (fields.size() != 3) return {false, ""};

    // std::atoi is undefined when the value does not fit in an int and silently accepts
    // trailing garbage ("1abc" -> 1). std::strtol saturates on overflow and reports where
    // parsing stopped, so the whole version field can be validated.
    const char* const begin = fields[0].c_str();
    char* end = nullptr;
    const long version = std::strtol(begin, &end, 10);
    const bool fully_parsed = (end != begin) && (end == begin + fields[0].size());
    if (!fully_parsed || version != 1) return {false, ""};

    return {true, fields[1]};
}

bool ResourceManager::is_instance_id_registered(const std::string& instance_id) {
// check kv
auto [c0, m0] = get_instance(nullptr, instance_id, nullptr);
{ TEST_SYNC_POINT_CALLBACK("is_instance_id_registered", &c0); }
if (c0 != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to check instance instance_id=" << instance_id
<< ", code=" << format_as(c0) << ", info=" + m0;
}
return c0 == TxnErrorCode::TXN_OK;
}

std::pair<MetaServiceCode, std::string> ResourceManager::add_cluster(const std::string& instance_id,
const ClusterInfo& cluster) {
std::string msg;
Expand Down Expand Up @@ -624,7 +656,7 @@ std::pair<TxnErrorCode, std::string> ResourceManager::get_instance(std::shared_p
return ec;
}

if (!inst_pb->ParseFromString(val)) {
if (inst_pb != nullptr && !inst_pb->ParseFromString(val)) {
code = TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
msg = "failed to parse InstanceInfoPB";
return ec;
Expand Down
19 changes: 19 additions & 0 deletions cloud/src/resource-manager/resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,25 @@ class ResourceManager {
bool check_cluster_params_valid(const ClusterPB& cluster, std::string* err,
bool check_master_num);

/**
 * Checks whether cloud_unique_id is in degraded format and, if so, extracts its instance_id.
 * Degraded format: "${version}:${instance_id}:${unique_id}"
 *
 * @param cloud_unique_id the cloud_unique_id to inspect
 *
 * @return a <is_degraded_format, instance_id> pair; when is_degraded_format is true,
 *         instance_id is the instance_id field of cloud_unique_id, otherwise instance_id is ""
 */
static std::pair<bool, std::string> get_instance_id_by_cloud_unique_id(
const std::string& cloud_unique_id);

/**
 * Checks whether instance_id is a valid (registered) instance by reading it from the
 * fdb kv store.
 *
 * @param instance_id the instance_id to look up
 *
 * @return true only if the lookup succeeds; any failure to read the instance is logged
 *         as a warning and yields false
 */
bool is_instance_id_registered(const std::string& instance_id);

/**
* Refreshes the cache of given instance. This process removes the instance in cache
* and then replaces it with persisted instance state read from underlying KV storage.
Expand Down
1 change: 1 addition & 0 deletions cloud/test/fdb_injection_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ int main(int argc, char** argv) {
cloud::config::txn_store_retry_base_intervals_ms = 1;
cloud::config::fdb_cluster_file_path = "fdb.cluster";
cloud::config::write_schema_kv = true;
cloud::config::enable_check_instance_id = false;

auto sp = SyncPoint::get_instance();
sp->enable_processing();
Expand Down
16 changes: 16 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ int main(int argc, char** argv) {
config::enable_txn_store_retry = true;
config::txn_store_retry_base_intervals_ms = 1;
config::txn_store_retry_times = 20;
config::enable_check_instance_id = false;

if (!doris::cloud::init_glog("meta_service_test")) {
std::cerr << "failed to init glog" << std::endl;
Expand Down Expand Up @@ -264,6 +265,21 @@ TEST(MetaServiceTest, GetInstanceIdTest) {
"12345678901:ALBJLH4Q:m-n3qdpyal27rh8iprxx");
ASSERT_EQ(instance_id, "");

config::enable_check_instance_id = true;
auto ms = get_meta_service(false);
instance_id =
get_instance_id(ms->resource_mgr(), "1:ALBJLH4Q-check-invalid:m-n3qdpyal27rh8iprxx");
ASSERT_EQ(instance_id, "");

sp->set_call_back("is_instance_id_registered", [&](auto&& args) {
TxnErrorCode* c0 = try_any_cast<TxnErrorCode*>(args[0]);
*c0 = TxnErrorCode::TXN_OK;
});
instance_id =
get_instance_id(ms->resource_mgr(), "1:ALBJLH4Q-check-invalid:m-n3qdpyal27rh8iprxx");
ASSERT_EQ(instance_id, "ALBJLH4Q-check-invalid");
config::enable_check_instance_id = false;

sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
Expand Down