From 1380fd5f29b17078b4188491c3ec0f062a5e09e3 Mon Sep 17 00:00:00 2001 From: Lchangliang <915311741@qq.com> Date: Wed, 10 Jul 2024 11:02:14 +0800 Subject: [PATCH 01/10] (cloud-merge) Support to abort txn when coordinate be restart and do schema change --- cloud/src/common/bvars.cpp | 1 + cloud/src/common/bvars.h | 1 + cloud/src/meta-service/meta_service.h | 5 + cloud/src/meta-service/meta_service_txn.cpp | 270 +++++++++++++----- .../org/apache/doris/alter/RollupJobV2.java | 21 +- .../apache/doris/alter/SchemaChangeJobV2.java | 21 +- .../CloudGlobalTransactionMgr.java | 36 +++ .../transaction/DatabaseTransactionMgr.java | 23 ++ .../transaction/GlobalTransactionMgr.java | 50 ++++ .../GlobalTransactionMgrIface.java | 3 + gensrc/proto/cloud.proto | 13 + 11 files changed, 366 insertions(+), 78 deletions(-) diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index dc401398f68c35..81d46b5f2b12ed 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -30,6 +30,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id("ms", "get_current_m BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms", "begin_sub_txn"); BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn"); BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms", "check_txn_conflict"); +BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_by_coordinate_be("ms", "abort_txn_by_coordinate_be"); BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label("ms", "clean_txn_label"); BvarLatencyRecorderWithTag g_bvar_ms_get_version("ms", "get_version"); BvarLatencyRecorderWithTag g_bvar_ms_batch_get_version("ms", "batch_get_version"); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index f2957e35940334..5949bf9d8b3cdb 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -126,6 +126,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id; extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict; +extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_by_coordinate_be; extern BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label; diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index e2360e9e6ba2f7..f8e3f2fc239b9d 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -89,6 +89,11 @@ class MetaServiceImpl : public cloud::MetaService { CheckTxnConflictResponse* response, ::google::protobuf::Closure* done) override; + void abort_txn_by_coordinate_be(::google::protobuf::RpcController* controller, + const AbortTxnByCoordinateBeRequest* request, + AbortTxnBycoordinateBeResponse* response, + ::google::protobuf::Closure* done) override; + void clean_txn_label(::google::protobuf::RpcController* controller, const CleanTxnLabelRequest* request, CleanTxnLabelResponse* response, ::google::protobuf::Closure* done) override; diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 0a3439e94f77db..20385b9de1b2cf 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -15,11 +15,15 @@ // specific language governing permissions and limitations // under the License. 
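Note: the meta_service_txn.cpp changes that follow lean on the composite KV key layout from keys.h. As a minimal illustration only, assuming the helpers txn_running_key/txn_info_key/decode_key behave as they are used later in this series (a one-byte key-space prefix in front of the encoded fields, decode_key returning 0 on success), this is why the check_txn_conflict fix in patch 02 strips one byte and then reads decoded fields 1, 3 and 4:

// Illustrative sketch, not part of the patch. Assumed layout of a running-txn key:
//   <key-space byte> | "txn" | ${instance_id} | "txn_running" | ${db_id} | ${txn_id}
std::string k = txn_running_key({instance_id, db_id, txn_id});
std::string_view key_view = k;
key_view.remove_prefix(1); // drop the one-byte key-space prefix before decoding
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
if (decode_key(&key_view, &out) == 0) {
    DCHECK_EQ(out.size(), 5); // "txn", instance_id, "txn_running", db_id, txn_id
    const std::string& decoded_instance_id = std::get<1>(std::get<0>(out[1]));
    int64_t decoded_db_id = std::get<0>(std::get<0>(out[3]));
    int64_t decoded_txn_id = std::get<0>(std::get<0>(out[4]));
    // the ids recovered from the running key identify the matching txn info key
    std::string info_key = txn_info_key({decoded_instance_id, decoded_db_id, decoded_txn_id});
}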
+#include + #include +#include #include "common/logging.h" #include "cpp/sync_point.h" #include "meta-service/doris_txn.h" +#include "meta-service/keys.h" #include "meta-service/meta_service.h" #include "meta-service/meta_service_helper.h" #include "meta-service/meta_service_tablet_stats.h" @@ -1814,47 +1818,16 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, tmp_rowsets_meta); } -void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, - const AbortTxnRequest* request, AbortTxnResponse* response, - ::google::protobuf::Closure* done) { - RPC_PREPROCESS(abort_txn); - // Get txn id - int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1; - std::string label = request->has_label() ? request->label() : ""; - int64_t db_id = request->has_db_id() ? request->db_id() : -1; - if (txn_id < 0 && (label.empty() || db_id < 0)) { - code = MetaServiceCode::INVALID_ARGUMENT; - ss << "invalid txn id and label, db_id=" << db_id << " txn_id=" << txn_id - << " label=" << label; - msg = ss.str(); - return; - } - - std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; - instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); - if (instance_id.empty()) { - code = MetaServiceCode::INVALID_ARGUMENT; - ss << "cannot find instance_id with cloud_unique_id=" - << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " label=" << label - << " txn_id=" << txn_id; - msg = ss.str(); - return; - } - - RPC_RATE_LIMIT(abort_txn); - std::unique_ptr txn; - TxnErrorCode err = txn_kv_->create_txn(&txn); - if (err != TxnErrorCode::TXN_OK) { - code = cast_as(err); - ss << "filed to txn_kv_->create_txn(), txn_id=" << txn_id << " label=" << label - << " err=" << err; - msg = ss.str(); - return; - } +static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* request, + Transaction* txn, TxnInfoPB& return_txn_info, std::stringstream& ss, + MetaServiceCode& code, std::string& msg) { + int64_t txn_id = request->txn_id(); + std::string label = request->label(); + int64_t db_id = request->db_id(); std::string info_key; // Will be used when saving updated txn std::string info_val; // Will be reused when saving updated txn - TxnInfoPB txn_info; + TxnErrorCode err; //TODO: split with two function. //there two ways to abort txn: //1. abort txn by txn id @@ -1867,7 +1840,7 @@ void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, std::string index_key; std::string index_val; //not provide db_id, we need read from disk. - if (!request->has_db_id()) { + if (db_id == 0) { index_key = txn_index_key({instance_id, txn_id}); err = txn->get(index_key, &index_val); if (err != TxnErrorCode::TXN_OK) { @@ -1893,8 +1866,6 @@ void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, DCHECK(index_pb.has_tablet_index() == true); DCHECK(index_pb.tablet_index().has_db_id() == true); db_id = index_pb.tablet_index().db_id(); - } else { - db_id = request->db_id(); } // Get txn info with db_id and txn_id @@ -1908,23 +1879,23 @@ void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, return; } - if (!txn_info.ParseFromString(info_val)) { + if (!return_txn_info.ParseFromString(info_val)) { code = MetaServiceCode::PROTOBUF_PARSE_ERR; ss << "failed to parse txn_info db_id=" << db_id << "txn_id=" << txn_id; msg = ss.str(); return; } - DCHECK(txn_info.txn_id() == txn_id); + DCHECK(return_txn_info.txn_id() == txn_id); //check state is valid. 
- if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { + if (return_txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { code = MetaServiceCode::TXN_ALREADY_ABORTED; ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id; msg = ss.str(); return; } - if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) { + if (return_txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) { code = MetaServiceCode::TXN_ALREADY_VISIBLE; ss << "transaction [" << txn_id << "] is already VISIBLE, db_id=" << db_id; msg = ss.str(); @@ -1987,10 +1958,11 @@ void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, if ((cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) || (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PRECOMMITTED)) { prepare_txn_id = cur_txn_id; - txn_info = std::move(cur_txn_info); + return_txn_info = std::move(cur_txn_info); info_key = std::move(cur_info_key); - DCHECK_EQ(prepare_txn_id, txn_info.txn_id()) - << "prepare_txn_id=" << prepare_txn_id << " txn_id=" << txn_info.txn_id(); + DCHECK_EQ(prepare_txn_id, return_txn_info.txn_id()) + << "prepare_txn_id=" << prepare_txn_id + << " txn_id=" << return_txn_info.txn_id(); break; } } @@ -2008,45 +1980,88 @@ void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, uint64_t finish_time = duration_cast(now_time.time_since_epoch()).count(); // Update txn_info - txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED); - txn_info.set_finish_time(finish_time); - request->has_reason() ? txn_info.set_reason(request->reason()) - : txn_info.set_reason("User Abort"); + return_txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED); + return_txn_info.set_finish_time(finish_time); + request->has_reason() ? return_txn_info.set_reason(request->reason()) + : return_txn_info.set_reason("User Abort"); if (request->has_commit_attachment()) { - txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment()); + return_txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment()); } info_val.clear(); - if (!txn_info.SerializeToString(&info_val)) { + if (!return_txn_info.SerializeToString(&info_val)) { code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; - ss << "failed to serialize txn_info when saving, txn_id=" << txn_info.txn_id(); + ss << "failed to serialize txn_info when saving, txn_id=" << return_txn_info.txn_id(); msg = ss.str(); return; } - LOG(INFO) << "check watermark conflict, txn_info=" << txn_info.ShortDebugString(); + LOG(INFO) << "check watermark conflict, txn_info=" << return_txn_info.ShortDebugString(); txn->put(info_key, info_val); - LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_info.txn_id(); + LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << return_txn_info.txn_id(); - std::string running_key = txn_running_key({instance_id, db_id, txn_info.txn_id()}); + std::string running_key = txn_running_key({instance_id, db_id, return_txn_info.txn_id()}); txn->remove(running_key); - LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id=" << txn_info.txn_id(); + LOG(INFO) << "xxx remove running_key=" << hex(running_key) + << " txn_id=" << return_txn_info.txn_id(); - std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_info.txn_id()}); + std::string recycle_key = recycle_txn_key({instance_id, db_id, return_txn_info.txn_id()}); std::string recycle_val; RecycleTxnPB recycle_pb; recycle_pb.set_creation_time(finish_time); - recycle_pb.set_label(txn_info.label()); + 
recycle_pb.set_label(return_txn_info.label()); if (!recycle_pb.SerializeToString(&recycle_val)) { code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; - ss << "failed to serialize recycle_pb, txn_id=" << txn_info.txn_id(); + ss << "failed to serialize recycle_pb, txn_id=" << return_txn_info.txn_id(); msg = ss.str(); return; } txn->put(recycle_key, recycle_val); - LOG(INFO) << "xxx put recycle_key=" << hex(recycle_key) << " txn_id=" << txn_info.txn_id(); + LOG(INFO) << "xxx put recycle_key=" << hex(recycle_key) + << " txn_id=" << return_txn_info.txn_id(); +} +void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, + const AbortTxnRequest* request, AbortTxnResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(abort_txn); + // Get txn id + int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1; + std::string label = request->has_label() ? request->label() : ""; + int64_t db_id = request->has_db_id() ? request->db_id() : -1; + if (txn_id < 0 && (label.empty() || db_id < 0)) { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "invalid txn id and label, db_id=" << db_id << " txn_id=" << txn_id + << " label=" << label; + msg = ss.str(); + return; + } + + std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; + instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "cannot find instance_id with cloud_unique_id=" + << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " label=" << label + << " txn_id=" << txn_id; + msg = ss.str(); + return; + } + + RPC_RATE_LIMIT(abort_txn); + std::unique_ptr txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "filed to txn_kv_->create_txn(), txn_id=" << txn_id << " label=" << label + << " err=" << err; + msg = ss.str(); + return; + } + TxnInfoPB txn_info; + + _abort_txn(instance_id, request, txn.get(), txn_info, ss, code, msg); err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -2553,6 +2568,99 @@ void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controlle response->mutable_txn_info()->CopyFrom(txn_info); } +void MetaServiceImpl::abort_txn_by_coordinate_be(::google::protobuf::RpcController* controller, + const AbortTxnByCoordinateBeRequest* request, + AbortTxnBycoordinateBeResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(abort_txn_by_coordinate_be); + if (!request->has_id() || !request->has_ip() || request->has_start_time()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "invalid coordinate id, end coordinate ip or coordinate start time."; + return; + } + // TODO: For auth + std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; + instance_id = get_instance_id(resource_mgr_, cloud_unique_id); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "cannot find instance_id with cloud_unique_id=" + << (cloud_unique_id.empty() ? 
"(empty)" : cloud_unique_id); + msg = ss.str(); + return; + } + RPC_RATE_LIMIT(abort_txn_by_coordinate_be); + std::string begin_info_key = txn_info_key({instance_id, 0, 0}); + std::string end_info_key = txn_info_key({instance_id, INT64_MAX, INT64_MAX}); + LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" << hex(end_info_key); + + std::unique_ptr txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + msg = "failed to create txn"; + code = cast_as(err); + return; + } + std::unique_ptr it; + int64_t abort_txn_cnt = 0; + int64_t total_iteration_cnt = 0; + bool need_commit = false; + do { + err = txn->get(begin_info_key, end_info_key, &it, true); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "failed to get txn info. err=" << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + + while (it->has_next()) { + total_iteration_cnt++; + auto [k, v] = it->next(); + LOG(INFO) << "check txn info txn_info_key=" << hex(k); + TxnInfoPB info_pb; + if (!info_pb.ParseFromArray(v.data(), v.size())) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "malformed txn running info"; + msg = ss.str(); + ss << " key=" << hex(k); + LOG(WARNING) << ss.str(); + return; + } + const auto& coordinate = info_pb.coordinator(); + if (coordinate.sourcetype() == TXN_SOURCE_TYPE_BE && coordinate.id() == request->id() && + coordinate.ip() == request->ip() && + coordinate.start_time() < request->start_time()) { + need_commit = true; + TxnInfoPB return_txn_info; + AbortTxnRequest request; + request.set_db_id(info_pb.db_id()); + request.set_txn_id(info_pb.txn_id()); + request.set_label(info_pb.label()); + request.set_reason("Abort because coordinate be restart/stop"); + _abort_txn(instance_id, &request, txn.get(), return_txn_info, ss, code, msg); + } + if (!it->has_next()) { + begin_info_key = k; + } + } + begin_info_key.push_back('\x00'); // Update to next smallest key for iteration + } while (it->more()); + LOG(INFO) << "abort txn count: " << abort_txn_cnt + << " total iteration count: " << total_iteration_cnt; + if (need_commit) { + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "failed to abort txn kv, cooridnate_id=" << request->id() + << " coordinate_ip=" << request->ip() + << "coordinate_start_time=" << request->start_time() << " err=" << err; + msg = ss.str(); + return; + } + } +} + void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* controller, const CheckTxnConflictRequest* request, CheckTxnConflictResponse* response, @@ -2595,6 +2703,7 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont std::unique_ptr it; int64_t skip_timeout_txn_cnt = 0; int total_iteration_cnt = 0; + bool finished = true; do { err = txn->get(begin_running_key, end_running_key, &it, true); if (err != TxnErrorCode::TXN_OK) { @@ -2640,10 +2749,34 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont running_table_ids.end(), result.begin()); result.resize(iter - result.begin()); if (result.size() > 0) { - response->set_finished(false); - LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt - << " total iteration count: " << total_iteration_cnt; - return; + finished = false; + std::vector, int, int>> out; + decode_key(&k, &out); + DCHECK(out.size() == 3) << out.size(); + const std::string& decode_instance_id = std::get<1>(std::get<0>(out[0])); + int64_t db_id = std::get<0>(std::get<0>(out[1])); + int64_t txn_id = 
std::get<0>(std::get<0>(out[1])); + std::string conflict_txn_info_key = + txn_info_key({decode_instance_id, db_id, txn_id}); + std::string conflict_txn_info_val; + err = txn->get(conflict_txn_info_key, &conflict_txn_info_val); + if (err != TxnErrorCode::TXN_OK) { + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND + ? MetaServiceCode::TXN_ID_NOT_FOUND + : cast_as(err); + ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + TxnInfoPB& conflict_txn_info = *response->add_conflict_txns(); + if (!conflict_txn_info.ParseFromString(conflict_txn_info_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } } } @@ -2654,8 +2787,9 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont begin_running_key.push_back('\x00'); // Update to next smallest key for iteration } while (it->more()); LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt + << " conflict txn count: " << response->conflict_txns_size() << " total iteration count: " << total_iteration_cnt; - response->set_finished(true); + response->set_finished(finished); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 037139704b6f5b..c5a62ed34f88fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -47,6 +47,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DbUtil; import org.apache.doris.common.util.SqlParserUtils; @@ -67,6 +68,8 @@ import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTabletType; import org.apache.doris.thrift.TTaskType; +import org.apache.doris.transaction.GlobalTransactionMgr; +import org.apache.doris.transaction.TransactionState; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -373,11 +376,11 @@ protected void runWaitingTxnJob() throws AlterCancelException { Preconditions.checkState(jobState == JobState.WAITING_TXN, jobState); try { - if (!isPreviousLoadFinished()) { + if (!checkFailedPreviousLoadAndAbort()) { LOG.info("wait transactions before {} to be finished, rollup job: {}", watershedTxnId, jobId); return; } - } catch (AnalysisException e) { + } catch (UserException e) { throw new AlterCancelException(e.getMessage()); } @@ -681,10 +684,18 @@ private void cancelInternal() { } } - // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished. - protected boolean isPreviousLoadFinished() throws AnalysisException { - return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished( + // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are failed + // and abort it if it is failed. 
+ // If return true, all previous load is finish + protected boolean checkFailedPreviousLoadAndAbort() throws UserException { + List unFinishedTxns = Env.getCurrentGlobalTransactionMgr().getUnFinishedPreviousLoad( watershedTxnId, dbId, Lists.newArrayList(tableId)); + List failedTxns = GlobalTransactionMgr.checkFailedTxns(unFinishedTxns); + for (TransactionState txn : failedTxns) { + Env.getCurrentGlobalTransactionMgr() + .abortTransaction(txn.getDbId(), txn.getTransactionId(), "Cancel by schema change"); + } + return unFinishedTxns.isEmpty(); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index d58e23ebbbc59b..43ac52611c96c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -44,6 +44,7 @@ import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.SchemaVersionAndHash; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DbUtil; import org.apache.doris.common.util.TimeUtils; @@ -58,6 +59,8 @@ import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTaskType; +import org.apache.doris.transaction.GlobalTransactionMgr; +import org.apache.doris.transaction.TransactionState; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -418,11 +421,11 @@ protected void addShadowIndexToCatalog(OlapTable tbl) { protected void runWaitingTxnJob() throws AlterCancelException { Preconditions.checkState(jobState == JobState.WAITING_TXN, jobState); try { - if (!isPreviousLoadFinished()) { + if (!checkFailedPreviousLoadAndAbort()) { LOG.info("wait transactions before {} to be finished, schema change job: {}", watershedTxnId, jobId); return; } - } catch (AnalysisException e) { + } catch (UserException e) { throw new AlterCancelException(e.getMessage()); } @@ -792,10 +795,18 @@ private void cancelInternal() { jobState = JobState.CANCELLED; } - // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished. - protected boolean isPreviousLoadFinished() throws AnalysisException { - return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished( + // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished + // and abort it if it is failed. 
+ // If return true, all previous load is finish + protected boolean checkFailedPreviousLoadAndAbort() throws UserException { + List unFinishedTxns = Env.getCurrentGlobalTransactionMgr().getUnFinishedPreviousLoad( watershedTxnId, dbId, Lists.newArrayList(tableId)); + List failedTxns = GlobalTransactionMgr.checkFailedTxns(unFinishedTxns); + for (TransactionState txn : failedTxns) { + Env.getCurrentGlobalTransactionMgr() + .abortTransaction(txn.getDbId(), txn.getTransactionId(), "Cancel by schema change"); + } + return unFinishedTxns.isEmpty(); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index a4d0e58247141e..d38b5e0ae99d92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -1086,6 +1086,42 @@ public void finishTransaction(long dbId, long transactionId, Map par throw new UserException("Disallow to call finishTransaction()"); } + public List getUnFinishedPreviousLoad(long endTransactionId, long dbId, List tableIdList) + throws UserException { + LOG.info("getUnFinishedPreviousLoad(), endTransactionId:{}, dbId:{}, tableIdList:{}", + endTransactionId, dbId, tableIdList); + + if (endTransactionId <= 0) { + throw new UserException("Invaid endTransactionId:" + endTransactionId); + } + CheckTxnConflictRequest.Builder builder = CheckTxnConflictRequest.newBuilder(); + builder.setDbId(dbId); + builder.setEndTxnId(endTransactionId); + builder.addAllTableIds(tableIdList); + builder.setCloudUniqueId(Config.cloud_unique_id); + + final CheckTxnConflictRequest checkTxnConflictRequest = builder.build(); + CheckTxnConflictResponse checkTxnConflictResponse = null; + try { + LOG.info("CheckTxnConflictRequest:{}", checkTxnConflictRequest); + checkTxnConflictResponse = MetaServiceProxy + .getInstance().checkTxnConflict(checkTxnConflictRequest); + LOG.info("CheckTxnConflictResponse: {}", checkTxnConflictResponse); + } catch (RpcException e) { + throw new UserException(e.getMessage()); + } + + if (checkTxnConflictResponse.getStatus().getCode() != MetaServiceCode.OK) { + throw new UserException(checkTxnConflictResponse.getStatus().getMsg()); + } + List conflictTxnInfoPbs = checkTxnConflictResponse.getConflictTxnsList(); + List conflictTxns = new ArrayList<>(); + for (TxnInfoPB infoPb : conflictTxnInfoPbs) { + conflictTxns.add(TxnUtil.transactionStateFromPb(infoPb)); + } + return conflictTxns; + } + @Override public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, List tableIdList) throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index eb1a07cc69dfe4..cf32636a7ca25e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -2302,6 +2302,29 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat return true; } + public List getUnFinishedPreviousLoad(long endTransactionId, List tableIdList) { + readLock(); + List unFishedTxns = new ArrayList<>(); + try { + for (Map.Entry entry : idToRunningTransactionState.entrySet()) { + if (entry.getValue().getDbId() != dbId || 
!isIntersectionNotEmpty(entry.getValue().getTableIdList(), + tableIdList) || entry.getValue().getTransactionStatus().isFinalStatus()) { + continue; + } + if (entry.getKey() <= endTransactionId) { + if (LOG.isDebugEnabled()) { + LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}", + entry.getKey(), dbId, endTransactionId); + } + unFishedTxns.add(entry.getValue()); + } + } + } finally { + readUnlock(); + } + return unFishedTxns; + } + public boolean isPreviousTransactionsFinished(long endTransactionId, List tableIdList) { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 719999b809428a..21aef81ab6ecfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -38,6 +38,8 @@ import org.apache.doris.persist.BatchRemoveTransactionsOperation; import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.EditLog; +import org.apache.doris.system.Backend; +import org.apache.doris.system.Frontend; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TWaitingTxnStatusRequest; @@ -442,6 +444,54 @@ public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) { return false; } + public static boolean checkFailedTxnsByStartTime(TransactionState txn) { + TxnCoordinator coordinator = txn.getCoordinator(); + if (coordinator.sourceType == TransactionState.TxnSourceType.FE) { + List frontends = Env.getCurrentEnv().getFrontends(null); + for (Frontend fe : frontends) { + if (fe.getHost().equals(coordinator.ip) && fe.getLastStartupTime() > coordinator.startTime) { + return true; + } + } + } else if (coordinator.sourceType == TransactionState.TxnSourceType.BE) { + Backend be = Env.getCurrentSystemInfo().getBackend(coordinator.id); + if (be.getHost().equals(coordinator.ip) && be.getLastStartTime() > coordinator.startTime) { + return true; + } + } + return false; + } + + public static List checkFailedTxns(List conflictTxns) { + List failedTxns = new ArrayList<>(); + for (TransactionState txn : conflictTxns) { + boolean failed = false; + if (!failed) { + failed = checkFailedTxnsByStartTime(txn); + } + if (failed) { + failedTxns.add(txn); + } + } + return failedTxns; + } + + public List getUnFinishedPreviousLoad(long endTransactionId, + long dbId, List tableIdList) throws UserException { + try { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); + return dbTransactionMgr.getUnFinishedPreviousLoad(endTransactionId, tableIdList); + } catch (AnalysisException e) { + // NOTICE: At present, this situation will only happen when the database no longer exists. + // In fact, getDatabaseTransactionMgr() should explicitly throw a MetaNotFoundException, + // but changing the type of exception will cause a large number of code changes, + // which is not worth the loss. + // So here just simply think that AnalysisException only means that db does not exist. 
+ LOG.warn("Check whether all previous transactions in db [" + dbId + "] finished failed", e); + throw new UserException(e.getMessage()); + } + } + /** * if the table is deleted between commit and publish version, then should ignore the partition * diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java index 6fd32c8ee7bc8f..9a9b3e16b1ef9e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java @@ -197,4 +197,7 @@ public List> getPartitionTransInfo(long dbId, long tid, long ta public void addSubTransaction(long dbId, long transactionId, long subTransactionId); public void removeSubTransaction(long dbId, long subTransactionId); + + public List getUnFinishedPreviousLoad(long endTransactionId, + long dbId, List tableIdList) throws UserException; } diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index f8797ea54e733a..aabf752c97b878 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -772,6 +772,17 @@ message GetCurrentMaxTxnResponse { optional int64 current_max_txn_id = 2; } +message AbortTxnByCoordinateBeRequest { + optional string cloud_unique_id = 1; // For auth + optional string ip = 2; + optional int64 id = 3; + optional int64 start_time = 4; +} + +message AbortTxnBycoordinateBeResponse { + optional MetaServiceResponseStatus status = 1; +} + message CheckTxnConflictRequest { optional string cloud_unique_id = 1; // For auth optional int64 db_id = 2; @@ -783,6 +794,7 @@ message CheckTxnConflictRequest { message CheckTxnConflictResponse { optional MetaServiceResponseStatus status = 1; optional bool finished = 2; + repeated TxnInfoPB conflict_txns = 3; } message CleanTxnLabelRequest { @@ -1457,6 +1469,7 @@ service MetaService { rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse); rpc begin_sub_txn(BeginSubTxnRequest) returns (BeginSubTxnResponse); rpc abort_sub_txn(AbortSubTxnRequest) returns (AbortSubTxnResponse); + rpc abort_txn_by_coordinate_be(AbortTxnByCoordinateBeRequest) returns (AbortTxnBycoordinateBeResponse); rpc get_version(GetVersionRequest) returns (GetVersionResponse); rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse); From 45bd416f18e00d08a73a9145dce9750cd0a2ec9b Mon Sep 17 00:00:00 2001 From: Lchangliang <915311741@qq.com> Date: Mon, 15 Jul 2024 12:27:35 +0800 Subject: [PATCH 02/10] tmp --- cloud/src/meta-service/meta_service_txn.cpp | 61 ++++++++++++--------- cloud/test/meta_service_test.cpp | 3 + 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 20385b9de1b2cf..88d043ac67655d 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -2737,7 +2737,7 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont if (running_pb.timeout_time() < check_time) { skip_timeout_txn_cnt++; } else { - LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k) + LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k) << " " << k << " running_pb=" << running_pb.ShortDebugString(); std::vector running_table_ids(running_pb.table_ids().begin(), running_pb.table_ids().end()); @@ -2751,31 +2751,40 @@ void 
MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont if (result.size() > 0) { finished = false; std::vector, int, int>> out; - decode_key(&k, &out); - DCHECK(out.size() == 3) << out.size(); - const std::string& decode_instance_id = std::get<1>(std::get<0>(out[0])); - int64_t db_id = std::get<0>(std::get<0>(out[1])); - int64_t txn_id = std::get<0>(std::get<0>(out[1])); - std::string conflict_txn_info_key = - txn_info_key({decode_instance_id, db_id, txn_id}); - std::string conflict_txn_info_val; - err = txn->get(conflict_txn_info_key, &conflict_txn_info_val); - if (err != TxnErrorCode::TXN_OK) { - code = err == TxnErrorCode::TXN_KEY_NOT_FOUND - ? MetaServiceCode::TXN_ID_NOT_FOUND - : cast_as(err); - ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id; - msg = ss.str(); - LOG(WARNING) << msg; - return; - } - TxnInfoPB& conflict_txn_info = *response->add_conflict_txns(); - if (!conflict_txn_info.ParseFromString(conflict_txn_info_val)) { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id; - msg = ss.str(); - LOG(WARNING) << msg; - return; + std::string_view key_view = k; + key_view.remove_prefix(1); + int ret = decode_key(&key_view, &out); + if (ret != 0) [[unlikely]] { + // decode version key error means this is something wrong, + // we can not continue this txn + LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(k); + } else { + DCHECK(out.size() == 5) << " key=" << hex(k) << " " << out.size(); + const std::string& decode_instance_id = std::get<1>(std::get<0>(out[1])); + int64_t db_id = std::get<0>(std::get<0>(out[3])); + int64_t txn_id = std::get<0>(std::get<0>(out[4])); + std::string conflict_txn_info_key = + txn_info_key({decode_instance_id, db_id, txn_id}); + std::string conflict_txn_info_val; + err = txn->get(conflict_txn_info_key, &conflict_txn_info_val); + if (err != TxnErrorCode::TXN_OK) { + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND + ? 
MetaServiceCode::TXN_ID_NOT_FOUND + : cast_as(err); + ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + TxnInfoPB& conflict_txn_info = *response->add_conflict_txns(); + if (!conflict_txn_info.ParseFromString(conflict_txn_info_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse txn_info, db_id=" << db_id + << " txn_id=" << txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } } } } diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 850f8cfbabd641..294575af3f1265 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -2097,6 +2097,8 @@ TEST(MetaServiceTest, CheckTxnConflictTest) { ASSERT_EQ(check_txn_conflict_res.status().code(), MetaServiceCode::OK); ASSERT_EQ(check_txn_conflict_res.finished(), false); + ASSERT_EQ(check_txn_conflict_res.conflict_txns_size(), 1); + check_txn_conflict_res.clear_conflict_txns(); // mock rowset and tablet int64_t tablet_id_base = 123456; @@ -2125,6 +2127,7 @@ TEST(MetaServiceTest, CheckTxnConflictTest) { ASSERT_EQ(check_txn_conflict_res.status().code(), MetaServiceCode::OK); ASSERT_EQ(check_txn_conflict_res.finished(), true); + ASSERT_EQ(check_txn_conflict_res.conflict_txns_size(), 0); { std::string running_key = txn_running_key({mock_instance, db_id, txn_id}); From dc2de3ff627923a5702a8f7acedcf3d31a4a9cb9 Mon Sep 17 00:00:00 2001 From: Lchangliang <915311741@qq.com> Date: Mon, 15 Jul 2024 16:11:09 +0800 Subject: [PATCH 03/10] tmp --- .../src/main/java/org/apache/doris/common/Config.java | 10 ++++++++++ .../main/java/org/apache/doris/alter/RollupJobV2.java | 11 +++++++---- .../org/apache/doris/alter/SchemaChangeJobV2.java | 11 +++++++---- .../java/org/apache/doris/system/HeartbeatMgr.java | 3 ++- .../doris/transaction/DatabaseTransactionMgr.java | 4 ++-- 5 files changed, 28 insertions(+), 11 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index eb158bf2a575e5..08d57a1113a698 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2714,6 +2714,16 @@ public class Config extends ConfigBase { @ConfField public static long spilled_profile_storage_limit_bytes = 1 * 1024 * 1024 * 1024; // 1GB + @ConfField(mutable = true, description = { + "是否通过检测协调者BE心跳来 abort 事务", + "SHould abort txn by checking coorinator be heartbeat"}) + public static boolean enable_abort_txn_by_checking_coordinator_be = true; + + @ConfField(mutable = true, description = { + "是否在 schema change 过程中, 检测冲突事物并 abort 它", + "SHould abort txn by checking conflick txn in schema change"}) + public static boolean enable_abort_txn_by_checking_conflict_txn = true; + //========================================================================== // begin of cloud config //========================================================================== diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index c5a62ed34f88fd..e38c91d296fed4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -44,6 +44,7 @@ import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.AnalysisException; 
+import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; @@ -690,10 +691,12 @@ private void cancelInternal() { protected boolean checkFailedPreviousLoadAndAbort() throws UserException { List unFinishedTxns = Env.getCurrentGlobalTransactionMgr().getUnFinishedPreviousLoad( watershedTxnId, dbId, Lists.newArrayList(tableId)); - List failedTxns = GlobalTransactionMgr.checkFailedTxns(unFinishedTxns); - for (TransactionState txn : failedTxns) { - Env.getCurrentGlobalTransactionMgr() - .abortTransaction(txn.getDbId(), txn.getTransactionId(), "Cancel by schema change"); + if (Config.enable_abort_txn_by_checking_conflict_txn) { + List failedTxns = GlobalTransactionMgr.checkFailedTxns(unFinishedTxns); + for (TransactionState txn : failedTxns) { + Env.getCurrentGlobalTransactionMgr() + .abortTransaction(txn.getDbId(), txn.getTransactionId(), "Cancel by schema change"); + } } return unFinishedTxns.isEmpty(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 43ac52611c96c0..527c8620c6cf77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -40,6 +40,7 @@ import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; @@ -801,10 +802,12 @@ private void cancelInternal() { protected boolean checkFailedPreviousLoadAndAbort() throws UserException { List unFinishedTxns = Env.getCurrentGlobalTransactionMgr().getUnFinishedPreviousLoad( watershedTxnId, dbId, Lists.newArrayList(tableId)); - List failedTxns = GlobalTransactionMgr.checkFailedTxns(unFinishedTxns); - for (TransactionState txn : failedTxns) { - Env.getCurrentGlobalTransactionMgr() - .abortTransaction(txn.getDbId(), txn.getTransactionId(), "Cancel by schema change"); + if (Config.enable_abort_txn_by_checking_conflict_txn) { + List failedTxns = GlobalTransactionMgr.checkFailedTxns(unFinishedTxns); + for (TransactionState txn : failedTxns) { + Env.getCurrentGlobalTransactionMgr() + .abortTransaction(txn.getDbId(), txn.getTransactionId(), "Cancel by schema change"); + } } return unFinishedTxns.isEmpty(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 93bc7df083cb3e..6a8008d6cbc99b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -176,7 +176,8 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) { boolean isChanged = be.handleHbResponse(hbResponse, isReplay); if (hbResponse.getStatus() == HbStatus.OK) { long newStartTime = be.getLastStartTime(); - if (!isReplay && oldStartTime != newStartTime) { + if (!isReplay && Config.enable_abort_txn_by_checking_coordinator_be + && oldStartTime != newStartTime) { Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart( be.getId(), be.getHost(), newStartTime); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index cf32636a7ca25e..33bbcdaaddee03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -2308,13 +2308,13 @@ public List getUnFinishedPreviousLoad(long endTransactionId, L try { for (Map.Entry entry : idToRunningTransactionState.entrySet()) { if (entry.getValue().getDbId() != dbId || !isIntersectionNotEmpty(entry.getValue().getTableIdList(), - tableIdList) || entry.getValue().getTransactionStatus().isFinalStatus()) { + tableIdList) || entry.getValue().getTransactionStatus().isFinalStatus()) { continue; } if (entry.getKey() <= endTransactionId) { if (LOG.isDebugEnabled()) { LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}", - entry.getKey(), dbId, endTransactionId); + entry.getKey(), dbId, endTransactionId); } unFishedTxns.add(entry.getValue()); } From 901aa8ed23571b0ee9abb29dc6cd87416840a1e7 Mon Sep 17 00:00:00 2001 From: Lchangliang <915311741@qq.com> Date: Mon, 15 Jul 2024 16:30:25 +0800 Subject: [PATCH 04/10] tmp --- cloud/src/meta-service/meta_service_txn.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 88d043ac67655d..a365b45f9216dc 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -2628,7 +2628,8 @@ void MetaServiceImpl::abort_txn_by_coordinate_be(::google::protobuf::RpcControll return; } const auto& coordinate = info_pb.coordinator(); - if (coordinate.sourcetype() == TXN_SOURCE_TYPE_BE && coordinate.id() == request->id() && + if (info_pb.status() == TxnStatusPB::TXN_STATUS_PREPARED && + coordinate.sourcetype() == TXN_SOURCE_TYPE_BE && coordinate.id() == request->id() && coordinate.ip() == request->ip() && coordinate.start_time() < request->start_time()) { need_commit = true; From 66bbb6976ca4a784537d45b8f5324374d9f7dcfb Mon Sep 17 00:00:00 2001 From: Lchangliang <915311741@qq.com> Date: Wed, 17 Jul 2024 00:30:11 +0800 Subject: [PATCH 05/10] tmp --- cloud/src/meta-service/meta_service.h | 10 +- cloud/src/meta-service/meta_service_txn.cpp | 6 +- .../doris/cloud/rpc/MetaServiceClient.java | 11 ++ .../doris/cloud/rpc/MetaServiceProxy.java | 10 ++ .../CloudGlobalTransactionMgr.java | 16 +- gensrc/proto/cloud.proto | 4 +- .../schema_change_p0/ddl/lineorder_create.sql | 31 ++++ .../schema_change_p0/ddl/lineorder_delete.sql | 1 + .../schema_change_p0/ddl/lineorder_load.sql | 6 + .../test_abort_txn_by_be_cloud1.groovy | 164 +++++++++++++++++ .../test_abort_txn_by_be_cloud2.groovy | 164 +++++++++++++++++ .../test_abort_txn_by_be_local5.groovy | 165 ++++++++++++++++++ .../test_abort_txn_by_be_local6.groovy | 165 ++++++++++++++++++ .../test_abort_txn_by_fe_cloud4.groovy | 104 +++++++++++ .../test_abort_txn_by_fe_local3.groovy | 105 +++++++++++ 15 files changed, 955 insertions(+), 7 deletions(-) create mode 100644 regression-test/suites/schema_change_p0/ddl/lineorder_create.sql create mode 100644 regression-test/suites/schema_change_p0/ddl/lineorder_delete.sql create mode 100644 regression-test/suites/schema_change_p0/ddl/lineorder_load.sql create mode 100644 regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud1.groovy create mode 100644 
regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud2.groovy create mode 100644 regression-test/suites/schema_change_p0/test_abort_txn_by_be_local5.groovy create mode 100644 regression-test/suites/schema_change_p0/test_abort_txn_by_be_local6.groovy create mode 100644 regression-test/suites/schema_change_p0/test_abort_txn_by_fe_cloud4.groovy create mode 100644 regression-test/suites/schema_change_p0/test_abort_txn_by_fe_local3.groovy diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index f8e3f2fc239b9d..81d4d7af3f4dc0 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -91,7 +91,7 @@ class MetaServiceImpl : public cloud::MetaService { void abort_txn_by_coordinate_be(::google::protobuf::RpcController* controller, const AbortTxnByCoordinateBeRequest* request, - AbortTxnBycoordinateBeResponse* response, + AbortTxnByCoordinateBeResponse* response, ::google::protobuf::Closure* done) override; void clean_txn_label(::google::protobuf::RpcController* controller, @@ -356,6 +356,14 @@ class MetaServiceProxy final : public MetaService { call_impl(&cloud::MetaService::check_txn_conflict, controller, request, response, done); } + void abort_txn_by_coordinate_be(::google::protobuf::RpcController* controller, + const AbortTxnByCoordinateBeRequest* request, + AbortTxnByCoordinateBeResponse* response, + ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::abort_txn_by_coordinate_be, controller, request, response, + done); + } + void clean_txn_label(::google::protobuf::RpcController* controller, const CleanTxnLabelRequest* request, CleanTxnLabelResponse* response, ::google::protobuf::Closure* done) override { diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index a365b45f9216dc..c2fbca1be3c657 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -2570,12 +2570,12 @@ void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controlle void MetaServiceImpl::abort_txn_by_coordinate_be(::google::protobuf::RpcController* controller, const AbortTxnByCoordinateBeRequest* request, - AbortTxnBycoordinateBeResponse* response, + AbortTxnByCoordinateBeResponse* response, ::google::protobuf::Closure* done) { RPC_PREPROCESS(abort_txn_by_coordinate_be); - if (!request->has_id() || !request->has_ip() || request->has_start_time()) { + if (!request->has_id() || !request->has_ip() || !request->has_start_time()) { code = MetaServiceCode::INVALID_ARGUMENT; - msg = "invalid coordinate id, end coordinate ip or coordinate start time."; + msg = "invalid coordinate id, coordinate ip or coordinate start time."; return; } // TODO: For auth diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index d5cdc79eb7f7d5..eb6b909bb88aaf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -362,4 +362,15 @@ public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) { } return blockingStub.getObjStoreInfo(request); } + + public Cloud.AbortTxnByCoordinateBeResponse + abortTxnByCoordinateBe(Cloud.AbortTxnByCoordinateBeRequest request) { + if (!request.hasCloudUniqueId()) { + Cloud.AbortTxnByCoordinateBeRequest.Builder builder = + 
Cloud.AbortTxnByCoordinateBeRequest.newBuilder(); + builder.mergeFrom(request); + return blockingStub.abortTxnByCoordinateBe(builder.setCloudUniqueId(Config.cloud_unique_id).build()); + } + return blockingStub.abortTxnByCoordinateBe(request); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index d7ec328906775e..c8adaa04e488df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -507,4 +507,14 @@ public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfo throw new RpcException("", e.getMessage(), e); } } + + public Cloud.AbortTxnByCoordinateBeResponse + abortTxnByCoordinateBe(Cloud.AbortTxnByCoordinateBeRequest request) throws RpcException { + try { + final MetaServiceClient client = getProxy(); + return client.abortTxnByCoordinateBe(request); + } catch (Exception e) { + throw new RpcException("", e.getMessage(), e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index d38b5e0ae99d92..b144bef4d6f4c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -30,6 +30,8 @@ import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.cloud.proto.Cloud.AbortSubTxnRequest; import org.apache.doris.cloud.proto.Cloud.AbortSubTxnResponse; +import org.apache.doris.cloud.proto.Cloud.AbortTxnByCoordinateBeRequest; +import org.apache.doris.cloud.proto.Cloud.AbortTxnByCoordinateBeResponse; import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest; import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse; import org.apache.doris.cloud.proto.Cloud.BeginSubTxnRequest; @@ -1294,7 +1296,19 @@ public void updateDatabaseUsedQuotaData(long dbId, long usedQuotaDataBytes) thro @Override public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime) { - // do nothing in cloud mode + AbortTxnByCoordinateBeRequest.Builder builder = AbortTxnByCoordinateBeRequest.newBuilder(); + builder.setIp(coordinateHost); + builder.setId(coordinateBeId); + builder.setStartTime(beStartTime); + final AbortTxnByCoordinateBeRequest request = builder.build(); + AbortTxnByCoordinateBeResponse response = null; + try { + response = MetaServiceProxy + .getInstance().abortTxnByCoordinateBe(request); + LOG.info("AbortTxnByCoordinateBeResponse: {}", response); + } catch (RpcException e) { + LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); + } } @Override diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index aabf752c97b878..5f9fa0db51f447 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -779,7 +779,7 @@ message AbortTxnByCoordinateBeRequest { optional int64 start_time = 4; } -message AbortTxnBycoordinateBeResponse { +message AbortTxnByCoordinateBeResponse { optional MetaServiceResponseStatus status = 1; } @@ -1469,7 +1469,7 @@ service MetaService { rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse); rpc begin_sub_txn(BeginSubTxnRequest) returns (BeginSubTxnResponse); rpc abort_sub_txn(AbortSubTxnRequest) returns (AbortSubTxnResponse); - rpc 
abort_txn_by_coordinate_be(AbortTxnByCoordinateBeRequest) returns (AbortTxnBycoordinateBeResponse); + rpc abort_txn_by_coordinate_be(AbortTxnByCoordinateBeRequest) returns (AbortTxnByCoordinateBeResponse); rpc get_version(GetVersionRequest) returns (GetVersionResponse); rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse); diff --git a/regression-test/suites/schema_change_p0/ddl/lineorder_create.sql b/regression-test/suites/schema_change_p0/ddl/lineorder_create.sql new file mode 100644 index 00000000000000..2c0753b51e4b7f --- /dev/null +++ b/regression-test/suites/schema_change_p0/ddl/lineorder_create.sql @@ -0,0 +1,31 @@ +CREATE TABLE IF NOT EXISTS `lineorder` ( + `lo_orderkey` bigint(20) NOT NULL COMMENT "", + `lo_linenumber` bigint(20) NOT NULL COMMENT "", + `lo_custkey` int(11) NOT NULL COMMENT "", + `lo_partkey` int(11) NOT NULL COMMENT "", + `lo_suppkey` int(11) NOT NULL COMMENT "", + `lo_orderdate` int(11) NOT NULL COMMENT "", + `lo_orderpriority` varchar(16) NOT NULL COMMENT "", + `lo_shippriority` int(11) NOT NULL COMMENT "", + `lo_quantity` bigint(20) NOT NULL COMMENT "", + `lo_extendedprice` bigint(20) NOT NULL COMMENT "", + `lo_ordtotalprice` bigint(20) NOT NULL COMMENT "", + `lo_discount` bigint(20) NOT NULL COMMENT "", + `lo_revenue` bigint(20) NOT NULL COMMENT "", + `lo_supplycost` bigint(20) NOT NULL COMMENT "", + `lo_tax` bigint(20) NOT NULL COMMENT "", + `lo_commitdate` bigint(20) NOT NULL COMMENT "", + `lo_shipmode` varchar(11) NOT NULL COMMENT "" +) +PARTITION BY RANGE(`lo_orderdate`) +(PARTITION p1992 VALUES [("-2147483648"), ("19930101")), +PARTITION p1993 VALUES [("19930101"), ("19940101")), +PARTITION p1994 VALUES [("19940101"), ("19950101")), +PARTITION p1995 VALUES [("19950101"), ("19960101")), +PARTITION p1996 VALUES [("19960101"), ("19970101")), +PARTITION p1997 VALUES [("19970101"), ("19980101")), +PARTITION p1998 VALUES [("19980101"), ("19990101"))) +DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 48 +PROPERTIES ( +"replication_num" = "1" +); \ No newline at end of file diff --git a/regression-test/suites/schema_change_p0/ddl/lineorder_delete.sql b/regression-test/suites/schema_change_p0/ddl/lineorder_delete.sql new file mode 100644 index 00000000000000..d8f94cfe9fcd8e --- /dev/null +++ b/regression-test/suites/schema_change_p0/ddl/lineorder_delete.sql @@ -0,0 +1 @@ +drop table if exists lineorder; \ No newline at end of file diff --git a/regression-test/suites/schema_change_p0/ddl/lineorder_load.sql b/regression-test/suites/schema_change_p0/ddl/lineorder_load.sql new file mode 100644 index 00000000000000..a5ed2465ea99c6 --- /dev/null +++ b/regression-test/suites/schema_change_p0/ddl/lineorder_load.sql @@ -0,0 +1,6 @@ +LOAD LABEL ${loadLabel} ( + DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/lineorder.tbl.*.gz") + INTO TABLE lineorder + COLUMNS TERMINATED BY "|" + (lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,temp) +) diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud1.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud1.groovy new file mode 100644 index 00000000000000..03f77917731c64 --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud1.groovy @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException + +suite('test_abort_txn_by_be_cloud1') { + def options = new ClusterOptions() + options.cloudMode = true + options.enableDebugPoints() + options.beConfigs += [ "enable_java_support=false" ] + options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=true" ] + options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=false" ] + options.beNum = 1 + + docker(options) { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + + def tableName = "lineorder" + // create table if not exists + sql new File("""${context.file.parent}/ddl/lineorder_delete.sql""").text + sql new File("""${context.file.parent}/ddl/lineorder_create.sql""").text + + def coordinatorBe = cluster.getAllBackends().get(0) + def coordinatorBeHost = coordinatorBe.host + + def column = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, + lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, + lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""" + + thread { + streamLoad { + // a default db 'regression_test' is specified in + // ${DORIS_HOME}/conf/regression-conf.groovy + table tableName + + // default label is UUID: + // set 'label' UUID.randomUUID().toString() + + // default column_separator is specify in doris fe config, usually is '\t'. + // this line change to ',' + set 'column_separator', '|' + set 'compress_type', 'GZ' + set 'columns', column + + + // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. + // also, you can stream load a http stream, e.g. http://xxx/some.csv + file """${getS3Url()}/regression/ssb/sf100/lineorder.tbl.1.gz""" + + time 600 * 1000 + + // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + + // if declared a check callback, the default check condition will ignore. 
+ // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + sleep(10000) + + sql """ alter table ${tableName} modify column lo_suppkey bigint NULL """ + + String result = "" + int max_try_time = 3000 + while (max_try_time--){ + result = getJobState(tableName) + if (result == "PENDING") { + sleep(3000) + } else { + break; + } + } + if (max_try_time < 1){ + assertEquals(1,2) + } + assertEquals(result, "WAITING_TXN"); + + cluster.stopBackends(coordinatorBe.index) + def isDead = false + for (def i = 0; i < 10; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (!be.Alive.toBoolean()) { + isDead = true + break + } + sleep 1000 + } + assertTrue(isDead) + sleep 10000 + + result = getJobState(tableName) + assertEquals(result, "WAITING_TXN"); + + // coordinatorBe restart, abort txn on it + cluster.startBackends(coordinatorBe.index) + def isAlive = false + for (def i = 0; i < 20; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (be.Alive.toBoolean()) { + isAlive = true + break + } + sleep 1000 + } + assertTrue(isAlive) + sleep 20000 + + max_try_time = 3000 + while (max_try_time--){ + result = getJobState(tableName) + if (result == "FINISHED") { + sleep(3000) + break + } else { + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + } +} diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud2.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud2.groovy new file mode 100644 index 00000000000000..7a2d382f3abfda --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud2.groovy @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
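+
+// Cloud-mode, conflict-txn path: enable_abort_txn_by_checking_coordinator_be is off and
+// enable_abort_txn_by_checking_conflict_txn is on. A stream load keeps a txn open while the
+// lo_suppkey schema change sits in WAITING_TXN; after the coordinator BE is stopped and started
+// again, the stale load txn should be aborted via the conflict-txn check and the job reach FINISHED.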
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException + +suite('test_abort_txn_by_be_cloud2') { + def options = new ClusterOptions() + options.cloudMode = true + options.enableDebugPoints() + options.beConfigs += [ "enable_java_support=false" ] + options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false" ] + options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ] + options.beNum = 1 + + docker(options) { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + + def tableName = "lineorder" + // create table if not exists + sql new File("""${context.file.parent}/ddl/lineorder_delete.sql""").text + sql new File("""${context.file.parent}/ddl/lineorder_create.sql""").text + + def coordinatorBe = cluster.getAllBackends().get(0) + def coordinatorBeHost = coordinatorBe.host + + def column = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, + lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, + lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""" + + thread { + streamLoad { + // a default db 'regression_test' is specified in + // ${DORIS_HOME}/conf/regression-conf.groovy + table tableName + + // default label is UUID: + // set 'label' UUID.randomUUID().toString() + + // default column_separator is specify in doris fe config, usually is '\t'. + // this line change to ',' + set 'column_separator', '|' + set 'compress_type', 'GZ' + set 'columns', column + + + // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. + // also, you can stream load a http stream, e.g. http://xxx/some.csv + file """${getS3Url()}/regression/ssb/sf100/lineorder.tbl.1.gz""" + + time 600 * 1000 + + // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + + // if declared a check callback, the default check condition will ignore. 
+ // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + sleep(10000) + + sql """ alter table ${tableName} modify column lo_suppkey bigint NULL """ + + String result = "" + int max_try_time = 3000 + while (max_try_time--){ + result = getJobState(tableName) + if (result == "PENDING") { + sleep(3000) + } else { + break; + } + } + if (max_try_time < 1){ + assertEquals(1,2) + } + assertEquals(result, "WAITING_TXN"); + + cluster.stopBackends(coordinatorBe.index) + def isDead = false + for (def i = 0; i < 10; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (!be.Alive.toBoolean()) { + isDead = true + break + } + sleep 1000 + } + assertTrue(isDead) + sleep 10000 + + result = getJobState(tableName) + assertEquals(result, "WAITING_TXN"); + + // coordinatorBe restart, abort txn on it + cluster.startBackends(coordinatorBe.index) + def isAlive = false + for (def i = 0; i < 20; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (be.Alive.toBoolean()) { + isAlive = true + break + } + sleep 1000 + } + assertTrue(isAlive) + sleep 5000 + + max_try_time = 3000 + while (max_try_time--){ + result = getJobState(tableName) + if (result == "FINISHED") { + sleep(3000) + break + } else { + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + } +} diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local5.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local5.groovy new file mode 100644 index 00000000000000..df4fb5d637e566 --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local5.groovy @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
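+
+// Local (non-cloud), coordinator-BE path: enable_abort_txn_by_checking_coordinator_be is on and
+// enable_abort_txn_by_checking_conflict_txn is off. Same flow as the cloud suites: a stream load
+// holds a txn, the lo_suppkey schema change waits in WAITING_TXN, the coordinator BE is stopped and
+// restarted, and the job is expected to reach FINISHED once the orphaned txn is aborted.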
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException + +suite('test_abort_txn_by_be_local5') { + def options = new ClusterOptions() + options.cloudMode = false + options.skipRunWhenPipelineDiff = false + options.enableDebugPoints() + options.beConfigs += [ "enable_java_support=false" ] + options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=true" ] + options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=false" ] + options.beNum = 1 + + docker(options) { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + + def tableName = "lineorder" + // create table if not exists + sql new File("""${context.file.parent}/ddl/lineorder_delete.sql""").text + sql new File("""${context.file.parent}/ddl/lineorder_create.sql""").text + + def coordinatorBe = cluster.getAllBackends().get(0) + def coordinatorBeHost = coordinatorBe.host + + def column = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, + lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, + lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""" + + thread { + streamLoad { + // a default db 'regression_test' is specified in + // ${DORIS_HOME}/conf/regression-conf.groovy + table tableName + + // default label is UUID: + // set 'label' UUID.randomUUID().toString() + + // default column_separator is specify in doris fe config, usually is '\t'. + // this line change to ',' + set 'column_separator', '|' + set 'compress_type', 'GZ' + set 'columns', column + + + // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. + // also, you can stream load a http stream, e.g. http://xxx/some.csv + file """${getS3Url()}/regression/ssb/sf100/lineorder.tbl.1.gz""" + + time 600 * 1000 + + // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + + // if declared a check callback, the default check condition will ignore. 
+ // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + sleep(10000) + + sql """ alter table ${tableName} modify column lo_suppkey bigint NULL """ + + String result = "" + int max_try_time = 3000 + while (max_try_time--){ + result = getJobState(tableName) + if (result == "PENDING") { + sleep(3000) + } else { + break; + } + } + if (max_try_time < 1){ + assertEquals(1,2) + } + assertEquals(result, "WAITING_TXN"); + + cluster.stopBackends(coordinatorBe.index) + def isDead = false + for (def i = 0; i < 10; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (!be.Alive.toBoolean()) { + isDead = true + break + } + sleep 1000 + } + assertTrue(isDead) + sleep 10000 + + result = getJobState(tableName) + assertEquals(result, "WAITING_TXN"); + + // coordinatorBe restart, abort txn on it + cluster.startBackends(coordinatorBe.index) + def isAlive = false + for (def i = 0; i < 20; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (be.Alive.toBoolean()) { + isAlive = true + break + } + sleep 1000 + } + assertTrue(isAlive) + sleep 20000 + + max_try_time = 3000 + while (max_try_time--){ + result = getJobState(tableName) + if (result == "FINISHED") { + sleep(3000) + break + } else { + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + } +} diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local6.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local6.groovy new file mode 100644 index 00000000000000..1f6e6df4417212 --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local6.groovy @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
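+
+// Local (non-cloud), conflict-txn path: enable_abort_txn_by_checking_coordinator_be is off and
+// enable_abort_txn_by_checking_conflict_txn is on; otherwise this mirrors test_abort_txn_by_be_local5
+// (stream load, schema change blocked in WAITING_TXN, stop/start the coordinator BE, expect FINISHED).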
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException + +suite('test_abort_txn_by_be_local6') { + def options = new ClusterOptions() + options.cloudMode = false + options.skipRunWhenPipelineDiff = true + options.enableDebugPoints() + options.beConfigs += [ "enable_java_support=false" ] + options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false" ] + options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ] + options.beNum = 1 + + docker(options) { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + + def tableName = "lineorder" + // create table if not exists + sql new File("""${context.file.parent}/ddl/lineorder_delete.sql""").text + sql new File("""${context.file.parent}/ddl/lineorder_create.sql""").text + + def coordinatorBe = cluster.getAllBackends().get(0) + def coordinatorBeHost = coordinatorBe.host + + def column = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, + lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, + lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""" + + thread { + streamLoad { + // a default db 'regression_test' is specified in + // ${DORIS_HOME}/conf/regression-conf.groovy + table tableName + + // default label is UUID: + // set 'label' UUID.randomUUID().toString() + + // default column_separator is specify in doris fe config, usually is '\t'. + // this line change to ',' + set 'column_separator', '|' + set 'compress_type', 'GZ' + set 'columns', column + + + // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. + // also, you can stream load a http stream, e.g. http://xxx/some.csv + file """${getS3Url()}/regression/ssb/sf100/lineorder.tbl.1.gz""" + + time 600 * 1000 + + // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + + // if declared a check callback, the default check condition will ignore. 
+ // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + sleep(10000) + + sql """ alter table ${tableName} modify column lo_suppkey bigint NULL """ + + String result = "" + int max_try_time = 3000 + while (max_try_time--){ + result = getJobState(tableName) + if (result == "PENDING") { + sleep(3000) + } else { + break; + } + } + if (max_try_time < 1){ + assertEquals(1,2) + } + assertEquals(result, "WAITING_TXN"); + + cluster.stopBackends(coordinatorBe.index) + def isDead = false + for (def i = 0; i < 10; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (!be.Alive.toBoolean()) { + isDead = true + break + } + sleep 1000 + } + assertTrue(isDead) + sleep 10000 + + result = getJobState(tableName) + assertEquals(result, "WAITING_TXN"); + + // coordinatorBe restart, abort txn on it + cluster.startBackends(coordinatorBe.index) + def isAlive = false + for (def i = 0; i < 20; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (be.Alive.toBoolean()) { + isAlive = true + break + } + sleep 1000 + } + assertTrue(isAlive) + sleep 5000 + + max_try_time = 3000 + while (max_try_time--){ + result = getJobState(tableName) + if (result == "FINISHED") { + sleep(3000) + break + } else { + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + } +} diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_cloud4.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_cloud4.groovy new file mode 100644 index 00000000000000..bd12d57fd34ed1 --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_cloud4.groovy @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
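+
+// Cloud-mode, FE-coordinator case: the data is loaded with a broker LOAD (coordinated by the FE)
+// rather than a stream load, and all frontends are restarted while the lo_suppkey schema change is
+// in WAITING_TXN; with enable_abort_txn_by_checking_conflict_txn on, the job should finish after
+// the restarted FE treats the stale load txn's coordinator as failed and aborts it.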
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException + +suite('test_abort_txn_by_fe_cloud4') { + def options = new ClusterOptions() + options.cloudMode = true + options.enableDebugPoints() + options.beConfigs += [ "enable_java_support=false" ] + options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false" ] + options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ] + options.beNum = 1 + + docker(options) { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + + def table= "lineorder" + // create table if not exists + sql new File("""${context.file.parent}/ddl/lineorder_delete.sql""").text + sql new File("""${context.file.parent}/ddl/lineorder_create.sql""").text + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + def loadLabel = table + '_' + uniqueID + + // load data from cos + def loadSql = new File("""${context.file.parent}/ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql + + def coordinatorFe = cluster.getAllFrontends().get(0) + def coordinatorFeHost = coordinatorFe.host + + sleep(5000) + + sql """ alter table ${table} modify column lo_suppkey bigint NULL """ + + String result = "" + int max_try_time = 3000 + while (max_try_time--){ + result = getJobState(table) + if (result == "PENDING") { + sleep(3000) + } else { + break; + } + } + if (max_try_time < 1){ + assertEquals(1,2) + } + sleep 10000 + assertEquals(result, "WAITING_TXN"); + + cluster.restartFrontends() + sleep(30000) + context.reconnectFe() + + max_try_time = 3000 + while (max_try_time--){ + result = getJobState(table) + System.out.println(result) + if (result == "FINISHED") { + sleep(3000) + break + } else { + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + } +} diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_local3.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_local3.groovy new file mode 100644 index 00000000000000..37667abe9506d7 --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_local3.groovy @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException + +suite('test_abort_txn_by_fe_local3') { + def options = new ClusterOptions() + options.cloudMode = false + options.skipRunWhenPipelineDiff = false + options.enableDebugPoints() + options.beConfigs += [ "enable_java_support=false" ] + options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false" ] + options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ] + options.beNum = 1 + + docker(options) { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + + def table= "lineorder" + // create table if not exists + sql new File("""${context.file.parent}/ddl/lineorder_delete.sql""").text + sql new File("""${context.file.parent}/ddl/lineorder_create.sql""").text + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + def loadLabel = table + '_' + uniqueID + + // load data from cos + def loadSql = new File("""${context.file.parent}/ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql + + def coordinatorFe = cluster.getAllFrontends().get(0) + def coordinatorFeHost = coordinatorFe.host + + sleep(5000) + + sql """ alter table ${table} modify column lo_suppkey bigint NULL """ + + String result = "" + int max_try_time = 3000 + while (max_try_time--){ + result = getJobState(table) + if (result == "PENDING") { + sleep(3000) + } else { + break; + } + } + if (max_try_time < 1){ + assertEquals(1,2) + } + sleep 10000 + assertEquals(result, "WAITING_TXN"); + + cluster.restartFrontends() + sleep(30000) + context.reconnectFe() + + max_try_time = 3000 + while (max_try_time--){ + result = getJobState(table) + System.out.println(result) + if (result == "FINISHED") { + sleep(3000) + break + } else { + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + } +} From 39d800bd0b9f928552e276ad2a49f8143151ddbb Mon Sep 17 00:00:00 2001 From: Lchangliang <915311741@qq.com> Date: Mon, 22 Jul 2024 17:26:33 +0800 Subject: [PATCH 06/10] tmp --- cloud/src/common/bvars.cpp | 2 +- cloud/src/common/bvars.h | 2 +- cloud/src/meta-service/meta_service.h | 14 ++-- cloud/src/meta-service/meta_service_txn.cpp | 10 +-- cloud/test/meta_service_test.cpp | 71 +++++++++++++++++++ .../doris/cloud/rpc/MetaServiceClient.java | 12 ++-- .../doris/cloud/rpc/MetaServiceProxy.java | 6 +- .../CloudGlobalTransactionMgr.java | 14 ++-- 
.../transaction/GlobalTransactionMgr.java | 8 ++- gensrc/proto/cloud.proto | 6 +- 10 files changed, 109 insertions(+), 36 deletions(-) diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index 81d46b5f2b12ed..4141ca293652be 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -30,7 +30,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id("ms", "get_current_m BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms", "begin_sub_txn"); BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn"); BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms", "check_txn_conflict"); -BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_by_coordinate_be("ms", "abort_txn_by_coordinate_be"); +BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator("ms", "abort_txn_with_coordinator"); BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label("ms", "clean_txn_label"); BvarLatencyRecorderWithTag g_bvar_ms_get_version("ms", "get_version"); BvarLatencyRecorderWithTag g_bvar_ms_batch_get_version("ms", "batch_get_version"); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index 5949bf9d8b3cdb..7e732d03f4b8d1 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -126,7 +126,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id; extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict; -extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_by_coordinate_be; +extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator; extern BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label; diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index 81d4d7af3f4dc0..bb669e0df94be8 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -89,9 +89,9 @@ class MetaServiceImpl : public cloud::MetaService { CheckTxnConflictResponse* response, ::google::protobuf::Closure* done) override; - void abort_txn_by_coordinate_be(::google::protobuf::RpcController* controller, - const AbortTxnByCoordinateBeRequest* request, - AbortTxnByCoordinateBeResponse* response, + void abort_txn_with_coordinator(::google::protobuf::RpcController* controller, + const AbortTxnWithCoordinatorRequest* request, + AbortTxnWithCoordinatorResponse* response, ::google::protobuf::Closure* done) override; void clean_txn_label(::google::protobuf::RpcController* controller, @@ -356,11 +356,11 @@ class MetaServiceProxy final : public MetaService { call_impl(&cloud::MetaService::check_txn_conflict, controller, request, response, done); } - void abort_txn_by_coordinate_be(::google::protobuf::RpcController* controller, - const AbortTxnByCoordinateBeRequest* request, - AbortTxnByCoordinateBeResponse* response, + void abort_txn_with_coordinator(::google::protobuf::RpcController* controller, + const AbortTxnWithCoordinatorRequest* request, + AbortTxnWithCoordinatorResponse* response, ::google::protobuf::Closure* done) override { - call_impl(&cloud::MetaService::abort_txn_by_coordinate_be, controller, request, response, + call_impl(&cloud::MetaService::abort_txn_with_coordinator, controller, request, response, done); } diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 
c2fbca1be3c657..9db88215aaaf68 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -2568,11 +2568,11 @@ void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controlle response->mutable_txn_info()->CopyFrom(txn_info); } -void MetaServiceImpl::abort_txn_by_coordinate_be(::google::protobuf::RpcController* controller, - const AbortTxnByCoordinateBeRequest* request, - AbortTxnByCoordinateBeResponse* response, +void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcController* controller, + const AbortTxnWithCoordinatorRequest* request, + AbortTxnWithCoordinatorResponse* response, ::google::protobuf::Closure* done) { - RPC_PREPROCESS(abort_txn_by_coordinate_be); + RPC_PREPROCESS(abort_txn_with_coordinator); if (!request->has_id() || !request->has_ip() || !request->has_start_time()) { code = MetaServiceCode::INVALID_ARGUMENT; msg = "invalid coordinate id, coordinate ip or coordinate start time."; @@ -2588,7 +2588,7 @@ void MetaServiceImpl::abort_txn_by_coordinate_be(::google::protobuf::RpcControll msg = ss.str(); return; } - RPC_RATE_LIMIT(abort_txn_by_coordinate_be); + RPC_RATE_LIMIT(abort_txn_with_coordinator); std::string begin_info_key = txn_info_key({instance_id, 0, 0}); std::string end_info_key = txn_info_key({instance_id, INT64_MAX, INT64_MAX}); LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" << hex(end_info_key); diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 294575af3f1265..b44cb1ae7d05c1 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -2054,6 +2054,77 @@ TEST(MetaServiceTest, GetCurrentMaxTxnIdTest) { ASSERT_GE(max_txn_id_res.current_max_txn_id(), begin_txn_res.txn_id()); } +TEST(MetaServiceTest, AbortTxnWithCoordinatorTest) { + auto meta_service = get_meta_service(); + + const int64_t db_id = 666; + const int64_t table_id = 777; + const std::string label = "test_label"; + const std::string cloud_unique_id = "test_cloud_unique_id"; + const int64_t coordinator_id = 15623; + int64_t cur_time = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + std::string host = "127.0.0.1:15586"; + int64_t txn_id = -1; + + brpc::Controller begin_txn_cntl; + BeginTxnRequest begin_txn_req; + BeginTxnResponse begin_txn_res; + TxnInfoPB txn_info_pb; + TxnCoordinatorPB coordinator; + + begin_txn_req.set_cloud_unique_id(cloud_unique_id); + txn_info_pb.set_db_id(db_id); + txn_info_pb.set_label(label); + txn_info_pb.add_table_ids(table_id); + txn_info_pb.set_timeout_ms(36000); + coordinator.set_id(coordinator_id); + coordinator.set_ip(host); + coordinator.set_sourcetype(::doris::cloud::TxnSourceTypePB::TXN_SOURCE_TYPE_BE); + coordinator.set_start_time(cur_time); + txn_info_pb.mutable_coordinator()->CopyFrom(coordinator); + begin_txn_req.mutable_txn_info()->CopyFrom(txn_info_pb); + + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl), + &begin_txn_req, &begin_txn_res, nullptr); + ASSERT_EQ(begin_txn_res.status().code(), MetaServiceCode::OK); + txn_id = begin_txn_res.txn_id(); + ASSERT_GT(txn_id, -1); + + brpc::Controller abort_txn_cntl; + AbortTxnWithCoordinatorRequest abort_txn_req; + AbortTxnWithCoordinatorResponse abort_txn_resp; + + abort_txn_req.set_id(coordinator_id); + abort_txn_req.set_ip(host); + abort_txn_req.set_start_time(cur_time + 3600); + + // first time to check txn conflict + 
meta_service->abort_txn_with_coordinator( + reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl), + &abort_txn_req, &abort_txn_resp, nullptr); + ASSERT_EQ(abort_txn_resp.status().code(), MetaServiceCode::OK); + + brpc::Controller abort_txn_conflict_cntl; + CheckTxnConflictRequest check_txn_conflict_req; + CheckTxnConflictResponse check_txn_conflict_res; + + check_txn_conflict_req.set_cloud_unique_id(cloud_unique_id); + check_txn_conflict_req.set_db_id(db_id); + check_txn_conflict_req.set_end_txn_id(txn_id + 1); + check_txn_conflict_req.add_table_ids(table_id); + + // first time to check txn conflict + meta_service->check_txn_conflict( + reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl), + &check_txn_conflict_req, &check_txn_conflict_res, nullptr); + + ASSERT_EQ(check_txn_conflict_res.status().code(), MetaServiceCode::OK); + ASSERT_EQ(check_txn_conflict_res.finished(), true); + ASSERT_EQ(check_txn_conflict_res.conflict_txns_size(), 0); +} + TEST(MetaServiceTest, CheckTxnConflictTest) { auto meta_service = get_meta_service(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index eb6b909bb88aaf..bc44241446c7a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -363,14 +363,14 @@ public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) { return blockingStub.getObjStoreInfo(request); } - public Cloud.AbortTxnByCoordinateBeResponse - abortTxnByCoordinateBe(Cloud.AbortTxnByCoordinateBeRequest request) { + public Cloud.AbortTxnWithCoordinatorResponse + abortTxnWithCoordinator(Cloud.AbortTxnWithCoordinatorRequest request) { if (!request.hasCloudUniqueId()) { - Cloud.AbortTxnByCoordinateBeRequest.Builder builder = - Cloud.AbortTxnByCoordinateBeRequest.newBuilder(); + Cloud.AbortTxnWithCoordinatorRequest.Builder builder = + Cloud.AbortTxnWithCoordinatorRequest.newBuilder(); builder.mergeFrom(request); - return blockingStub.abortTxnByCoordinateBe(builder.setCloudUniqueId(Config.cloud_unique_id).build()); + return blockingStub.abortTxnWithCoordinator(builder.setCloudUniqueId(Config.cloud_unique_id).build()); } - return blockingStub.abortTxnByCoordinateBe(request); + return blockingStub.abortTxnWithCoordinator(request); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index c8adaa04e488df..0479ae47968b16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -508,11 +508,11 @@ public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfo } } - public Cloud.AbortTxnByCoordinateBeResponse - abortTxnByCoordinateBe(Cloud.AbortTxnByCoordinateBeRequest request) throws RpcException { + public Cloud.AbortTxnWithCoordinatorResponse + abortTxnWithCoordinator(Cloud.AbortTxnWithCoordinatorRequest request) throws RpcException { try { final MetaServiceClient client = getProxy(); - return client.abortTxnByCoordinateBe(request); + return client.abortTxnWithCoordinator(request); } catch (Exception e) { throw new RpcException("", e.getMessage(), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index b144bef4d6f4c2..98d2fccf164e01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -30,8 +30,8 @@ import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.cloud.proto.Cloud.AbortSubTxnRequest; import org.apache.doris.cloud.proto.Cloud.AbortSubTxnResponse; -import org.apache.doris.cloud.proto.Cloud.AbortTxnByCoordinateBeRequest; -import org.apache.doris.cloud.proto.Cloud.AbortTxnByCoordinateBeResponse; +import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorRequest; +import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorResponse; import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest; import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse; import org.apache.doris.cloud.proto.Cloud.BeginSubTxnRequest; @@ -1296,16 +1296,16 @@ public void updateDatabaseUsedQuotaData(long dbId, long usedQuotaDataBytes) thro @Override public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime) { - AbortTxnByCoordinateBeRequest.Builder builder = AbortTxnByCoordinateBeRequest.newBuilder(); + AbortTxnWithCoordinatorRequest.Builder builder = AbortTxnWithCoordinatorRequest.newBuilder(); builder.setIp(coordinateHost); builder.setId(coordinateBeId); builder.setStartTime(beStartTime); - final AbortTxnByCoordinateBeRequest request = builder.build(); - AbortTxnByCoordinateBeResponse response = null; + final AbortTxnWithCoordinatorRequest request = builder.build(); + AbortTxnWithCoordinatorResponse response = null; try { response = MetaServiceProxy - .getInstance().abortTxnByCoordinateBe(request); - LOG.info("AbortTxnByCoordinateBeResponse: {}", response); + .getInstance().abortTxnWithCoordinator(request); + LOG.info("AbortTxnWithCoordinatorResponse: {}", response); } catch (RpcException e) { LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 21aef81ab6ecfb..a5e84129de08af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -444,7 +444,7 @@ public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) { return false; } - public static boolean checkFailedTxnsByStartTime(TransactionState txn) { + public static boolean checkFailedTxnsByCoordinator(TransactionState txn) { TxnCoordinator coordinator = txn.getCoordinator(); if (coordinator.sourceType == TransactionState.TxnSourceType.FE) { List frontends = Env.getCurrentEnv().getFrontends(null); @@ -455,7 +455,9 @@ public static boolean checkFailedTxnsByStartTime(TransactionState txn) { } } else if (coordinator.sourceType == TransactionState.TxnSourceType.BE) { Backend be = Env.getCurrentSystemInfo().getBackend(coordinator.id); - if (be.getHost().equals(coordinator.ip) && be.getLastStartTime() > coordinator.startTime) { + if (be.getHost().equals(coordinator.ip) && (be.getLastStartTime() > coordinator.startTime + || (!be.isAlive() && System.currentTimeMillis() - be.getLastUpdateMs() + >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L))) { return true; } } @@ -467,7 
+469,7 @@ public static List checkFailedTxns(List conf for (TransactionState txn : conflictTxns) { boolean failed = false; if (!failed) { - failed = checkFailedTxnsByStartTime(txn); + failed = checkFailedTxnsByCoordinator(txn); } if (failed) { failedTxns.add(txn); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 5f9fa0db51f447..6889d2aac2bc38 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -772,14 +772,14 @@ message GetCurrentMaxTxnResponse { optional int64 current_max_txn_id = 2; } -message AbortTxnByCoordinateBeRequest { +message AbortTxnWithCoordinatorRequest { optional string cloud_unique_id = 1; // For auth optional string ip = 2; optional int64 id = 3; optional int64 start_time = 4; } -message AbortTxnByCoordinateBeResponse { +message AbortTxnWithCoordinatorResponse { optional MetaServiceResponseStatus status = 1; } @@ -1469,7 +1469,7 @@ service MetaService { rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse); rpc begin_sub_txn(BeginSubTxnRequest) returns (BeginSubTxnResponse); rpc abort_sub_txn(AbortSubTxnRequest) returns (AbortSubTxnResponse); - rpc abort_txn_by_coordinate_be(AbortTxnByCoordinateBeRequest) returns (AbortTxnByCoordinateBeResponse); + rpc abort_txn_with_coordinator(AbortTxnWithCoordinatorRequest) returns (AbortTxnWithCoordinatorResponse); rpc get_version(GetVersionRequest) returns (GetVersionResponse); rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse); From f5eb42a851c991c0be74e7eb63d6b7bde32235b1 Mon Sep 17 00:00:00 2001 From: Lchangliang <915311741@qq.com> Date: Mon, 22 Jul 2024 17:28:24 +0800 Subject: [PATCH 07/10] tmp --- cloud/test/meta_service_test.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index b44cb1ae7d05c1..c5cec530e6b109 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -2102,8 +2102,8 @@ TEST(MetaServiceTest, AbortTxnWithCoordinatorTest) { // first time to check txn conflict meta_service->abort_txn_with_coordinator( - reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl), - &abort_txn_req, &abort_txn_resp, nullptr); + reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl), &abort_txn_req, + &abort_txn_resp, nullptr); ASSERT_EQ(abort_txn_resp.status().code(), MetaServiceCode::OK); brpc::Controller abort_txn_conflict_cntl; From 58cea6e2ecbd10ade78bb60194ab9f0afe3739aa Mon Sep 17 00:00:00 2001 From: Lchangliang <915311741@qq.com> Date: Mon, 22 Jul 2024 17:36:11 +0800 Subject: [PATCH 08/10] tmp --- .../doris/cloud/transaction/CloudGlobalTransactionMgr.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 98d2fccf164e01..862c72fe742709 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -30,10 +30,10 @@ import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.cloud.proto.Cloud.AbortSubTxnRequest; import org.apache.doris.cloud.proto.Cloud.AbortSubTxnResponse; -import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorRequest; -import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorResponse; import 
org.apache.doris.cloud.proto.Cloud.AbortTxnRequest; import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse; +import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorRequest; +import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorResponse; import org.apache.doris.cloud.proto.Cloud.BeginSubTxnRequest; import org.apache.doris.cloud.proto.Cloud.BeginSubTxnResponse; import org.apache.doris.cloud.proto.Cloud.BeginTxnRequest; From 0be916088c0c49c6568ddd6c203386a64219d5c6 Mon Sep 17 00:00:00 2001 From: Lchangliang <915311741@qq.com> Date: Mon, 29 Jul 2024 12:26:53 +0800 Subject: [PATCH 09/10] tmp --- cloud/src/meta-service/meta_service_txn.cpp | 51 ++++++++++++--------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 9db88215aaaf68..29190c06330720 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -2662,6 +2662,25 @@ void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcControll } } +std::string get_txn_info_key_from_txn_running_key(std::string_view txn_running_key) { + std::string conflict_txn_info_key; + std::vector, int, int>> out; + txn_running_key.remove_prefix(1); + int ret = decode_key(&txn_running_key, &out); + if (ret != 0) [[unlikely]] { + // decode version key error means this is something wrong, + // we can not continue this txn + LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(txn_running_key); + } else { + DCHECK(out.size() == 5) << " key=" << hex(txn_running_key) << " " << out.size(); + const std::string& decode_instance_id = std::get<1>(std::get<0>(out[1])); + int64_t db_id = std::get<0>(std::get<0>(out[3])); + int64_t txn_id = std::get<0>(std::get<0>(out[4])); + conflict_txn_info_key = txn_info_key({decode_instance_id, db_id, txn_id}); + } + return conflict_txn_info_key; +} + void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* controller, const CheckTxnConflictRequest* request, CheckTxnConflictResponse* response, @@ -2745,34 +2764,22 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont std::sort(running_table_ids.begin(), running_table_ids.end()); std::vector result( std::min(running_table_ids.size(), src_table_ids.size())); - std::vector::iterator iter = std::set_intersection( - src_table_ids.begin(), src_table_ids.end(), running_table_ids.begin(), - running_table_ids.end(), result.begin()); + auto iter = std::set_intersection(src_table_ids.begin(), src_table_ids.end(), + running_table_ids.begin(), + running_table_ids.end(), result.begin()); result.resize(iter - result.begin()); - if (result.size() > 0) { + if (!result.empty()) { finished = false; - std::vector, int, int>> out; - std::string_view key_view = k; - key_view.remove_prefix(1); - int ret = decode_key(&key_view, &out); - if (ret != 0) [[unlikely]] { - // decode version key error means this is something wrong, - // we can not continue this txn - LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(k); - } else { - DCHECK(out.size() == 5) << " key=" << hex(k) << " " << out.size(); - const std::string& decode_instance_id = std::get<1>(std::get<0>(out[1])); - int64_t db_id = std::get<0>(std::get<0>(out[3])); - int64_t txn_id = std::get<0>(std::get<0>(out[4])); - std::string conflict_txn_info_key = - txn_info_key({decode_instance_id, db_id, txn_id}); + std::string conflict_txn_info_key = 
get_txn_info_key_from_txn_running_key(k); + if (!conflict_txn_info_key.empty()) { std::string conflict_txn_info_val; err = txn->get(conflict_txn_info_key, &conflict_txn_info_val); if (err != TxnErrorCode::TXN_OK) { code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND : cast_as(err); - ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id; + ss << "failed to get txn_info, conflict_txn_info_key=" + << hex(conflict_txn_info_key); msg = ss.str(); LOG(WARNING) << msg; return; @@ -2780,8 +2787,8 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont TxnInfoPB& conflict_txn_info = *response->add_conflict_txns(); if (!conflict_txn_info.ParseFromString(conflict_txn_info_val)) { code = MetaServiceCode::PROTOBUF_PARSE_ERR; - ss << "failed to parse txn_info, db_id=" << db_id - << " txn_id=" << txn_id; + ss << "failed to parse txn_info, conflict_txn_info_key=" + << hex(conflict_txn_info_key); msg = ss.str(); LOG(WARNING) << msg; return; From 127cd9ef441ec227b80a42b7fec29ab7958f6a01 Mon Sep 17 00:00:00 2001 From: Lchangliang <915311741@qq.com> Date: Mon, 29 Jul 2024 15:29:55 +0800 Subject: [PATCH 10/10] tmp --- cloud/src/meta-service/meta_service_txn.cpp | 2 +- .../transaction/GlobalTransactionMgr.java | 27 ++++++++++--------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 29190c06330720..15634ba2a8b431 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -2617,7 +2617,7 @@ void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcControll while (it->has_next()) { total_iteration_cnt++; auto [k, v] = it->next(); - LOG(INFO) << "check txn info txn_info_key=" << hex(k); + VLOG_DEBUG << "check txn info txn_info_key=" << hex(k); TxnInfoPB info_pb; if (!info_pb.ParseFromArray(v.data(), v.size())) { code = MetaServiceCode::PROTOBUF_PARSE_ERR; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index a5e84129de08af..57ee12fe86c20e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -446,32 +446,35 @@ public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) { public static boolean checkFailedTxnsByCoordinator(TransactionState txn) { TxnCoordinator coordinator = txn.getCoordinator(); + boolean offline = true; if (coordinator.sourceType == TransactionState.TxnSourceType.FE) { List frontends = Env.getCurrentEnv().getFrontends(null); for (Frontend fe : frontends) { - if (fe.getHost().equals(coordinator.ip) && fe.getLastStartupTime() > coordinator.startTime) { - return true; + if (fe.getHost().equals(coordinator.ip)) { + offline = false; + if (fe.getLastStartupTime() > coordinator.startTime) { + return true; + } } } } else if (coordinator.sourceType == TransactionState.TxnSourceType.BE) { Backend be = Env.getCurrentSystemInfo().getBackend(coordinator.id); - if (be.getHost().equals(coordinator.ip) && (be.getLastStartTime() > coordinator.startTime - || (!be.isAlive() && System.currentTimeMillis() - be.getLastUpdateMs() - >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L))) { - return true; + if (be != null) { + offline = false; + if (be.getHost().equals(coordinator.ip) && 
(be.getLastStartTime() > coordinator.startTime + || (!be.isAlive() && System.currentTimeMillis() - be.getLastUpdateMs() + >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L))) { + return true; + } } } - return false; + return offline; } public static List<TransactionState> checkFailedTxns(List<TransactionState> conflictTxns) { List<TransactionState> failedTxns = new ArrayList<>(); for (TransactionState txn : conflictTxns) { - boolean failed = false; - if (!failed) { - failed = checkFailedTxnsByCoordinator(txn); - } - if (failed) { + if (checkFailedTxnsByCoordinator(txn)) { failedTxns.add(txn); } }
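
Taken together, the series adds one FE-side entry point, abortTxnWhenCoordinateBeRestart(), which in cloud mode forwards an AbortTxnWithCoordinatorRequest to the meta service; the meta service then scans the instance's txn_info range and aborts the load txns still attributed to that coordinator. A minimal caller-side sketch in Java, assuming the FE reacts to a heartbeat that carries a newer BE start time (the heartbeat wiring is not part of this series, and the class and parameter names below are illustrative only):

    import org.apache.doris.catalog.Env;
    import org.apache.doris.system.Backend;

    public class CoordinatorRestartHook {
        // Assumed caller: invoked when a heartbeat reports that a BE process has restarted.
        public static void onBackendHeartbeat(long reportedBeId, long previousStartTime) {
            Backend be = Env.getCurrentSystemInfo().getBackend(reportedBeId);
            if (be == null) {
                return; // the BE has been dropped; nothing to hand over here
            }
            long lastStart = be.getLastStartTime();
            if (lastStart > previousStartTime) {
                // Every txn whose coordinator is this BE and whose start time predates
                // the restart becomes a candidate for abort.
                Env.getCurrentGlobalTransactionMgr()
                        .abortTxnWhenCoordinateBeRestart(be.getId(), be.getHost(), lastStart);
            }
        }
    }

On the non-cloud path the same decision is taken lazily by checkFailedTxnsByCoordinator(), which after this series also treats a coordinator BE that no longer exists, or that has been down for longer than abort_txn_after_lost_heartbeat_time_second, as failed, so a blocked schema change can abort the conflicting txns itself.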