Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,10 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();

{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_rowset", ret_st);
}
CreateRowsetRequest req;
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -741,6 +744,10 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st);
}
CreateRowsetRequest req;
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand Down Expand Up @@ -841,6 +848,10 @@ static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
<< ", label: " << ctx.label << ", is_2pc: " << is_2pc;
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_txn", ret_st);
}
CommitTxnRequest req;
CommitTxnResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -860,6 +871,10 @@ Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
<< ", label: " << ctx.label;
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_txn", ret_st);
}
AbortTxnRequest req;
AbortTxnResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -879,6 +894,10 @@ Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
<< ", label: " << ctx.label;
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::precommit_txn", ret_st);
}
PrecommitTxnRequest req;
PrecommitTxnResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
std::stringstream ss;
ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label;
std::string op_info = ss.str();
LOG(INFO) << "rollback stream laod txn " << op_info;
LOG(INFO) << "rollback stream load txn " << op_info;
TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID
: !ctx->label.empty() ? TxnOpParamType::WITH_LABEL
: TxnOpParamType::ILLEGAL;
Expand Down
63 changes: 55 additions & 8 deletions be/src/cloud/injection_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ void register_suites() {
*arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error");
});
});
// Suite that makes CloudMetaMgr::commit_txn fail with an injected InternalError.
// Activate it with:
//   curl "be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn"
suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] {
    auto* sp = SyncPoint::get_instance();
    sp->set_call_back("CloudMetaMgr::commit_txn", [](auto&& args) {
        LOG(INFO) << "injection CloudMetaMgr::commit_txn";
        auto* arg0 = try_any_cast_ret<Status>(args);
        // The message names this suite so the injected failure can be traced
        // back to it in logs.
        arg0->first = Status::InternalError<false>(
                "test_cloud_meta_mgr_commit_txn injection error");
        // Setting the flag tells the injection point to return `first`
        // instead of continuing with the real RPC.
        arg0->second = true;
    });
});
}

void set_sleep(const std::string& point, HttpRequest* req) {
Expand All @@ -139,16 +150,18 @@ void set_sleep(const std::string& point, HttpRequest* req) {
}
}
auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [duration](auto&& args) {
sp->set_call_back(point, [point, duration](auto&& args) {
LOG(INFO) << "injection point hit, point=" << point << " sleep milliseconds=" << duration;
std::this_thread::sleep_for(std::chrono::milliseconds(duration));
});
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}

void set_return(const std::string& point, HttpRequest* req) {
auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [](auto&& args) {
sp->set_call_back(point, [point](auto&& args) {
try {
LOG(INFO) << "injection point hit, point=" << point << " return void";
auto pred = try_any_cast<bool*>(args.back());
*pred = true;
} catch (const std::bad_any_cast&) {
Expand All @@ -160,8 +173,9 @@ void set_return(const std::string& point, HttpRequest* req) {

void set_return_ok(const std::string& point, HttpRequest* req) {
auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [](auto&& args) {
sp->set_call_back(point, [point](auto&& args) {
try {
LOG(INFO) << "injection point hit, point=" << point << " return ok";
auto* pair = try_any_cast_ret<Status>(args);
pair->first = Status::OK();
pair->second = true;
Expand All @@ -188,8 +202,9 @@ void set_return_error(const std::string& point, HttpRequest* req) {
}

auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [code](auto&& args) {
sp->set_call_back(point, [code, point](auto&& args) {
try {
LOG(INFO) << "injection point hit, point=" << point << " return error code=" << code;
auto* pair = try_any_cast_ret<Status>(args);
pair->first = Status::Error<false>(code, "injected error");
pair->second = true;
Expand Down Expand Up @@ -243,7 +258,7 @@ void handle_clear(HttpRequest* req) {
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}

void handle_suite(HttpRequest* req) {
void handle_apply_suite(HttpRequest* req) {
auto& suite = req->param("name");
if (suite.empty()) {
HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty suite name");
Expand All @@ -253,10 +268,11 @@ void handle_suite(HttpRequest* req) {
std::call_once(register_suites_once, register_suites);
if (auto it = suite_map.find(suite); it != suite_map.end()) {
it->second(); // set injection callbacks
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
HttpChannel::send_reply(req, HttpStatus::OK, "OK apply suite " + suite + "\n");
return;
}
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, "unknown suite: " + suite);
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
"unknown suite: " + suite + "\n");
}

void handle_enable(HttpRequest* req) {
Expand All @@ -273,6 +289,37 @@ void handle_disable(HttpRequest* req) {

InjectionPointAction::InjectionPointAction() = default;

//
// enable/disable injection point
// ```
// curl "be_ip:http_port/api/injection_point/enable"
// curl "be_ip:http_port/api/injection_point/disable"
// ```
//
// clear all injection points
// ```
// curl "be_ip:http_port/api/injection_point/clear"
// ```
//
// apply/activate specific suite with registered action, see `register_suites()` for more details
// ```
// curl "be_ip:http_port/api/injection_point/apply_suite?name=${suite_name}"
// ```
//
// set predefined action for specific injection point, supported actions are:
// * sleep: for injection point with callback, accepted param is `duration` in milliseconds
// * return: for injection point without return value (return void)
// * return_ok: for injection point with return value, always return Status::OK
// * return_error: for injection point with return value, accepted param is `code`,
// which is an int, valid values can be found in status.h, e.g. -235 or -230,
// if `code` is not present return Status::InternalError
// ```
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=sleep&duration=${x_millsec}" # sleep x millisecs
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return" # return void
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_ok" # return ok
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error" # internal error
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error&code=${code}" # -235
// ```
void InjectionPointAction::handle(HttpRequest* req) {
LOG(INFO) << "handle InjectionPointAction " << req->debug_string();
auto& op = req->param("op");
Expand All @@ -283,7 +330,7 @@ void InjectionPointAction::handle(HttpRequest* req) {
handle_clear(req);
return;
} else if (op == "apply_suite") {
handle_suite(req);
handle_apply_suite(req);
return;
} else if (op == "enable") {
handle_enable(req);
Expand Down
5 changes: 3 additions & 2 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ CONF_Bool(enable_file_logger, "true");

// recycler config
CONF_mInt64(recycle_interval_seconds, "3600");
CONF_mInt64(retention_seconds, "259200"); // 72h
CONF_mInt64(retention_seconds, "259200"); // 72h, global retention time
CONF_Int32(recycle_concurrency, "16");
CONF_Int32(recycle_job_lease_expired_ms, "60000");
CONF_mInt64(compacted_rowset_retention_seconds, "10800"); // 3h
Expand All @@ -77,7 +77,8 @@ CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min
CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours

CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min
CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h
// log a warning if a recycle task takes longer than this duration
CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h

CONF_String(test_s3_ak, "");
CONF_String(test_s3_sk, "");
Expand Down
19 changes: 12 additions & 7 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,8 @@ void MetaServiceImpl::precommit_txn(::google::protobuf::RpcController* controlle
return;
}

LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id;
txn->put(running_key, running_val);
LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id;

err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
Expand Down Expand Up @@ -569,8 +569,6 @@ void put_routine_load_progress(MetaServiceCode& code, std::string& msg,
new_statistic_info->set_task_execution_time_ms(commit_attachment.task_execution_time_ms());
}

LOG(INFO) << "routine load new progress: " << new_progress_info.ShortDebugString();

if (!new_progress_info.SerializeToString(&new_progress_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize new progress val, txn_id=" << txn_id;
Expand All @@ -579,6 +577,8 @@ void put_routine_load_progress(MetaServiceCode& code, std::string& msg,
}

txn->put(rl_progress_key, new_progress_val);
LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key)
<< " routine load new progress: " << new_progress_info.ShortDebugString();
}

void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* controller,
Expand Down Expand Up @@ -689,6 +689,7 @@ void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* contr

if (request->partition_to_offset().size() == 0) {
txn->remove(rl_progress_key);
LOG(INFO) << "remove rl_progress_key key=" << hex(rl_progress_key);
}

if (request->partition_to_offset().size() > 0) {
Expand Down Expand Up @@ -738,6 +739,7 @@ void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* contr
return;
}
txn->put(rl_progress_key, new_progress_val);
LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key);
}

err = txn->commit();
Expand Down Expand Up @@ -892,6 +894,7 @@ void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stat
stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs);
stats_pb.SerializeToString(&val);
txn->put(key, val);
LOG(INFO) << "put stats_tablet_key key=" << hex(key);
}
}

Expand Down Expand Up @@ -2370,6 +2373,7 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse*
stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs);
stats_pb.SerializeToString(&val);
txn->put(key, val);
LOG(INFO) << "put stats_tablet_key, key=" << hex(key);
};
}
for (auto& [tablet_id, stats] : tablet_stats) {
Expand Down Expand Up @@ -2405,8 +2409,9 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse*
return;
}
txn->put(recycle_key, recycle_val);
LOG(INFO) << "xxx commit_txn put recycle_txn_key key=" << hex(recycle_key)
<< " txn_id=" << txn_id;

LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << " txn_id=" << txn_id;
LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" << txn->delete_bytes()
<< " num_put_keys=" << txn->num_put_keys() << " num_del_keys=" << txn->num_del_keys()
<< " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id;
Expand Down Expand Up @@ -2541,7 +2546,7 @@ static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* re
//1. abort txn by txn id
//2. abort txn by label and db_id
if (txn_id > 0) {
VLOG_DEBUG << "abort_txn by txn_id";
VLOG_DEBUG << "abort_txn by txn_id, txn_id=" << txn_id;
//abort txn by txn id
// Get db id with txn id

Expand Down Expand Up @@ -2610,7 +2615,7 @@ static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* re
return;
}
} else {
VLOG_DEBUG << "abort_txn by db_id and txn label";
VLOG_DEBUG << "abort_txn db_id and label, db_id=" << db_id << " label=" << label;
//abort txn by label.
std::string label_key = txn_label_key({instance_id, db_id, label});
std::string label_val;
Expand Down Expand Up @@ -2726,7 +2731,7 @@ static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* re
return;
}
txn->put(recycle_key, recycle_val);
LOG(INFO) << "xxx put recycle_key=" << hex(recycle_key)
LOG(INFO) << "put recycle_txn_key=" << hex(recycle_key)
<< " txn_id=" << return_txn_info.txn_id();
}

Expand Down
23 changes: 21 additions & 2 deletions cloud/src/recycler/hdfs_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,27 @@ int HdfsAccessor::init() {
}

// Deletes every file whose path starts with `path_prefix`.
//
// All files are enumerated through `list_all()`; each listed file whose path
// begins with `path_prefix` is removed with `delete_file()`. Deletion stops at
// the first failure.
//
// @param path_prefix      prefix that a listed file path must start with to be deleted
// @param expiration_time  not consulted by this implementation
//                         NOTE(review): confirm whether HDFS should honor it
// @return 0 on success, otherwise the non-zero code returned by `list_all()`
//         or by the first failing `delete_file()`
int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expiration_time) {
    auto uri = to_uri(path_prefix);
    LOG(INFO) << "delete prefix, uri=" << uri;

    std::unique_ptr<ListIterator> list_iter;
    int ret = list_all(&list_iter);
    if (ret != 0) {
        LOG(WARNING) << "delete prefix, failed to list, uri=" << uri << " ret=" << ret;
        return ret;
    }

    size_t num_listed = 0;
    size_t num_deleted = 0;
    for (auto file = list_iter->next(); file; file = list_iter->next()) {
        ++num_listed;
        // Only the leading `path_prefix.size()` bytes decide the match, so compare
        // just those instead of searching the whole path.
        if (file->path.compare(0, path_prefix.size(), path_prefix) != 0) continue;
        if (int del_ret = delete_file(file->path); del_ret != 0) {
            ret = del_ret;
            break;
        }
        ++num_deleted;
    }

    LOG(INFO) << "delete prefix " << (ret != 0 ? "failed" : "succ") << " ret=" << ret
              << " uri=" << uri << " num_listed=" << num_listed
              << " num_deleted=" << num_deleted;
    return ret;
}

int HdfsAccessor::delete_directory_impl(const std::string& dir_path) {
Expand Down
16 changes: 12 additions & 4 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
return {std::string_view(), range_move};
}
++num_recycled;
LOG_INFO("k is {}, is empty {}", k, k.empty());
LOG(INFO) << "recycle_tablets scan, key=" << (k.empty() ? "(empty)" : hex(k));
return {k, range_move};
});
} else {
Expand Down Expand Up @@ -1694,6 +1694,10 @@ int InstanceRecycler::recycle_rowsets() {
LOG_WARNING("malformed recycle rowset").tag("key", hex(k));
return -1;
}

VLOG_DEBUG << "recycle rowset scan, key=" << hex(k) << " num_scanned=" << num_scanned
<< " num_expired=" << num_expired << " expiration=" << calc_expiration(rowset)
<< " RecycleRowsetPB=" << rowset.ShortDebugString();
int64_t current_time = ::time(nullptr);
if (current_time < calc_expiration(rowset)) { // not expired
return 0;
Expand Down Expand Up @@ -1745,8 +1749,8 @@ int InstanceRecycler::recycle_rowsets() {
<< rowset_meta->start_version() << '-' << rowset_meta->end_version()
<< "] txn_id=" << rowset_meta->txn_id()
<< " type=" << RecycleRowsetPB_Type_Name(rowset.type())
<< " rowset_meta_size=" << v.size() << " creation_time"
<< rowset_meta->creation_time();
<< " rowset_meta_size=" << v.size()
<< " creation_time=" << rowset_meta->creation_time();
if (rowset.type() == RecycleRowsetPB::PREPARE) {
// unable to calculate file path, can only be deleted by rowset id prefix
num_prepare += 1;
Expand Down Expand Up @@ -1910,6 +1914,10 @@ int InstanceRecycler::recycle_tmp_rowsets() {
// duration or timeout always < `retention_time` in practice.
int64_t expiration =
rowset.txn_expiration() > 0 ? rowset.txn_expiration() : rowset.creation_time();
VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << " num_scanned=" << num_scanned
<< " num_expired=" << num_expired << " expiration=" << expiration
<< " txn_expiration=" << rowset.txn_expiration()
<< " rowset_creation_time=" << rowset.creation_time();
if (current_time < expiration + config::retention_seconds) {
// not expired
return 0;
Expand Down Expand Up @@ -2150,7 +2158,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
recycle_txn_key(recycle_txn_key_info0, &begin_recycle_txn_key);
recycle_txn_key(recycle_txn_key_info1, &end_recycle_txn_key);

LOG_INFO("begin to recycle expire txn").tag("instance_id", instance_id_);
LOG_INFO("begin to recycle expired txn").tag("instance_id", instance_id_);

int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
register_recycle_task(task_name, start_time);
Expand Down