diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index bc6c3a0f098e7e..8138ea52421102 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -715,7 +715,10 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, RowsetMetaSharedPtr* existed_rs_meta) { VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id() << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); - + { + Status ret_st; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_rowset", ret_st); + } CreateRowsetRequest req; CreateRowsetResponse resp; req.set_cloud_unique_id(config::cloud_unique_id); @@ -741,6 +744,10 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, RowsetMetaSharedPtr* existed_rs_meta) { VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id() << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); + { + Status ret_st; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st); + } CreateRowsetRequest req; CreateRowsetResponse resp; req.set_cloud_unique_id(config::cloud_unique_id); @@ -841,6 +848,10 @@ static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id, Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id << ", label: " << ctx.label << ", is_2pc: " << is_2pc; + { + Status ret_st; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_txn", ret_st); + } CommitTxnRequest req; CommitTxnResponse res; req.set_cloud_unique_id(config::cloud_unique_id); @@ -860,6 +871,10 @@ Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id << ", label: " << ctx.label; + { + Status ret_st; + 
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_txn", ret_st); + } AbortTxnRequest req; AbortTxnResponse res; req.set_cloud_unique_id(config::cloud_unique_id); @@ -879,6 +894,10 @@ Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) { VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id << ", label: " << ctx.label; + { + Status ret_st; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::precommit_txn", ret_st); + } PrecommitTxnRequest req; PrecommitTxnResponse res; req.set_cloud_unique_id(config::cloud_unique_id); diff --git a/be/src/cloud/cloud_stream_load_executor.cpp b/be/src/cloud/cloud_stream_load_executor.cpp index 1b8167c96ebd48..1352b4aac81a5f 100644 --- a/be/src/cloud/cloud_stream_load_executor.cpp +++ b/be/src/cloud/cloud_stream_load_executor.cpp @@ -129,7 +129,7 @@ void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { std::stringstream ss; ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label; std::string op_info = ss.str(); - LOG(INFO) << "rollback stream laod txn " << op_info; + LOG(INFO) << "rollback stream load txn " << op_info; TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID : !ctx->label.empty() ? 
TxnOpParamType::WITH_LABEL : TxnOpParamType::ILLEGAL; diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp index be90ee23afddae..1880f14a3b77d1 100644 --- a/be/src/cloud/injection_point_action.cpp +++ b/be/src/cloud/injection_point_action.cpp @@ -124,6 +124,17 @@ void register_suites() { *arg0 = Status::Corruption("test_file_segment_cache_corruption injection error"); }); }); + // curl be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn + suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] { + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("CloudMetaMgr::commit_txn", [](auto&& args) { + LOG(INFO) << "injection CloudMetaMgr::commit_txn"; + auto* arg0 = try_any_cast_ret(args); + arg0->first = Status::InternalError( + "test_file_segment_cache_corruption injection error"); + arg0->second = true; + }); + }); } void set_sleep(const std::string& point, HttpRequest* req) { @@ -139,7 +150,8 @@ void set_sleep(const std::string& point, HttpRequest* req) { } } auto sp = SyncPoint::get_instance(); - sp->set_call_back(point, [duration](auto&& args) { + sp->set_call_back(point, [point, duration](auto&& args) { + LOG(INFO) << "injection point hit, point=" << point << " sleep milliseconds=" << duration; std::this_thread::sleep_for(std::chrono::milliseconds(duration)); }); HttpChannel::send_reply(req, HttpStatus::OK, "OK"); @@ -147,8 +159,9 @@ void set_sleep(const std::string& point, HttpRequest* req) { void set_return(const std::string& point, HttpRequest* req) { auto sp = SyncPoint::get_instance(); - sp->set_call_back(point, [](auto&& args) { + sp->set_call_back(point, [point](auto&& args) { try { + LOG(INFO) << "injection point hit, point=" << point << " return void"; auto pred = try_any_cast(args.back()); *pred = true; } catch (const std::bad_any_cast&) { @@ -160,8 +173,9 @@ void set_return(const std::string& point, HttpRequest* req) { void set_return_ok(const std::string& point, 
HttpRequest* req) { auto sp = SyncPoint::get_instance(); - sp->set_call_back(point, [](auto&& args) { + sp->set_call_back(point, [point](auto&& args) { try { + LOG(INFO) << "injection point hit, point=" << point << " return ok"; auto* pair = try_any_cast_ret(args); pair->first = Status::OK(); pair->second = true; @@ -188,8 +202,9 @@ void set_return_error(const std::string& point, HttpRequest* req) { } auto sp = SyncPoint::get_instance(); - sp->set_call_back(point, [code](auto&& args) { + sp->set_call_back(point, [code, point](auto&& args) { try { + LOG(INFO) << "injection point hit, point=" << point << " return error code=" << code; auto* pair = try_any_cast_ret(args); pair->first = Status::Error(code, "injected error"); pair->second = true; @@ -243,7 +258,7 @@ void handle_clear(HttpRequest* req) { HttpChannel::send_reply(req, HttpStatus::OK, "OK"); } -void handle_suite(HttpRequest* req) { +void handle_apply_suite(HttpRequest* req) { auto& suite = req->param("name"); if (suite.empty()) { HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty suite name"); @@ -253,10 +268,11 @@ void handle_suite(HttpRequest* req) { std::call_once(register_suites_once, register_suites); if (auto it = suite_map.find(suite); it != suite_map.end()) { it->second(); // set injection callbacks - HttpChannel::send_reply(req, HttpStatus::OK, "OK"); + HttpChannel::send_reply(req, HttpStatus::OK, "OK apply suite " + suite + "\n"); return; } - HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, "unknown suite: " + suite); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + "unknown suite: " + suite + "\n"); } void handle_enable(HttpRequest* req) { @@ -273,6 +289,37 @@ void handle_disable(HttpRequest* req) { InjectionPointAction::InjectionPointAction() = default; +// +// enable/disable injection point +// ``` +// curl "be_ip:http_port/api/injection_point/enable" +// curl "be_ip:http_port/api/injection_point/disable" +// ``` +// +// clear all injection points +// 
``` +// curl "be_ip:http_port/api/injection_point/clear" +// ``` +// +// apply/activate specific suite with registered action, see `register_suites()` for more details +// ``` +// curl "be_ip:http_port/api/injection_point/apply_suite?name=${suite_name}" +// ``` +// +// set predefined action for specific injection point, supported actions are: +// * sleep: for injection point with callback, accepted param is `duration` in milliseconds +// * return: for injection point without return value (return void) +// * return_ok: for injection point with return value, always return Status::OK +// * return_error: for injection point with return value, accepted param is `code`, +// which is an int, valid values can be found in status.h, e.g. -235 or -230, +// if `code` is not present return Status::InternalError +// ``` +// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=sleep&duration=${x_millsec}" # sleep x millisecs +// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return" # return void +// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_ok" # return ok +// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error" # internal error +// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error&code=${code}" # -235 +// ``` void InjectionPointAction::handle(HttpRequest* req) { LOG(INFO) << "handle InjectionPointAction " << req->debug_string(); auto& op = req->param("op"); @@ -283,7 +330,7 @@ void InjectionPointAction::handle(HttpRequest* req) { handle_clear(req); return; } else if (op == "apply_suite") { - handle_suite(req); + handle_apply_suite(req); return; } else if (op == "enable") { handle_enable(req); diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index d401caa4ad975f..e31a60a0d69f80 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -55,7 
+55,7 @@ CONF_Bool(enable_file_logger, "true"); // recycler config CONF_mInt64(recycle_interval_seconds, "3600"); -CONF_mInt64(retention_seconds, "259200"); // 72h +CONF_mInt64(retention_seconds, "259200"); // 72h, global retention time CONF_Int32(recycle_concurrency, "16"); CONF_Int32(recycle_job_lease_expired_ms, "60000"); CONF_mInt64(compacted_rowset_retention_seconds, "10800"); // 3h @@ -77,7 +77,8 @@ CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min -CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h +// log a warning if a recycle task takes longer than this duration +CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h CONF_String(test_s3_ak, ""); CONF_String(test_s3_sk, ""); diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 3cd1bd798bbc84..5f2638c8d1fa28 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -479,8 +479,8 @@ void MetaServiceImpl::precommit_txn(::google::protobuf::RpcController* controlle return; } - LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id; txn->put(running_key, running_val); + LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id; err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { @@ -569,8 +569,6 @@ void put_routine_load_progress(MetaServiceCode& code, std::string& msg, new_statistic_info->set_task_execution_time_ms(commit_attachment.task_execution_time_ms()); } - LOG(INFO) << "routine load new progress: " << new_progress_info.ShortDebugString(); - if (!new_progress_info.SerializeToString(&new_progress_val)) { code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; ss << "failed to serialize new progress val, txn_id=" << txn_id; @@ -579,6 +577,8 @@ void put_routine_load_progress(MetaServiceCode& code, std::string& 
msg, } txn->put(rl_progress_key, new_progress_val); + LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key) + << " routine load new progress: " << new_progress_info.ShortDebugString(); } void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* controller, @@ -689,6 +689,7 @@ void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* contr if (request->partition_to_offset().size() == 0) { txn->remove(rl_progress_key); + LOG(INFO) << "remove rl_progress_key key=" << hex(rl_progress_key); } if (request->partition_to_offset().size() > 0) { @@ -738,6 +739,7 @@ void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* contr return; } txn->put(rl_progress_key, new_progress_val); + LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key); } err = txn->commit(); @@ -892,6 +894,7 @@ void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stat stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs); stats_pb.SerializeToString(&val); txn->put(key, val); + LOG(INFO) << "put stats_tablet_key key=" << hex(key); } } @@ -2370,6 +2373,7 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs); stats_pb.SerializeToString(&val); txn->put(key, val); + LOG(INFO) << "put stats_tablet_key, key=" << hex(key); }; } for (auto& [tablet_id, stats] : tablet_stats) { @@ -2405,8 +2409,9 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* return; } txn->put(recycle_key, recycle_val); + LOG(INFO) << "xxx commit_txn put recycle_txn_key key=" << hex(recycle_key) + << " txn_id=" << txn_id; - LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << " txn_id=" << txn_id; LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" << txn->delete_bytes() << " num_put_keys=" << txn->num_put_keys() << " num_del_keys=" << txn->num_del_keys() << " 
txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id; @@ -2541,7 +2546,7 @@ static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* re //1. abort txn by txn id //2. abort txn by label and db_id if (txn_id > 0) { - VLOG_DEBUG << "abort_txn by txn_id"; + VLOG_DEBUG << "abort_txn by txn_id, txn_id=" << txn_id; //abort txn by txn id // Get db id with txn id @@ -2610,7 +2615,7 @@ static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* re return; } } else { - VLOG_DEBUG << "abort_txn by db_id and txn label"; + VLOG_DEBUG << "abort_txn db_id and label, db_id=" << db_id << " label=" << label; //abort txn by label. std::string label_key = txn_label_key({instance_id, db_id, label}); std::string label_val; @@ -2726,7 +2731,7 @@ static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* re return; } txn->put(recycle_key, recycle_val); - LOG(INFO) << "xxx put recycle_key=" << hex(recycle_key) + LOG(INFO) << "put recycle_txn_key=" << hex(recycle_key) << " txn_id=" << return_txn_info.txn_id(); } diff --git a/cloud/src/recycler/hdfs_accessor.cpp b/cloud/src/recycler/hdfs_accessor.cpp index e5038735f5735f..97a4670d2bfc6b 100644 --- a/cloud/src/recycler/hdfs_accessor.cpp +++ b/cloud/src/recycler/hdfs_accessor.cpp @@ -354,8 +354,27 @@ int HdfsAccessor::init() { } int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expiration_time) { - LOG_INFO("delete prefix").tag("uri", to_uri(path_prefix)); // Audit log - return 0; + auto uri = to_uri(path_prefix); + LOG(INFO) << "delete prefix, uri=" << uri; + std::unique_ptr list_iter; + int ret = list_all(&list_iter); + if (ret != 0) { + LOG(WARNING) << "delete prefix, failed to list" << uri; + return ret; + } + size_t num_listed = 0, num_deleted = 0; + for (auto file = list_iter->next(); file; file = list_iter->next()) { + ++num_listed; + if (file->path.find(path_prefix) != 0) continue; + if (int del_ret = delete_file(file->path); del_ret != 0) { + ret 
= del_ret; + break; + } + ++num_deleted; + } + LOG(INFO) << "delete prefix " << (ret != 0 ? "failed" : "succ") << " ret=" << ret + << " uri=" << uri << " num_listed=" << num_listed << " num_deleted=" << num_deleted; + return ret; } int HdfsAccessor::delete_directory_impl(const std::string& dir_path) { diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 919b50358c77f5..9db16a18c13d4c 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1219,7 +1219,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ return {std::string_view(), range_move}; } ++num_recycled; - LOG_INFO("k is {}, is empty {}", k, k.empty()); + LOG(INFO) << "recycle_tablets scan, key=" << (k.empty() ? "(empty)" : hex(k)); return {k, range_move}; }); } else { @@ -1694,6 +1694,10 @@ int InstanceRecycler::recycle_rowsets() { LOG_WARNING("malformed recycle rowset").tag("key", hex(k)); return -1; } + + VLOG_DEBUG << "recycle rowset scan, key=" << hex(k) << " num_scanned=" << num_scanned + << " num_expired=" << num_expired << " expiration=" << calc_expiration(rowset) + << " RecycleRowsetPB=" << rowset.ShortDebugString(); int64_t current_time = ::time(nullptr); if (current_time < calc_expiration(rowset)) { // not expired return 0; @@ -1745,8 +1749,8 @@ int InstanceRecycler::recycle_rowsets() { << rowset_meta->start_version() << '-' << rowset_meta->end_version() << "] txn_id=" << rowset_meta->txn_id() << " type=" << RecycleRowsetPB_Type_Name(rowset.type()) - << " rowset_meta_size=" << v.size() << " creation_time" - << rowset_meta->creation_time(); + << " rowset_meta_size=" << v.size() + << " creation_time=" << rowset_meta->creation_time(); if (rowset.type() == RecycleRowsetPB::PREPARE) { // unable to calculate file path, can only be deleted by rowset id prefix num_prepare += 1; @@ -1910,6 +1914,10 @@ int InstanceRecycler::recycle_tmp_rowsets() { // duration or timeout always < `retention_time` in practice. 
int64_t expiration = rowset.txn_expiration() > 0 ? rowset.txn_expiration() : rowset.creation_time(); + VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << " num_scanned=" << num_scanned + << " num_expired=" << num_expired << " expiration=" << expiration + << " txn_expiration=" << rowset.txn_expiration() + << " rowset_creation_time=" << rowset.creation_time(); if (current_time < expiration + config::retention_seconds) { // not expired return 0; @@ -2150,7 +2158,7 @@ int InstanceRecycler::recycle_expired_txn_label() { recycle_txn_key(recycle_txn_key_info0, &begin_recycle_txn_key); recycle_txn_key(recycle_txn_key_info1, &end_recycle_txn_key); - LOG_INFO("begin to recycle expire txn").tag("instance_id", instance_id_); + LOG_INFO("begin to recycle expired txn").tag("instance_id", instance_id_); int64_t start_time = duration_cast(steady_clock::now().time_since_epoch()).count(); register_recycle_task(task_name, start_time);