From 2e1f41c269372eb8822402b4d1786e539f5943b3 Mon Sep 17 00:00:00 2001 From: walter Date: Thu, 8 May 2025 17:26:23 +0800 Subject: [PATCH] [fix](binlog) Acquire migration lock before ingesting binlog (#50663) --- be/src/service/backend_service.cpp | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 9f6413762f0ef6..116d4951723b0a 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -1096,13 +1096,18 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, PUniqueId p_load_id; p_load_id.set_hi(load_id.hi); p_load_id.set_lo(load_id.lo); - auto status = _engine.txn_manager()->prepare_txn(partition_id, *local_tablet, txn_id, p_load_id, - is_ingrest); - if (!status.ok()) { - LOG(WARNING) << "prepare txn failed. txn_id=" << txn_id - << ", status=" << status.to_string(); - status.to_thrift(&tstatus); - return; + + { + // See RowsetBuilder::prepare_txn for details + std::shared_lock base_migration_lock(local_tablet->get_migration_lock()); + auto status = _engine.txn_manager()->prepare_txn(partition_id, *local_tablet, txn_id, + p_load_id, is_ingrest); + if (!status.ok()) { + LOG(WARNING) << "prepare txn failed. txn_id=" << txn_id + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } } bool is_async = (_ingest_binlog_workers != nullptr); @@ -1123,7 +1128,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, }; if (is_async) { - status = _ingest_binlog_workers->submit_func(std::move(ingest_binlog_func)); + auto status = _ingest_binlog_workers->submit_func(std::move(ingest_binlog_func)); if (!status.ok()) { status.to_thrift(&tstatus); return;