From 0d83189434c81f0104280235a60ba1d06c696771 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Mon, 7 Jul 2025 10:15:50 +0800 Subject: [PATCH] [Opt](cloud-mow) Do fast retry when commit compaction job for mow tablet (#52476) ### What problem does this PR solve? #### Before: ![image](https://github.com/user-attachments/assets/7494627e-f13b-4293-8eba-43e08dffd427) #### After ![image](https://github.com/user-attachments/assets/e03de5b7-8ddd-403c-bd1d-79d57fc22d6d) --- cloud/src/meta-service/meta_service_job.cpp | 141 +++++++++++--------- 1 file changed, 77 insertions(+), 64 deletions(-) diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index 00d4dde51cec35..c6791aa4346406 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -1492,58 +1492,73 @@ void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr return; } - bool need_commit = false; - TxnErrorCode err = txn_kv_->create_txn(&txn); - if (err != TxnErrorCode::TXN_OK) { - code = cast_as(err); - msg = "failed to create txn"; - return; - } + for (int retry = 0; retry <= 1; retry++) { + bool need_commit = false; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + msg = "failed to create txn"; + return; + } - int64_t tablet_id = request->job().idx().tablet_id(); - if (tablet_id <= 0) { - code = MetaServiceCode::INVALID_ARGUMENT; - msg = "no valid tablet_id given"; - return; - } - auto& tablet_idx = const_cast(request->job().idx()); - if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() || - !tablet_idx.has_partition_id()) { - get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, tablet_idx); - if (code != MetaServiceCode::OK) return; - } - // Check if tablet has been dropped - if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(), - tablet_idx.partition_id())) { - code = MetaServiceCode::TABLET_NOT_FOUND; - msg = fmt::format("tablet {} has been dropped", tablet_id); - return; - } + int64_t tablet_id = request->job().idx().tablet_id(); + if (tablet_id <= 0) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "no valid tablet_id given"; + return; + } + auto& tablet_idx = const_cast(request->job().idx()); + if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() || + !tablet_idx.has_partition_id()) { + get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, tablet_idx); + if (code != MetaServiceCode::OK) return; + } + // Check if tablet has been dropped + if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(), + tablet_idx.partition_id())) { + code = MetaServiceCode::TABLET_NOT_FOUND; + msg = fmt::format("tablet {} has been dropped", tablet_id); + return; + } - // TODO(gavin): remove duplicated code with start_tablet_job() - // Begin to process finish tablet job - std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), - tablet_idx.partition_id(), tablet_id}); - std::string job_val; - err = txn->get(job_key, &job_val); - if (err != TxnErrorCode::TXN_OK) { - SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,") - << " instance_id=" << instance_id << " tablet_id=" << tablet_id - << " job=" << proto_to_json(request->job()) << " err=" << err; - msg = ss.str(); - code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT - : cast_as(err); - return; - } - TabletJobInfoPB recorded_job; - recorded_job.ParseFromString(job_val); - VLOG_DEBUG << "get tablet job, tablet_id=" << tablet_id - << " job=" << proto_to_json(recorded_job); - FinishTabletJobRequest_Action action = request->action(); + // TODO(gavin): remove duplicated code with start_tablet_job() + // Begin to process finish tablet job + std::string job_key = + job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), + tablet_idx.partition_id(), tablet_id}); + std::string job_val; + err = txn->get(job_key, &job_val); + if (err != TxnErrorCode::TXN_OK) { + SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,") + << " instance_id=" << instance_id << " tablet_id=" << tablet_id + << " job=" << proto_to_json(request->job()) << " err=" << err; + msg = ss.str(); + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT + : cast_as(err); + return; + } + TabletJobInfoPB recorded_job; + recorded_job.ParseFromString(job_val); + VLOG_DEBUG << "get tablet job, tablet_id=" << tablet_id + << " job=" << proto_to_json(recorded_job); + FinishTabletJobRequest_Action action = request->action(); + + std::string use_version = + delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id); + LOG(INFO) << "finish_tablet_job instance_id=" << instance_id + << " use_version=" << use_version; + if (!request->job().compaction().empty()) { + // Process compaction commit + process_compaction_job(code, msg, ss, txn, request, response, recorded_job, instance_id, + job_key, need_commit, use_version); + } else if (request->job().has_schema_change()) { + // Process schema change commit + process_schema_change_job(code, msg, ss, txn, request, response, recorded_job, + instance_id, job_key, need_commit, use_version); + } - DORIS_CLOUD_DEFER { if (!need_commit) return; - TxnErrorCode err = txn->commit(); + err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { if (err == TxnErrorCode::TXN_CONFLICT) { if (action == FinishTabletJobRequest::COMMIT) { @@ -1553,28 +1568,26 @@ void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr } else if (action == FinishTabletJobRequest::ABORT) { g_bvar_delete_bitmap_lock_txn_remove_conflict_by_compaction_abort_counter << 1; } + + if (retry == 0 && !request->job().compaction().empty() && + request->job().compaction(0).has_delete_bitmap_lock_initiator()) { + // Do a fast retry for mow when commit compaction job. + // The only fdb txn conflict here is that during the compaction job commit, + // a compaction lease RPC comes and finishes before the commit, + // so we retry to commit the compaction job again. + response->Clear(); + code = MetaServiceCode::OK; + msg.clear(); + continue; + } } + code = cast_as(err); ss << "failed to commit job kv, err=" << err; msg = ss.str(); return; } - }; - std::string use_version = - delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id); - LOG(INFO) << "finish_tablet_job instance_id=" << instance_id << " use_version=" << use_version; - // Process compaction commit - if (!request->job().compaction().empty()) { - process_compaction_job(code, msg, ss, txn, request, response, recorded_job, instance_id, - job_key, need_commit, use_version); - return; - } - - // Process schema change commit - if (request->job().has_schema_change()) { - process_schema_change_job(code, msg, ss, txn, request, response, recorded_job, instance_id, - job_key, need_commit, use_version); - return; + break; } }