Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 77 additions & 64 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1492,58 +1492,73 @@ void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
return;
}

bool need_commit = false;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
return;
}
for (int retry = 0; retry <= 1; retry++) {
bool need_commit = false;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
return;
}

int64_t tablet_id = request->job().idx().tablet_id();
if (tablet_id <= 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid tablet_id given";
return;
}
auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
!tablet_idx.has_partition_id()) {
get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, tablet_idx);
if (code != MetaServiceCode::OK) return;
}
// Check if tablet has been dropped
if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
tablet_idx.partition_id())) {
code = MetaServiceCode::TABLET_NOT_FOUND;
msg = fmt::format("tablet {} has been dropped", tablet_id);
return;
}
int64_t tablet_id = request->job().idx().tablet_id();
if (tablet_id <= 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid tablet_id given";
return;
}
auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
!tablet_idx.has_partition_id()) {
get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, tablet_idx);
if (code != MetaServiceCode::OK) return;
}
// Check if tablet has been dropped
if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
tablet_idx.partition_id())) {
code = MetaServiceCode::TABLET_NOT_FOUND;
msg = fmt::format("tablet {} has been dropped", tablet_id);
return;
}

// TODO(gavin): remove duplicated code with start_tablet_job()
// Begin to process finish tablet job
std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_id});
std::string job_val;
err = txn->get(job_key, &job_val);
if (err != TxnErrorCode::TXN_OK) {
SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,")
<< " instance_id=" << instance_id << " tablet_id=" << tablet_id
<< " job=" << proto_to_json(request->job()) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT
: cast_as<ErrCategory::READ>(err);
return;
}
TabletJobInfoPB recorded_job;
recorded_job.ParseFromString(job_val);
VLOG_DEBUG << "get tablet job, tablet_id=" << tablet_id
<< " job=" << proto_to_json(recorded_job);
FinishTabletJobRequest_Action action = request->action();
// TODO(gavin): remove duplicated code with start_tablet_job()
// Begin to process finish tablet job
std::string job_key =
job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_id});
std::string job_val;
err = txn->get(job_key, &job_val);
if (err != TxnErrorCode::TXN_OK) {
SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,")
<< " instance_id=" << instance_id << " tablet_id=" << tablet_id
<< " job=" << proto_to_json(request->job()) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT
: cast_as<ErrCategory::READ>(err);
return;
}
TabletJobInfoPB recorded_job;
recorded_job.ParseFromString(job_val);
VLOG_DEBUG << "get tablet job, tablet_id=" << tablet_id
<< " job=" << proto_to_json(recorded_job);
FinishTabletJobRequest_Action action = request->action();

std::string use_version =
delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id);
LOG(INFO) << "finish_tablet_job instance_id=" << instance_id
<< " use_version=" << use_version;
if (!request->job().compaction().empty()) {
// Process compaction commit
process_compaction_job(code, msg, ss, txn, request, response, recorded_job, instance_id,
job_key, need_commit, use_version);
} else if (request->job().has_schema_change()) {
// Process schema change commit
process_schema_change_job(code, msg, ss, txn, request, response, recorded_job,
instance_id, job_key, need_commit, use_version);
}

DORIS_CLOUD_DEFER {
if (!need_commit) return;
TxnErrorCode err = txn->commit();
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_CONFLICT) {
if (action == FinishTabletJobRequest::COMMIT) {
Expand All @@ -1553,28 +1568,26 @@ void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
} else if (action == FinishTabletJobRequest::ABORT) {
g_bvar_delete_bitmap_lock_txn_remove_conflict_by_compaction_abort_counter << 1;
}

if (retry == 0 && !request->job().compaction().empty() &&
request->job().compaction(0).has_delete_bitmap_lock_initiator()) {
// Do a fast retry for mow when commit compaction job.
// The only fdb txn conflict here is that during the compaction job commit,
// a compaction lease RPC comes and finishes before the commit,
// so we retry to commit the compaction job again.
response->Clear();
code = MetaServiceCode::OK;
msg.clear();
continue;
}
}

code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit job kv, err=" << err;
msg = ss.str();
return;
}
};
std::string use_version =
delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id);
LOG(INFO) << "finish_tablet_job instance_id=" << instance_id << " use_version=" << use_version;
// Process compaction commit
if (!request->job().compaction().empty()) {
process_compaction_job(code, msg, ss, txn, request, response, recorded_job, instance_id,
job_key, need_commit, use_version);
return;
}

// Process schema change commit
if (request->job().has_schema_change()) {
process_schema_change_job(code, msg, ss, txn, request, response, recorded_job, instance_id,
job_key, need_commit, use_version);
return;
break;
}
}

Expand Down